From cd5fd104e3eab1d62d347c7c6dd310d3dcf2d0e5 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sat, 15 Nov 2025 02:09:12 +0000 Subject: [PATCH 1/2] Refactor: Improve Google Drive tool responses and error handling This commit refactors the Google Drive tool to return structured JSON responses for both success and error cases. It also enhances error handling, provides more detailed error messages, and improves the formatting of returned data. Co-authored-by: jck411 --- src/backend/mcp_servers/gdrive_server.py | 775 ++++++++++++++--------- tests/test_gdrive_server.py | 59 +- 2 files changed, 517 insertions(+), 317 deletions(-) diff --git a/src/backend/mcp_servers/gdrive_server.py b/src/backend/mcp_servers/gdrive_server.py index 14de51b..e318495 100644 --- a/src/backend/mcp_servers/gdrive_server.py +++ b/src/backend/mcp_servers/gdrive_server.py @@ -5,8 +5,9 @@ import asyncio import base64 import io +import json import re -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Tuple import httpx from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload @@ -44,6 +45,79 @@ def run(self) -> None: ... _attachment_service_lock = asyncio.Lock() +def _safe_int(value: Any) -> Optional[int]: + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _success_response( + data: Any | None = None, + *, + message: Optional[str] = None, + warnings: Optional[Sequence[str]] = None, + meta: Optional[Dict[str, Any]] = None, +) -> str: + payload: Dict[str, Any] = {"ok": True} + if message: + payload["message"] = message + if data is not None: + payload["data"] = data + if warnings: + payload["warnings"] = list(warnings) + if meta: + payload["meta"] = meta + return json.dumps(payload) + + +def _error_response( + message: str, + *, + code: Optional[str] = None, + details: Optional[Dict[str, Any]] = None, + warnings: Optional[Sequence[str]] = None, +) -> str: + payload: Dict[str, Any] = { + "ok": False, + "error": {"message": message}, + } + if code: + payload["error"]["code"] = code + if details: + payload["error"]["details"] = details + if warnings: + payload["warnings"] = list(warnings) + return json.dumps(payload) + + +def _format_drive_item(item: Dict[str, Any]) -> Dict[str, Any]: + return { + "id": item.get("id"), + "name": item.get("name"), + "mime_type": item.get("mimeType"), + "size_bytes": _safe_int(item.get("size")), + "modified_time": item.get("modifiedTime"), + "web_view_link": item.get("webViewLink"), + "web_content_link": item.get("webContentLink"), + "icon_link": item.get("iconLink"), + "parents": item.get("parents"), + "drive_id": item.get("driveId"), + } + + +def _format_permission(permission: Dict[str, Any]) -> Dict[str, Any]: + return { + "id": permission.get("id"), + "type": permission.get("type"), + "role": permission.get("role"), + "email_address": permission.get("emailAddress"), + "domain": permission.get("domain"), + "allow_file_discovery": permission.get("allowFileDiscovery"), + "display_name": permission.get("displayName"), + } + + async def _get_attachment_service() -> AttachmentService: """Get or create the attachment service instance.""" global _attachment_service @@ -443,30 +517,38 @@ async def search_drive_files( """ service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None - # Use conservative heuristic to detect Drive query syntax is_structured = _is_structured_drive_query(query) escaped_query = _escape_query_term(query) + mime_filter: Optional[str] = None + search_terms: Optional[str] = None - # Build intelligent search query: - # 1. If structured query (has field operators), use as-is - # 2. If file type query (e.g., "image", "pdf"), filter by MIME type - # 3. Otherwise, search filename only (metadata search) if is_structured: final_query = query + strategy = "structured" else: - # Check if this is a file type query mime_filter = _detect_file_type_query(query) - if mime_filter: - # Extract any additional search terms (words that aren't the file type) - query_lower = query.lower() - # Remove common file type keywords from the search terms - search_terms = query_lower + search_terms_value = query.lower() for keywords, _ in [ - (["image", "images", "photo", "photos", "picture", "pictures", "img", "png", "jpg", "jpeg", "gif"], None), + ( + [ + "image", + "images", + "photo", + "photos", + "picture", + "pictures", + "img", + "png", + "jpg", + "jpeg", + "gif", + ], + None, + ), (["pdf", "pdfs"], None), (["document", "documents", "doc", "docs"], None), (["spreadsheet", "spreadsheets", "sheet", "sheets"], None), @@ -476,27 +558,29 @@ async def search_drive_files( (["audio", "sound", "music"], None), ]: for keyword in keywords: - # Remove the keyword and common connecting words - pattern = r'\b' + re.escape(keyword) + r'\b' - search_terms = re.sub(pattern, '', search_terms) - - # Clean up the remaining terms - search_terms = re.sub(r'\b(latest|recent|new|old|my)\b', '', search_terms) - search_terms = search_terms.strip() - - # Build query: MIME type filter + optional name search + pattern = r"\b" + re.escape(keyword) + r"\b" + search_terms_value = re.sub(pattern, "", search_terms_value) + + search_terms_value = re.sub( + r"\b(latest|recent|new|old|my)\b", "", search_terms_value + ) + search_terms_value = search_terms_value.strip() + search_terms = search_terms_value or None + if search_terms: escaped_terms = _escape_query_term(search_terms) final_query = f"{mime_filter} and name contains '{escaped_terms}'" + strategy = "mime_filter_with_name" else: final_query = mime_filter + strategy = "mime_filter" else: - # Regular text search - search in filename only final_query = f"name contains '{escaped_query}'" + strategy = "name_contains" params = _build_drive_list_params( query=final_query, - page_size=min(page_size, 100), # Per-page limit + page_size=min(page_size, 100), drive_id=drive_id, include_items_from_all_drives=include_items_from_all_drives, corpora=corpora, @@ -505,7 +589,6 @@ async def search_drive_files( files: List[Dict[str, Any]] = [] page_token: Optional[str] = None - # Paginate until we have enough results or no more pages while len(files) < page_size: if page_token: params["pageToken"] = page_token @@ -513,7 +596,9 @@ async def search_drive_files( try: results = await asyncio.to_thread(service.files().list(**params).execute) except Exception as exc: - return f"Error searching Drive files: {exc}" + return _error_response( + f"Error searching Drive files: {exc}", code="GOOGLE_API_ERROR" + ) page_files = results.get("files", []) files.extend(page_files) @@ -522,25 +607,29 @@ async def search_drive_files( if not page_token or len(files) >= page_size: break - # Trim to requested size files = files[:page_size] - - if not files: - return f"No files found for '{query}'." - - lines = [ - f"Found {len(files)} files for {user_email} matching '{query}':", - "", - ] - for item in files: - size_text = f", Size: {item.get('size', 'N/A')}" if "size" in item else "" - lines.append( - f'- Name: "{item.get("name", "(unknown)")}" ' - f"(ID: {item.get('id', 'unknown')}, Type: {item.get('mimeType', 'unknown')}" - f"{size_text}, Modified: {item.get('modifiedTime', 'N/A')}) " - f"Link: {item.get('webViewLink', '#')}" - ) - return "\n".join(lines) + formatted_files = [_format_drive_item(item) for item in files] + + return _success_response( + data={ + "user_email": user_email, + "count": len(formatted_files), + "files": formatted_files, + "next_page_token": page_token, + "query": { + "original": query, + "resolved": final_query, + "strategy": strategy, + "mime_filter": mime_filter, + "search_terms": search_terms, + "page_size": page_size, + "drive_id": drive_id, + "include_items_from_all_drives": include_items_from_all_drives, + "corpora": corpora, + }, + }, + message=f"No files found for '{query}'." if not files else None, + ) @mcp.tool("gdrive_list_folder") @@ -564,7 +653,7 @@ async def list_drive_items( """ service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None resolved_id, display_label, warnings = await _resolve_folder_reference( @@ -578,10 +667,16 @@ async def list_drive_items( ) if resolved_id is None: - detail_lines = [display_label or "Unable to resolve folder selection."] - if warnings: - detail_lines.extend(warnings) - return "\n".join(detail_lines) + return _error_response( + display_label or "Unable to resolve folder selection.", + code="FOLDER_RESOLUTION_FAILED", + details={ + "folder_id": folder_id, + "folder_name": folder_name, + "folder_path": folder_path, + }, + warnings=warnings, + ) query = f"'{resolved_id}' in parents and trashed=false" params = _build_drive_list_params( @@ -595,7 +690,6 @@ async def list_drive_items( files: List[Dict[str, Any]] = [] page_token: Optional[str] = None - # Paginate until we have enough results or no more pages while len(files) < page_size: if page_token: params["pageToken"] = page_token @@ -603,7 +697,9 @@ async def list_drive_items( try: results = await asyncio.to_thread(service.files().list(**params).execute) except Exception as exc: - return f"Error listing Drive items: {exc}" + return _error_response( + f"Error listing Drive items: {exc}", code="GOOGLE_API_ERROR" + ) page_files = results.get("files", []) files.extend(page_files) @@ -612,30 +708,32 @@ async def list_drive_items( if not page_token or len(files) >= page_size: break - # Trim to requested size files = files[:page_size] - - if not files: - response_lines = [f"No items found in folder '{display_label}'."] - if warnings: - response_lines.extend(warnings) - return "\n".join(response_lines) - - lines = [ - f"Found {len(files)} items in folder '{display_label}' for {user_email}:", - "", - ] - for item in files: - size_text = f", Size: {item.get('size', 'N/A')}" if "size" in item else "" - lines.append( - f'- Name: "{item.get("name", "(unknown)")}" ' - f"(ID: {item.get('id', 'unknown')}, Type: {item.get('mimeType', 'unknown')}" - f"{size_text}, Modified: {item.get('modifiedTime', 'N/A')}) " - f"Link: {item.get('webViewLink', '#')}" - ) - if warnings: - lines.extend(["", *warnings]) - return "\n".join(lines) + formatted_items = [_format_drive_item(item) for item in files] + + return _success_response( + data={ + "user_email": user_email, + "folder": { + "requested": { + "id": folder_id, + "name": folder_name, + "path": folder_path, + }, + "resolved_id": resolved_id, + "display_label": display_label, + }, + "count": len(formatted_items), + "items": formatted_items, + "next_page_token": page_token, + }, + warnings=warnings or None, + message=( + f"No items found in folder '{display_label}'." + if not formatted_items + else None + ), + ) @mcp.tool("gdrive_get_file_content") @@ -645,7 +743,7 @@ async def get_drive_file_content( ) -> str: service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None try: @@ -659,7 +757,10 @@ async def get_drive_file_content( .execute ) except Exception as exc: - return f"Error retrieving metadata for file {file_id}: {exc}" + return _error_response( + f"Error retrieving metadata for file {file_id}: {exc}", + code="GOOGLE_API_ERROR", + ) mime_type = metadata.get("mimeType", "") export_mappings = { @@ -678,29 +779,33 @@ async def get_drive_file_content( try: content_bytes = await _download_request_bytes(request) except ValueError as exc: - return f"File too large: {exc}" + return _error_response(f"File too large: {exc}", code="FILE_TOO_LARGE") except Exception as exc: - return f"Error downloading file content: {exc}" + return _error_response( + f"Error downloading file content: {exc}", code="GOOGLE_API_ERROR" + ) + content_length = len(content_bytes) office_mime_types = { "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", } + extraction_method = "utf-8-decode" body_text: str + if mime_type == "application/pdf": try: - # Run PDF extraction in thread pool to avoid blocking event loop - def _extract_pdf() -> str: + def _extract_pdf() -> Tuple[str, bool]: payload = base64.b64encode(content_bytes).decode("ascii") result: Dict[str, Any] = kb_extract_bytes( content_base64=payload, mime_type=mime_type, ) text = str(result.get("content") or result.get("text") or "").strip() + used_ocr = False - # If no text was extracted, try an OCR pass as a fallback if not text: try: result_ocr: Dict[str, Any] = kb_extract_bytes( @@ -711,54 +816,66 @@ def _extract_pdf() -> str: text = str( result_ocr.get("content") or result_ocr.get("text") or "" ).strip() + used_ocr = bool(text) except Exception: pass - return text + return text, used_ocr - body_text = await asyncio.to_thread(_extract_pdf) + extracted_text, used_ocr = await asyncio.to_thread(_extract_pdf) + extraction_method = "pdf_extractor_ocr" if used_ocr else "pdf_extractor" + body_text = extracted_text if not body_text: - # Fall back to a best-effort UTF-8 decode or binary notice try: body_text = content_bytes.decode("utf-8") + extraction_method = "utf-8-decode" except UnicodeDecodeError: body_text = ( f"[Binary or unsupported text encoding for mimeType '{mime_type}' - " - f"{len(content_bytes)} bytes]" + f"{content_length} bytes]" ) + extraction_method = "binary-placeholder" except Exception: - # If extraction fails for any reason, degrade gracefully try: body_text = content_bytes.decode("utf-8") + extraction_method = "utf-8-decode" except UnicodeDecodeError: body_text = ( f"[Binary or unsupported text encoding for mimeType '{mime_type}' - " - f"{len(content_bytes)} bytes]" + f"{content_length} bytes]" ) + extraction_method = "binary-placeholder" elif mime_type in office_mime_types: - # Office documents are binary formats - note the limitation try: body_text = content_bytes.decode("utf-8") except UnicodeDecodeError: body_text = ( f"[Binary Office document - text extraction not supported for mimeType '{mime_type}' - " - f"{len(content_bytes)} bytes]" + f"{content_length} bytes]" ) + extraction_method = "binary-placeholder" else: try: body_text = content_bytes.decode("utf-8") except UnicodeDecodeError: body_text = ( f"[Binary or unsupported text encoding for mimeType '{mime_type}' - " - f"{len(content_bytes)} bytes]" + f"{content_length} bytes]" ) - - header = ( - f'File: "{metadata.get("name", "Unknown File")}" ' - f"(ID: {file_id}, Type: {mime_type})\n" - f"Link: {metadata.get('webViewLink', '#')}\n\n--- CONTENT ---\n" + extraction_method = "binary-placeholder" + + return _success_response( + data={ + "file": _format_drive_item(metadata), + "content": body_text, + "extraction": { + "export_mime_type": export_mime, + "method": extraction_method, + "byte_length": content_length, + }, + "user_email": user_email, + } ) - return header + body_text @mcp.tool("gdrive_create_file") @@ -771,14 +888,15 @@ async def create_drive_file( file_url: Optional[str] = None, ) -> str: if not content and not file_url: - return "You must provide either 'content' or 'file_url'." + return _error_response( + "You must provide either 'content' or 'file_url'.", code="INVALID_INPUT" + ) service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None - # Normalize parent folder ID normalized_folder_id = _normalize_parent_id(folder_id) data: bytes @@ -787,38 +905,51 @@ async def create_drive_file( async with httpx.AsyncClient( timeout=httpx.Timeout(10.0, connect=5.0) ) as client: - # Check Content-Length before downloading if available head_resp = await client.head(file_url, follow_redirects=True) if head_resp.status_code == 200: content_length = head_resp.headers.get("Content-Length") if content_length and int(content_length) > MAX_CONTENT_BYTES: - return ( - f"File at URL is too large ({int(content_length)} bytes). " - f"Maximum allowed size is {MAX_CONTENT_BYTES} bytes " - f"(~{MAX_CONTENT_BYTES // (1024 * 1024)}MB)." + return _error_response( + ( + f"File at URL is too large ({int(content_length)} bytes). " + f"Maximum allowed size is {MAX_CONTENT_BYTES} bytes " + f"(~{MAX_CONTENT_BYTES // (1024 * 1024)}MB)." + ), + code="PAYLOAD_TOO_LARGE", ) resp = await client.get(file_url, follow_redirects=True) resp.raise_for_status() data = await resp.aread() - # Check actual size after download if len(data) > MAX_CONTENT_BYTES: - return ( - f"File content from URL is too large ({len(data)} bytes). " - f"Maximum allowed size is {MAX_CONTENT_BYTES} bytes " - f"(~{MAX_CONTENT_BYTES // (1024 * 1024)}MB)." + return _error_response( + ( + f"File content from URL is too large ({len(data)} bytes). " + f"Maximum allowed size is {MAX_CONTENT_BYTES} bytes " + f"(~{MAX_CONTENT_BYTES // (1024 * 1024)}MB)." + ), + code="PAYLOAD_TOO_LARGE", ) content_type = resp.headers.get("Content-Type") if content_type and content_type != "application/octet-stream": mime_type = content_type except httpx.TimeoutException: - return f"Request timed out while fetching file from URL '{file_url}'." + return _error_response( + f"Request timed out while fetching file from URL '{file_url}'.", + code="FILE_DOWNLOAD_TIMEOUT", + ) except httpx.HTTPStatusError as exc: - return f"HTTP error fetching file from URL '{file_url}': {exc.response.status_code}" + return _error_response( + f"HTTP error fetching file from URL '{file_url}': {exc.response.status_code}", + code="FILE_DOWNLOAD_ERROR", + ) except Exception as exc: - return f"Failed to fetch file from URL '{file_url}': {exc}" + return _error_response( + f"Failed to fetch file from URL '{file_url}': {exc}", + code="FILE_DOWNLOAD_ERROR", + ) else: data = (content or "").encode("utf-8") @@ -836,19 +967,24 @@ async def create_drive_file( .create( body=metadata, media_body=media, - fields="id, name, webViewLink", + fields="id, name, mimeType, webViewLink", supportsAllDrives=True, ) .execute ) except Exception as exc: - return f"Error creating Drive file: {exc}" + return _error_response( + f"Error creating Drive file: {exc}", code="GOOGLE_API_ERROR" + ) - link = created.get("webViewLink", "N/A") - return ( - f"Successfully created file '{created.get('name', file_name)}' " - f"(ID: {created.get('id', 'unknown')}) in folder '{normalized_folder_id}' for {user_email}. " - f"Link: {link}" + created.setdefault("mimeType", mime_type) + + return _success_response( + data={ + "file": _format_drive_item(created), + "parent_folder_id": normalized_folder_id, + "user_email": user_email, + } ) @@ -860,7 +996,7 @@ async def delete_drive_file( ) -> str: service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None try: @@ -874,10 +1010,14 @@ async def delete_drive_file( .execute ) except Exception as exc: - return f"Error retrieving Drive file {file_id}: {exc}" + return _error_response( + f"Error retrieving Drive file {file_id}: {exc}", + code="GOOGLE_API_ERROR", + ) - file_name = ( - metadata.get("name", "(unknown)") if isinstance(metadata, dict) else "(unknown)" + file_name = metadata.get("name", "(unknown)") if isinstance(metadata, dict) else "(unknown)" + previous_parents = ( + (metadata.get("parents") or []) if isinstance(metadata, dict) else None ) try: @@ -885,7 +1025,15 @@ async def delete_drive_file( await asyncio.to_thread( service.files().delete(fileId=file_id, supportsAllDrives=True).execute ) - return f"File '{file_name}' (ID: {file_id}) permanently deleted." + return _success_response( + data={ + "file_id": file_id, + "file_name": file_name, + "action": "permanently_deleted", + "user_email": user_email, + "previous_parent_ids": previous_parents, + } + ) trashed = await asyncio.to_thread( service.files() @@ -898,12 +1046,22 @@ async def delete_drive_file( .execute ) except Exception as exc: - return f"Error deleting Drive file {file_id}: {exc}" + return _error_response( + f"Error deleting Drive file {file_id}: {exc}", code="GOOGLE_API_ERROR" + ) trashed_name = ( trashed.get("name", file_name) if isinstance(trashed, dict) else file_name ) - return f"File '{trashed_name}' (ID: {file_id}) moved to trash." + return _success_response( + data={ + "file_id": file_id, + "file_name": trashed_name, + "action": "trashed", + "user_email": user_email, + "previous_parent_ids": previous_parents, + } + ) @mcp.tool("gdrive_move_file") @@ -914,7 +1072,7 @@ async def move_drive_file( ) -> str: service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None try: @@ -928,9 +1086,14 @@ async def move_drive_file( .execute ) except Exception as exc: - return f"Error retrieving Drive file {file_id}: {exc}" + return _error_response( + f"Error retrieving Drive file {file_id}: {exc}", + code="GOOGLE_API_ERROR", + ) - current_parents = metadata.get("parents", []) if isinstance(metadata, dict) else [] + current_parents = ( + (metadata.get("parents") or []) if isinstance(metadata, dict) else [] + ) remove_parents = ",".join(current_parents) update_kwargs = { @@ -947,15 +1110,23 @@ async def move_drive_file( service.files().update(**update_kwargs).execute ) except Exception as exc: - return f"Error moving Drive file {file_id}: {exc}" + return _error_response( + f"Error moving Drive file {file_id}: {exc}", code="GOOGLE_API_ERROR" + ) new_name = ( updated.get("name", metadata.get("name", "(unknown)")) if isinstance(updated, dict) else metadata.get("name", "(unknown)") ) - return ( - f"File '{new_name}' (ID: {file_id}) moved to folder '{destination_folder_id}'." + return _success_response( + data={ + "file_id": file_id, + "file_name": new_name, + "destination_folder_id": destination_folder_id, + "previous_parent_ids": current_parents, + "user_email": user_email, + } ) @@ -968,7 +1139,7 @@ async def copy_drive_file( ) -> str: service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None body: Dict[str, object] = {} @@ -984,19 +1155,22 @@ async def copy_drive_file( .copy( fileId=file_id, body=body, - fields="id, name, webViewLink", + fields="id, name, mimeType, webViewLink", supportsAllDrives=True, ) .execute ) except Exception as exc: - return f"Error copying Drive file {file_id}: {exc}" + return _error_response( + f"Error copying Drive file {file_id}: {exc}", code="GOOGLE_API_ERROR" + ) - copy_name = copied.get("name", "(unknown)") - copy_id = copied.get("id", "(unknown)") - link = copied.get("webViewLink", "N/A") - return ( - f"Created copy '{copy_name}' (ID: {copy_id}) from file {file_id}. Link: {link}" + return _success_response( + data={ + "source_file_id": file_id, + "file": _format_drive_item(copied), + "user_email": user_email, + } ) @@ -1007,11 +1181,14 @@ async def rename_drive_file( user_email: str = DEFAULT_USER_EMAIL, ) -> str: if not new_name.strip(): - return "A non-empty new_name is required to rename a file." + return _error_response( + "A non-empty new_name is required to rename a file.", + code="INVALID_INPUT", + ) service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None try: @@ -1026,12 +1203,20 @@ async def rename_drive_file( .execute ) except Exception as exc: - return f"Error renaming Drive file {file_id}: {exc}" + return _error_response( + f"Error renaming Drive file {file_id}: {exc}", code="GOOGLE_API_ERROR" + ) final_name = ( updated.get("name", new_name) if isinstance(updated, dict) else new_name ) - return f"File {file_id} renamed to '{final_name}'." + return _success_response( + data={ + "file_id": file_id, + "file_name": final_name, + "user_email": user_email, + } + ) @mcp.tool("gdrive_create_folder") @@ -1041,14 +1226,16 @@ async def create_drive_folder( parent_folder_id: str = "root", ) -> str: if not folder_name.strip(): - return "A non-empty folder_name is required to create a folder." + return _error_response( + "A non-empty folder_name is required to create a folder.", + code="INVALID_INPUT", + ) service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None - # Normalize parent folder ID normalized_parent = _normalize_parent_id(parent_folder_id) body = { @@ -1062,19 +1249,23 @@ async def create_drive_folder( service.files() .create( body=body, - fields="id, name, parents, webViewLink", + fields="id, name, mimeType, parents, webViewLink", supportsAllDrives=True, ) .execute ) except Exception as exc: - return f"Error creating Drive folder '{folder_name}': {exc}" + return _error_response( + f"Error creating Drive folder '{folder_name}': {exc}", + code="GOOGLE_API_ERROR", + ) - folder_id = created.get("id", "(unknown)") - link = created.get("webViewLink", "N/A") - return ( - f"Created folder '{created.get('name', folder_name)}' " - f"(ID: {folder_id}) under parent '{normalized_parent}'. Link: {link}" + return _success_response( + data={ + "folder": _format_drive_item(created), + "parent_folder_id": normalized_parent, + "user_email": user_email, + } ) @@ -1085,7 +1276,7 @@ async def get_drive_file_permissions( ) -> str: service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None try: @@ -1102,75 +1293,46 @@ async def get_drive_file_permissions( .execute ) except Exception as exc: - return f"Error retrieving permissions for file {file_id}: {exc}" - - lines = [ - f"File: {metadata.get('name', 'Unknown')}", - f"ID: {file_id}", - f"Type: {metadata.get('mimeType', 'Unknown')}", - f"Size: {metadata.get('size', 'N/A')} bytes", - f"Modified: {metadata.get('modifiedTime', 'N/A')}", - "", - "Sharing Status:", - f" Shared: {metadata.get('shared', False)}", - ] - sharing_user = metadata.get("sharingUser") - if sharing_user: - lines.append( - f" Shared by: {sharing_user.get('displayName', 'Unknown')} " - f"({sharing_user.get('emailAddress', 'Unknown')})" + return _error_response( + f"Error retrieving permissions for file {file_id}: {exc}", + code="GOOGLE_API_ERROR", ) - permissions = metadata.get("permissions", []) - if permissions: - lines.append(f" Number of permissions: {len(permissions)}") - lines.append(" Permissions:") - for perm in permissions: - perm_type = perm.get("type", "unknown") - role = perm.get("role", "unknown") - if perm_type == "anyone": - lines.append(f" - Anyone with the link ({role})") - elif perm_type in {"user", "group"}: - lines.append( - f" - {perm_type.title()}: {perm.get('emailAddress', 'unknown')} ({role})" - ) - elif perm_type == "domain": - lines.append(f" - Domain: {perm.get('domain', 'unknown')} ({role})") - else: - lines.append(f" - {perm_type} ({role})") - else: - lines.append(" No additional permissions (private file)") - - lines.extend( - [ - "", - "URLs:", - f" View Link: {metadata.get('webViewLink', 'N/A')}", - ] - ) - if metadata.get("webContentLink"): - lines.append(f" Direct Download Link: {metadata['webContentLink']}") - - # Check for public link permission + permissions = metadata.get("permissions") or [] has_public = _has_anyone_link_access(permissions) - if has_public: - lines.extend( - [ - "", - "✅ This file is shared with 'Anyone with the link'.", - ] - ) - else: - lines.extend( - [ - "", - "❌ This file is NOT shared with 'Anyone with the link'.", - " To fix: Right-click the file in Google Drive → Share → Anyone with the link → Viewer", - ] - ) - - return "\n".join(lines) + owners = [ + { + "display_name": owner.get("displayName"), + "email_address": owner.get("emailAddress"), + } + for owner in (metadata.get("owners") or []) + ] + sharing_user = metadata.get("sharingUser") or {} + + return _success_response( + data={ + "file": _format_drive_item(metadata), + "size_bytes": _safe_int(metadata.get("size")), + "modified_time": metadata.get("modifiedTime"), + "owners": owners, + "permissions": [_format_permission(p) for p in permissions], + "shared": metadata.get("shared", False), + "sharing_user": { + "display_name": sharing_user.get("displayName"), + "email_address": sharing_user.get("emailAddress"), + } + if sharing_user + else None, + "viewers_can_copy_content": metadata.get("viewersCanCopyContent"), + "links": { + "view": metadata.get("webViewLink"), + "download": metadata.get("webContentLink"), + }, + "has_public_link": has_public, + "user_email": user_email, + } + ) @mcp.tool("gdrive_display_image") @@ -1179,28 +1341,18 @@ async def display_drive_image( session_id: str, user_email: str = DEFAULT_USER_EMAIL, ) -> str: - """Download an image from Google Drive and display it in the chat. - - This tool downloads an image file from Google Drive and stores it for display - in the chat interface. The image becomes part of the conversation history. - - Args: - file_id: The Google Drive file ID - session_id: The chat session ID (required to store the attachment) - user_email: The user's email address for authentication - - Returns: - A message with attachment details including signed URL for display - """ + """Download an image from Google Drive and display it in the chat.""" if not session_id or not session_id.strip(): - return "session_id is required to display the image in chat." - + return _error_response( + "session_id is required to display the image in chat.", + code="INVALID_INPUT", + ) + service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None - # Get file metadata try: metadata = await asyncio.to_thread( service.files() @@ -1212,33 +1364,37 @@ async def display_drive_image( .execute ) except Exception as exc: - return f"Error retrieving metadata for file {file_id}: {exc}" + return _error_response( + f"Error retrieving metadata for file {file_id}: {exc}", + code="GOOGLE_API_ERROR", + ) mime_type = metadata.get("mimeType", "") file_name = metadata.get("name", "image") - file_size = metadata.get("size") - # Verify it's an image if not mime_type.startswith("image/"): - return ( - f"Error: File '{file_name}' is not an image (type: {mime_type}). " - "Only image files can be displayed. Use gdrive_get_file_content for other file types." + return _error_response( + ( + f"File '{file_name}' is not an image (type: {mime_type}). " + "Only image files can be displayed." + ), + code="UNSUPPORTED_MIME_TYPE", ) - # Download the image request = service.files().get_media(fileId=file_id) - + try: image_bytes = await _download_request_bytes(request, max_size=MAX_CONTENT_BYTES) except ValueError as exc: - return f"Image too large: {exc}" + return _error_response(f"Image too large: {exc}", code="FILE_TOO_LARGE") except Exception as exc: - return f"Error downloading image: {exc}" + return _error_response( + f"Error downloading image: {exc}", code="GOOGLE_API_ERROR" + ) - # Store the image using AttachmentService try: from backend.services.attachments import AttachmentError, AttachmentTooLarge - + attachment_service = await _get_attachment_service() record = await attachment_service.save_bytes( session_id=session_id, @@ -1247,24 +1403,31 @@ async def display_drive_image( filename_hint=file_name, ) except AttachmentTooLarge as exc: - return f"Image rejected: {exc}" + return _error_response(f"Image rejected: {exc}", code="PAYLOAD_TOO_LARGE") except AttachmentError as exc: - return f"Failed to store image: {exc}" + return _error_response( + f"Failed to store image: {exc}", code="ATTACHMENT_STORAGE_ERROR" + ) - # Extract attachment details attachment_metadata = record.get("metadata") or {} stored_filename = attachment_metadata.get("filename") or file_name signed_url = record.get("signed_url") or record.get("display_url") expires_at = record.get("expires_at") or record.get("signed_url_expires_at") - lines = [ - f"Image '{stored_filename}' from Google Drive displayed in chat!", - f"attachment_id: {record.get('attachment_id')}", - f"Filename: {stored_filename}", - f"Size: {record.get('size_bytes')} bytes", - ] - - return "\n".join(lines) + return _success_response( + data={ + "file": _format_drive_item(metadata), + "attachment": { + "attachment_id": record.get("attachment_id"), + "filename": stored_filename, + "size_bytes": record.get("size_bytes"), + "signed_url": signed_url, + "signed_url_expires_at": expires_at, + }, + "session_id": session_id, + "user_email": user_email, + } + ) @mcp.tool("gdrive_check_public_access") @@ -1274,7 +1437,7 @@ async def check_drive_file_public_access( ) -> str: service, error_msg = _get_drive_service_or_error(user_email) if error_msg: - return error_msg + return _error_response(error_msg, code="DRIVE_SERVICE_ERROR") assert service is not None escaped_name = _escape_query_term(file_name) @@ -1290,20 +1453,32 @@ async def check_drive_file_public_access( try: results = await asyncio.to_thread(service.files().list(**params).execute) except Exception as exc: - return f"Error searching for file '{file_name}': {exc}" + return _error_response( + f"Error searching for file '{file_name}': {exc}", + code="GOOGLE_API_ERROR", + ) files = results.get("files", []) - if not files: - return f"No file found with name '{file_name}'." + candidate_summaries = [ + { + "id": item.get("id"), + "name": item.get("name"), + "mime_type": item.get("mimeType"), + "web_view_link": item.get("webViewLink"), + "shared": item.get("shared"), + } + for item in files + ] - lines: List[str] = [] - if len(files) > 1: - lines.append(f"Found {len(files)} files with name '{file_name}':") - for item in files: - item_name = item.get("name", "(unknown)") - item_id = item.get("id", "unknown") - lines.append(f" - {item_name} (ID: {item_id})") - lines.extend(["", "Checking the first file...", ""]) + if not files: + return _success_response( + data={ + "file_name": file_name, + "matches": [], + "user_email": user_email, + }, + message=f"No file found with name '{file_name}'.", + ) first = files[0] file_id = first.get("id") @@ -1318,38 +1493,36 @@ async def check_drive_file_public_access( .execute ) except Exception as exc: - return f"Error retrieving permissions for file '{file_id}': {exc}" + return _error_response( + f"Error retrieving permissions for file '{file_id}': {exc}", + code="GOOGLE_API_ERROR", + ) - permissions = metadata.get("permissions", []) - # Check for public link permission + permissions = metadata.get("permissions") or [] has_public = _has_anyone_link_access(permissions) - - lines.extend( - [ - f"File: {metadata.get('name', 'Unknown')}", - f"ID: {metadata.get('id', 'unknown')}", - f"Type: {metadata.get('mimeType', 'unknown')}", - f"Shared: {metadata.get('shared', False)}", - "", - ] + remediation = ( + "Drive → Share → 'Anyone with the link' → 'Viewer'" + if not has_public + else None ) - if has_public: - lines.extend( - [ - "✅ PUBLIC ACCESS ENABLED - This file is publicly shared.", - f"Direct link: https://drive.google.com/uc?export=view&id={file_id}", - ] - ) - else: - lines.extend( - [ - "❌ NO PUBLIC ACCESS - File is not publicly shared.", - "Fix: Drive → Share → 'Anyone with the link' → 'Viewer'.", - ] - ) - - return "\n".join(lines) + return _success_response( + data={ + "file_name": file_name, + "matches": candidate_summaries, + "evaluated_file": { + "file": _format_drive_item(metadata), + "permissions": [_format_permission(p) for p in permissions], + "has_public_link": has_public, + "shared": metadata.get("shared", False), + "direct_public_link": f"https://drive.google.com/uc?export=view&id={file_id}" + if has_public + else None, + "remediation": remediation, + }, + "user_email": user_email, + } + ) def run() -> None: # pragma: no cover - integration entrypoint diff --git a/tests/test_gdrive_server.py b/tests/test_gdrive_server.py index 4b4a2d8..f374cbf 100644 --- a/tests/test_gdrive_server.py +++ b/tests/test_gdrive_server.py @@ -3,6 +3,7 @@ from __future__ import annotations import base64 +import json from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -26,10 +27,12 @@ async def test_search_drive_files_auth_error(mock_get_drive_service): mock_get_drive_service.side_effect = ValueError("Missing credentials") result = await search_drive_files("important doc") + payload = json.loads(result) - assert "Authentication error" in result - assert "Missing credentials" in result - assert "Connect Google Services" in result + assert payload["ok"] is False + assert "Authentication error" in payload["error"]["message"] + assert "Missing credentials" in payload["error"]["message"] + assert "Connect Google Services" in payload["error"]["message"] @pytest.mark.asyncio @@ -77,8 +80,11 @@ async def fake_to_thread(func, *args, **kwargs): user_email="user@example.com", ) - assert "Found 1 items in folder 'bps'" in result - assert "notes.txt" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["folder"]["display_label"] == "bps" + assert payload["data"]["count"] == 1 + assert payload["data"]["items"][0]["name"] == "notes.txt" assert files_api.list.call_count == 2 @@ -143,7 +149,10 @@ async def fake_to_thread(func, *args, **kwargs): result = await delete_drive_file("file123", user_email="user@example.com") - assert "moved to trash" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["action"] == "trashed" + assert payload["data"]["file_id"] == "file123" files_api.update.assert_called_once() kwargs = files_api.update.call_args.kwargs assert kwargs["fileId"] == "file123" @@ -172,7 +181,10 @@ async def fake_to_thread(func, *args, **kwargs): "file123", user_email="user@example.com", permanent=True ) - assert "permanently deleted" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["action"] == "permanently_deleted" + assert payload["data"]["file_id"] == "file123" files_api.delete.assert_called_once_with(fileId="file123", supportsAllDrives=True) files_api.update.assert_not_called() @@ -205,7 +217,10 @@ async def fake_to_thread(func, *args, **kwargs): "file123", destination_folder_id="newParent", user_email="user@example.com" ) - assert "moved to folder 'newParent'" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["destination_folder_id"] == "newParent" + assert payload["data"]["file_id"] == "file123" kwargs = mock_service.files.return_value.update.call_args.kwargs assert kwargs["addParents"] == "newParent" assert kwargs["removeParents"] == "oldParent" @@ -237,7 +252,10 @@ async def fake_to_thread(func, *args, **kwargs): destination_folder_id="destFolder", ) - assert "Created copy 'Report Copy'" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["file"]["name"] == "Report Copy" + assert payload["data"]["source_file_id"] == "file123" files_api.copy.assert_called_once() kwargs = files_api.copy.call_args.kwargs assert kwargs["fileId"] == "file123" @@ -266,7 +284,9 @@ async def fake_to_thread(func, *args, **kwargs): "file123", new_name="New Name", user_email="user@example.com" ) - assert "renamed to 'New Name'" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["file_name"] == "New Name" files_api.update.assert_called_once() kwargs = files_api.update.call_args.kwargs assert kwargs["body"] == {"name": "New Name"} @@ -300,8 +320,11 @@ async def fake_to_thread(func, *args, **kwargs): result = await get_drive_file_content("file123", user_email="user@example.com") - assert "Report.pdf" in result - assert "Hello world from PDF" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["file"]["name"] == "Report.pdf" + assert "Hello world from PDF" in payload["data"]["content"] + assert payload["data"]["extraction"]["method"] == "pdf_extractor" mock_extract.assert_called_once() @@ -328,7 +351,10 @@ async def fake_to_thread(func, *args, **kwargs): "Project", parent_folder_id="rootFolder", user_email="user@example.com" ) - assert "Created folder 'Project'" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["folder"]["name"] == "Project" + assert payload["data"]["folder"]["parents"] == ["rootFolder"] files_api.create.assert_called_once() kwargs = files_api.create.call_args.kwargs assert kwargs["body"]["name"] == "Project" @@ -412,9 +438,10 @@ async def fake_to_thread(func, *args, **kwargs): # The query should NOT do a text search for "image" assert "name contains 'image'" not in query_param - # Verify result contains the image - assert "vacation.jpg" in result - assert "image/jpeg" in result + payload = json.loads(result) + assert payload["ok"] is True + assert payload["data"]["files"][0]["name"] == "vacation.jpg" + assert payload["data"]["files"][0]["mime_type"] == "image/jpeg" @pytest.mark.asyncio From 9244bde5c28faf5cf54716c149d4c6429502a5d2 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sat, 15 Nov 2025 02:42:46 +0000 Subject: [PATCH 2/2] Refactor file type detection in GDrive server Co-authored-by: jck411 --- src/backend/mcp_servers/gdrive_server.py | 237 ++++++++++++++--------- tests/test_gdrive_server.py | 29 +++ 2 files changed, 175 insertions(+), 91 deletions(-) diff --git a/src/backend/mcp_servers/gdrive_server.py b/src/backend/mcp_servers/gdrive_server.py index e318495..19a1848 100644 --- a/src/backend/mcp_servers/gdrive_server.py +++ b/src/backend/mcp_servers/gdrive_server.py @@ -192,6 +192,131 @@ def _has_anyone_link_access(permissions: List[Dict[str, Any]]) -> bool: ) +FILE_TYPE_MAPPINGS: List[Tuple[Tuple[str, ...], str]] = [ + ( + ( + "image", + "images", + "photo", + "photos", + "picture", + "pictures", + "img", + "png", + "jpg", + "jpeg", + "gif", + "svg", + "tiff", + "bmp", + "webp", + ), + "mimeType contains 'image/'", + ), + ( + ( + "pdf", + "pdfs", + ), + "mimeType = 'application/pdf'", + ), + ( + ( + "document", + "documents", + "doc", + "docs", + "google doc", + "google docs", + "word", + "docx", + ), + "mimeType = 'application/vnd.google-apps.document'", + ), + ( + ( + "spreadsheet", + "spreadsheets", + "sheet", + "sheets", + "google sheet", + "google sheets", + "excel", + "xlsx", + "xls", + ), + "mimeType = 'application/vnd.google-apps.spreadsheet'", + ), + ( + ( + "presentation", + "presentations", + "slide", + "slides", + "google slide", + "google slides", + "powerpoint", + "ppt", + "pptx", + ), + "mimeType = 'application/vnd.google-apps.presentation'", + ), + ( + ( + "folder", + "folders", + "directory", + "directories", + ), + "mimeType = 'application/vnd.google-apps.folder'", + ), + ( + ( + "video", + "videos", + "movie", + "movies", + "mp4", + "avi", + "mov", + "mkv", + ), + "mimeType contains 'video/'", + ), + ( + ( + "audio", + "sound", + "music", + "mp3", + "wav", + "flac", + ), + "mimeType contains 'audio/'", + ), + ( + ( + "text file", + "text files", + "txt", + ), + "mimeType = 'text/plain'", + ), +] + +_file_type_keyword_order: List[str] = [] +_file_type_keyword_seen: set[str] = set() +for keywords, _ in FILE_TYPE_MAPPINGS: + for keyword in keywords: + if keyword not in _file_type_keyword_seen: + _file_type_keyword_seen.add(keyword) + _file_type_keyword_order.append(keyword) +FILE_TYPE_KEYWORDS: Tuple[str, ...] = tuple(_file_type_keyword_order) + +del _file_type_keyword_order +del _file_type_keyword_seen + + def _build_drive_list_params( *, query: str, @@ -277,71 +402,24 @@ def _is_structured_drive_query(query: str) -> bool: return False -def _detect_file_type_query(query: str) -> Optional[str]: - """Detect if query is asking for a specific file type and return MIME type filter. - - Maps common file type keywords to Google Drive MIME type queries. - Returns None if no file type is detected. - - Examples: - "image" -> "mimeType contains 'image/'" - "latest pdf" -> "mimeType = 'application/pdf'" - "spreadsheet" -> "mimeType = 'application/vnd.google-apps.spreadsheet'" - """ +def _find_file_type_mapping( + query: str, +) -> Optional[Tuple[Tuple[str, ...], str]]: query_lower = query.lower() - - # Map keywords to MIME type filters - # Using "contains" for broader matches, "=" for exact matches - type_mappings = [ - # Images - match any image type - (["image", "images", "photo", "photos", "picture", "pictures", "img", "png", "jpg", "jpeg", "gif"], - "mimeType contains 'image/'"), - - # PDFs - (["pdf", "pdfs"], - "mimeType = 'application/pdf'"), - - # Google Docs - (["document", "documents", "doc", "docs", "google doc", "google docs"], - "mimeType = 'application/vnd.google-apps.document'"), - - # Google Sheets - (["spreadsheet", "spreadsheets", "sheet", "sheets", "google sheet", "google sheets"], - "mimeType = 'application/vnd.google-apps.spreadsheet'"), - - # Google Slides - (["presentation", "presentations", "slide", "slides", "google slide", "google slides"], - "mimeType = 'application/vnd.google-apps.presentation'"), - - # Folders - (["folder", "folders", "directory", "directories"], - "mimeType = 'application/vnd.google-apps.folder'"), - - # Videos - (["video", "videos", "movie", "movies", "mp4", "avi", "mov"], - "mimeType contains 'video/'"), - - # Audio - (["audio", "sound", "music", "mp3", "wav"], - "mimeType contains 'audio/'"), - - # Text files - (["text file", "text files", "txt"], - "mimeType = 'text/plain'"), - ] - - # Check each mapping - for keywords, mime_filter in type_mappings: - # Check if any keyword matches the query (as whole word or part of phrase) + for keywords, mime_filter in FILE_TYPE_MAPPINGS: for keyword in keywords: - # Match keyword as whole word or with common modifiers - pattern = r'\b' + re.escape(keyword) + r'\b' + pattern = r"\b" + re.escape(keyword) + r"\b" if re.search(pattern, query_lower): - return mime_filter - + return keywords, mime_filter return None +def _detect_file_type_query(query: str) -> Optional[str]: + """Detect if query is asking for a specific file type and return MIME type filter.""" + match = _find_file_type_mapping(query) + return match[1] if match else None + + async def _locate_child_folder( service: Any, *, @@ -529,42 +607,19 @@ async def search_drive_files( final_query = query strategy = "structured" else: - mime_filter = _detect_file_type_query(query) - if mime_filter: + mapping = _find_file_type_mapping(query) + if mapping: + _, detected_mime_filter = mapping + mime_filter = detected_mime_filter search_terms_value = query.lower() - for keywords, _ in [ - ( - [ - "image", - "images", - "photo", - "photos", - "picture", - "pictures", - "img", - "png", - "jpg", - "jpeg", - "gif", - ], - None, - ), - (["pdf", "pdfs"], None), - (["document", "documents", "doc", "docs"], None), - (["spreadsheet", "spreadsheets", "sheet", "sheets"], None), - (["presentation", "presentations", "slide", "slides"], None), - (["folder", "folders"], None), - (["video", "videos", "movie", "movies"], None), - (["audio", "sound", "music"], None), - ]: - for keyword in keywords: - pattern = r"\b" + re.escape(keyword) + r"\b" - search_terms_value = re.sub(pattern, "", search_terms_value) + for keyword in FILE_TYPE_KEYWORDS: + pattern = r"\b" + re.escape(keyword) + r"\b" + search_terms_value = re.sub(pattern, " ", search_terms_value) search_terms_value = re.sub( - r"\b(latest|recent|new|old|my)\b", "", search_terms_value + r"\b(latest|recent|new|old|my)\b", " ", search_terms_value ) - search_terms_value = search_terms_value.strip() + search_terms_value = re.sub(r"\s+", " ", search_terms_value).strip() search_terms = search_terms_value or None if search_terms: diff --git a/tests/test_gdrive_server.py b/tests/test_gdrive_server.py index f374cbf..f940026 100644 --- a/tests/test_gdrive_server.py +++ b/tests/test_gdrive_server.py @@ -469,3 +469,32 @@ async def fake_to_thread(func, *args, **kwargs): assert "mimeType = 'application/vnd.google-apps.spreadsheet'" in query_param assert "name contains 'budget'" in query_param assert " and " in query_param + + +@pytest.mark.asyncio +@patch("backend.mcp_servers.gdrive_server.asyncio.to_thread") +@patch("backend.mcp_servers.gdrive_server.get_drive_service") +async def test_search_text_file_strips_keywords(mock_get_drive_service, mock_to_thread): + mock_service = MagicMock() + mock_get_drive_service.return_value = mock_service + + files_api = mock_service.files.return_value + files_api.list.return_value.execute.return_value = {"files": []} + + async def fake_to_thread(func, *args, **kwargs): + return func(*args, **kwargs) + + mock_to_thread.side_effect = fake_to_thread + + await search_drive_files( + query="latest text file project plan", + user_email="user@example.com", + page_size=5, + ) + + call_kwargs = files_api.list.call_args.kwargs + query_param = call_kwargs.get("q") + + assert "mimeType = 'text/plain'" in query_param + assert "name contains 'project plan'" in query_param + assert "text file" not in query_param