From d0988e7e5f0d9ec9cbcb994733e9d5c99e555d92 Mon Sep 17 00:00:00 2001 From: dimavedenyapin Date: Fri, 3 Apr 2026 09:54:29 +0700 Subject: [PATCH 1/2] feat(gmail): add list_emails_structured tool for programmatic email retrieval Add a new `list_emails_structured` tool that returns emails as structured JSON, designed for consumption by workflow-builder trigger nodes and automation workflows. Unlike `read_emails` which returns human-readable text, this tool outputs machine-parseable JSON with typed fields. Features: - Structured JSON output with emails array, resultCount, and query - Optional filters: query, label_ids, max_results (capped at 100) - Toggleable body and attachment inclusion via include_body / include_attachments_info - Extra header extraction via include_headers parameter - Each email includes: id, threadId, historyId, from, to, cc, bcc, subject, date, snippet, labels, isUnread, messageId Co-Authored-By: Claude Opus 4.6 --- src/servers/gmail/config.yaml | 2 + src/servers/gmail/main.py | 143 +++++++ .../gmail/test_list_emails_structured.py | 397 ++++++++++++++++++ 3 files changed, 542 insertions(+) create mode 100644 tests/servers/gmail/test_list_emails_structured.py diff --git a/src/servers/gmail/config.yaml b/src/servers/gmail/config.yaml index f46a9d25..1f30e347 100644 --- a/src/servers/gmail/config.yaml +++ b/src/servers/gmail/config.yaml @@ -13,4 +13,6 @@ tools: description: "Update email labels (mark as read/unread, move to folders)" - name: "get_attachment" description: "Get a temporary download URL for an email attachment" + - name: "list_emails_structured" + description: "List emails as structured JSON for programmatic consumption by triggers and automation workflows" diff --git a/src/servers/gmail/main.py b/src/servers/gmail/main.py index db1b6631..80b44379 100644 --- a/src/servers/gmail/main.py +++ b/src/servers/gmail/main.py @@ -525,6 +525,42 @@ async def handle_list_tools() -> list[Tool]: "required": ["email_id", "attachment_id", "filename"], }, ), + Tool( + name="list_emails_structured", + description="List emails as structured JSON for programmatic consumption. Returns machine-readable email objects with metadata, body, and attachment info. Ideal for triggers and automation workflows.", + inputSchema={ + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Gmail search query (e.g., 'is:unread', 'from:someone@example.com', 'newer_than:1d'). Default: 'in:inbox'", + }, + "label_ids": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by Gmail label IDs (e.g., ['INBOX', 'UNREAD']). Applied in addition to query.", + }, + "max_results": { + "type": "integer", + "description": "Maximum number of emails to return (default: 10, max: 100)", + }, + "include_body": { + "type": "boolean", + "description": "Include email body text/html in results (default: true)", + }, + "include_attachments_info": { + "type": "boolean", + "description": "Include attachment metadata in results (default: true)", + }, + "include_headers": { + "type": "array", + "items": {"type": "string"}, + "description": "Additional headers to include (e.g., ['Reply-To', 'Message-ID', 'In-Reply-To']). From, To, Cc, Bcc, Subject, Date are always included.", + }, + }, + "required": [], + }, + ), ] @server.call_tool() @@ -935,6 +971,113 @@ async def handle_call_tool( ) ] + elif name == "list_emails_structured": + query = (arguments or {}).get("query", "in:inbox") + label_ids = (arguments or {}).get("label_ids", None) + max_results = min(int((arguments or {}).get("max_results", 10)), 100) + include_body = (arguments or {}).get("include_body", True) + include_attachments_info_flag = (arguments or {}).get( + "include_attachments_info", True + ) + extra_headers = (arguments or {}).get("include_headers", []) + + # Build the list request + list_kwargs = {"userId": "me", "q": query, "maxResults": max_results} + if label_ids: + list_kwargs["labelIds"] = label_ids + + results = gmail_service.users().messages().list(**list_kwargs).execute() + + messages = results.get("messages", []) + if not messages: + result_data = {"emails": [], "resultCount": 0, "query": query} + return [TextContent(type="text", text=json.dumps(result_data))] + + # Standard headers always extracted + standard_headers = [ + "From", + "To", + "Cc", + "Bcc", + "Subject", + "Date", + "Message-ID", + ] + all_headers = list( + dict.fromkeys(standard_headers + [h for h in extra_headers]) + ) + + email_objects = [] + for message in messages: + msg = ( + gmail_service.users() + .messages() + .get(userId="me", id=message["id"], format="full") + .execute() + ) + + # Extract all requested headers into a dict + headers = {} + for header in msg.get("payload", {}).get("headers", []): + if header["name"] in all_headers: + headers[header["name"]] = header["value"] + + labels = msg.get("labelIds", []) + + email_obj = { + "id": message["id"], + "threadId": msg.get("threadId", ""), + "historyId": msg.get("historyId", ""), + "from": headers.get("From", ""), + "to": headers.get("To", ""), + "cc": headers.get("Cc", ""), + "bcc": headers.get("Bcc", ""), + "subject": headers.get("Subject", ""), + "date": headers.get("Date", ""), + "snippet": msg.get("snippet", ""), + "labels": labels, + "isUnread": "UNREAD" in labels, + "messageId": headers.get("Message-ID", ""), + } + + # Add any extra requested headers + if extra_headers: + email_obj["extraHeaders"] = { + h: headers.get(h, "") for h in extra_headers + } + + # Include body if requested + if include_body: + payload = msg.get("payload", {}) + body = parse_email_body(payload) + email_obj["body"] = { + "text": body.get("text", ""), + "html": body.get("html", ""), + } + + # Include attachment info if requested + if include_attachments_info_flag: + payload = msg.get("payload", {}) + attachments = get_attachments_info(payload) + email_obj["attachments"] = [ + { + "filename": att["filename"], + "mimeType": att["mimeType"], + "size": att.get("size", 0), + "attachmentId": att.get("attachmentId", ""), + } + for att in attachments + ] + + email_objects.append(email_obj) + + result_data = { + "emails": email_objects, + "resultCount": len(email_objects), + "query": query, + } + return [TextContent(type="text", text=json.dumps(result_data))] + raise ValueError(f"Unknown tool: {name}") return server diff --git a/tests/servers/gmail/test_list_emails_structured.py b/tests/servers/gmail/test_list_emails_structured.py new file mode 100644 index 00000000..c38c82b0 --- /dev/null +++ b/tests/servers/gmail/test_list_emails_structured.py @@ -0,0 +1,397 @@ +"""Unit tests for the list_emails_structured Gmail tool. + +These tests mock the Gmail API to verify structured output format, +filtering, and edge cases without requiring real credentials. +""" + +import json +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from base64 import urlsafe_b64encode + +from mcp.types import CallToolRequest, CallToolRequestParams + + +def _make_gmail_message( + msg_id="msg_001", + thread_id="thread_001", + history_id="12345", + subject="Test Subject", + sender="alice@example.com", + to="bob@example.com", + cc="", + bcc="", + date="Mon, 31 Mar 2026 10:00:00 +0000", + message_id_header="", + labels=None, + body_text="Hello world", + body_html="", + snippet="Hello world snippet", + attachments=None, +): + """Build a fake Gmail API message response.""" + if labels is None: + labels = ["INBOX", "UNREAD"] + + headers = [ + {"name": "From", "value": sender}, + {"name": "To", "value": to}, + {"name": "Subject", "value": subject}, + {"name": "Date", "value": date}, + {"name": "Message-ID", "value": message_id_header}, + ] + if cc: + headers.append({"name": "Cc", "value": cc}) + if bcc: + headers.append({"name": "Bcc", "value": bcc}) + + parts = [] + if body_text: + encoded = urlsafe_b64encode(body_text.encode()).decode() + parts.append( + { + "mimeType": "text/plain", + "body": {"data": encoded, "size": len(body_text)}, + } + ) + if body_html: + encoded = urlsafe_b64encode(body_html.encode()).decode() + parts.append( + { + "mimeType": "text/html", + "body": {"data": encoded, "size": len(body_html)}, + } + ) + + if attachments: + for att in attachments: + parts.append( + { + "filename": att["filename"], + "mimeType": att.get("mimeType", "application/octet-stream"), + "body": { + "size": att.get("size", 1024), + "attachmentId": att.get("attachmentId", "att_001"), + }, + } + ) + + payload = {"headers": headers, "mimeType": "multipart/mixed", "parts": parts} + + return { + "id": msg_id, + "threadId": thread_id, + "historyId": history_id, + "labelIds": labels, + "snippet": snippet, + "payload": payload, + } + + +def _build_mock_gmail_service(messages_list_response, message_get_responses): + """Build a mock Gmail service with list and get responses.""" + mock_service = MagicMock() + + # Chain: gmail_service.users().messages().list(...).execute() + mock_list = MagicMock() + mock_list.execute.return_value = messages_list_response + + mock_messages = MagicMock() + mock_messages.list.return_value = mock_list + + # Chain: gmail_service.users().messages().get(...).execute() + mock_get = MagicMock() + # If multiple messages, return them in order + if isinstance(message_get_responses, list): + mock_get.execute.side_effect = message_get_responses + else: + mock_get.execute.return_value = message_get_responses + mock_messages.get.return_value = mock_get + + mock_users = MagicMock() + mock_users.messages.return_value = mock_messages + mock_service.users.return_value = mock_users + + return mock_service + + +@pytest.fixture +def single_email_service(): + """Gmail service mock returning one email.""" + msg = _make_gmail_message() + return _build_mock_gmail_service( + messages_list_response={"messages": [{"id": "msg_001"}]}, + message_get_responses=msg, + ) + + +@pytest.fixture +def multi_email_service(): + """Gmail service mock returning two emails.""" + msg1 = _make_gmail_message( + msg_id="msg_001", + subject="First Email", + sender="alice@example.com", + labels=["INBOX", "UNREAD"], + ) + msg2 = _make_gmail_message( + msg_id="msg_002", + thread_id="thread_002", + subject="Second Email", + sender="charlie@example.com", + labels=["INBOX"], + body_text="Second body", + snippet="Second snippet", + ) + return _build_mock_gmail_service( + messages_list_response={"messages": [{"id": "msg_001"}, {"id": "msg_002"}]}, + message_get_responses=[msg1, msg2], + ) + + +@pytest.fixture +def empty_service(): + """Gmail service mock returning no emails.""" + return _build_mock_gmail_service( + messages_list_response={"messages": []}, + message_get_responses=None, + ) + + +@pytest.fixture +def email_with_attachments_service(): + """Gmail service mock returning an email with attachments.""" + msg = _make_gmail_message( + msg_id="msg_att", + subject="Email with Attachment", + attachments=[ + { + "filename": "report.pdf", + "mimeType": "application/pdf", + "size": 2048, + "attachmentId": "att_pdf_001", + }, + { + "filename": "photo.png", + "mimeType": "image/png", + "size": 4096, + "attachmentId": "att_png_001", + }, + ], + ) + return _build_mock_gmail_service( + messages_list_response={"messages": [{"id": "msg_att"}]}, + message_get_responses=msg, + ) + + +async def _invoke_tool(mock_service, arguments=None): + """Call the list_emails_structured tool via the MCP server and return parsed JSON.""" + with patch( + "src.servers.gmail.main.create_gmail_service", + new_callable=AsyncMock, + return_value=mock_service, + ): + from src.servers.gmail.main import create_server + + server_instance = create_server("test_user", api_key="test_key") + + # Look up the CallToolRequest handler by class key + handler = server_instance.request_handlers[CallToolRequest] + + request = CallToolRequest( + method="tools/call", + params=CallToolRequestParams( + name="list_emails_structured", + arguments=arguments or {}, + ), + ) + result = await handler(request) + # result is a ServerResult; .root is CallToolResult with .content list + text = result.root.content[0].text + return json.loads(text) + + +@pytest.mark.asyncio +async def test_basic_structured_output(single_email_service): + """Structured output contains expected top-level keys and email fields.""" + data = await _invoke_tool(single_email_service) + + assert "emails" in data + assert "resultCount" in data + assert "query" in data + assert data["resultCount"] == 1 + assert data["query"] == "in:inbox" + + email = data["emails"][0] + assert email["id"] == "msg_001" + assert email["threadId"] == "thread_001" + assert email["from"] == "alice@example.com" + assert email["to"] == "bob@example.com" + assert email["subject"] == "Test Subject" + assert email["isUnread"] is True + assert "INBOX" in email["labels"] + assert email["snippet"] == "Hello world snippet" + assert email["messageId"] == "" + + +@pytest.mark.asyncio +async def test_empty_result(empty_service): + """Returns empty list when no emails match.""" + data = await _invoke_tool(empty_service) + + assert data["emails"] == [] + assert data["resultCount"] == 0 + + +@pytest.mark.asyncio +async def test_multiple_emails(multi_email_service): + """Returns multiple emails in order.""" + data = await _invoke_tool(multi_email_service) + + assert data["resultCount"] == 2 + assert data["emails"][0]["subject"] == "First Email" + assert data["emails"][0]["isUnread"] is True + assert data["emails"][1]["subject"] == "Second Email" + assert data["emails"][1]["isUnread"] is False + + +@pytest.mark.asyncio +async def test_body_included_by_default(single_email_service): + """Email body is included when include_body is not specified (default true).""" + data = await _invoke_tool(single_email_service) + + email = data["emails"][0] + assert "body" in email + assert email["body"]["text"] == "Hello world" + + +@pytest.mark.asyncio +async def test_body_excluded_when_disabled(single_email_service): + """Email body is excluded when include_body is false.""" + data = await _invoke_tool(single_email_service, {"include_body": False}) + + email = data["emails"][0] + assert "body" not in email + + +@pytest.mark.asyncio +async def test_attachments_included(email_with_attachments_service): + """Attachment metadata is returned in structured format.""" + data = await _invoke_tool(email_with_attachments_service) + + email = data["emails"][0] + assert "attachments" in email + assert len(email["attachments"]) == 2 + + pdf = email["attachments"][0] + assert pdf["filename"] == "report.pdf" + assert pdf["mimeType"] == "application/pdf" + assert pdf["size"] == 2048 + assert pdf["attachmentId"] == "att_pdf_001" + + png = email["attachments"][1] + assert png["filename"] == "photo.png" + + +@pytest.mark.asyncio +async def test_attachments_excluded_when_disabled( + email_with_attachments_service, +): + """Attachments are excluded when include_attachments_info is false.""" + data = await _invoke_tool( + email_with_attachments_service, + {"include_attachments_info": False}, + ) + + email = data["emails"][0] + assert "attachments" not in email + + +@pytest.mark.asyncio +async def test_custom_query(single_email_service): + """Custom query parameter is passed to Gmail API and reflected in response.""" + data = await _invoke_tool(single_email_service, {"query": "from:alice@example.com"}) + + assert data["query"] == "from:alice@example.com" + + # Verify the query was passed to the API + mock_messages = single_email_service.users().messages() + mock_messages.list.assert_called_with( + userId="me", q="from:alice@example.com", maxResults=10 + ) + + +@pytest.mark.asyncio +async def test_label_ids_filter(single_email_service): + """label_ids parameter is passed to Gmail API list call.""" + await _invoke_tool( + single_email_service, + {"label_ids": ["INBOX", "UNREAD"]}, + ) + + mock_messages = single_email_service.users().messages() + mock_messages.list.assert_called_with( + userId="me", + q="in:inbox", + maxResults=10, + labelIds=["INBOX", "UNREAD"], + ) + + +@pytest.mark.asyncio +async def test_max_results_capped_at_100(single_email_service): + """max_results is capped at 100 even if caller requests more.""" + await _invoke_tool(single_email_service, {"max_results": 500}) + + mock_messages = single_email_service.users().messages() + call_kwargs = mock_messages.list.call_args + assert call_kwargs.kwargs.get("maxResults", call_kwargs[1].get("maxResults")) == 100 + + +@pytest.mark.asyncio +async def test_extra_headers(single_email_service): + """Extra headers requested via include_headers appear in extraHeaders.""" + # Add a Reply-To header to the mock message + msg = single_email_service.users().messages().get().execute() + msg["payload"]["headers"].append({"name": "Reply-To", "value": "reply@example.com"}) + # Reset the mock to return the updated message + single_email_service.users().messages().get().execute.return_value = msg + single_email_service.users().messages().get.return_value.execute.return_value = msg + + data = await _invoke_tool( + single_email_service, + {"include_headers": ["Reply-To"]}, + ) + + email = data["emails"][0] + assert "extraHeaders" in email + assert email["extraHeaders"]["Reply-To"] == "reply@example.com" + + +@pytest.mark.asyncio +async def test_output_is_valid_json(single_email_service): + """Tool output is valid JSON string that can be parsed back to dict.""" + with patch( + "src.servers.gmail.main.create_gmail_service", + new_callable=AsyncMock, + return_value=single_email_service, + ): + from src.servers.gmail.main import create_server + + server_instance = create_server("test_user", api_key="test_key") + handler = server_instance.request_handlers[CallToolRequest] + + request = CallToolRequest( + method="tools/call", + params=CallToolRequestParams( + name="list_emails_structured", + arguments={}, + ), + ) + result = await handler(request) + raw_text = result.root.content[0].text + # Must not throw + parsed = json.loads(raw_text) + assert isinstance(parsed, dict) + assert isinstance(parsed["emails"], list) From 76e9b4307d78a8980991bc0afb14565cdc095765 Mon Sep 17 00:00:00 2001 From: dimavedenyapin Date: Fri, 3 Apr 2026 09:59:19 +0700 Subject: [PATCH 2/2] refactor(gmail): use output_format param on read_emails instead of separate tool Replace the standalone list_emails_structured tool with an output_format parameter on the existing read_emails tool. When output_format='structured' is passed, read_emails returns structured JSON; otherwise it returns the original human-readable text format (backward compatible). Also adds label_ids and include_headers params to read_emails for structured mode filtering. Co-Authored-By: Claude Opus 4.6 --- src/servers/gmail/config.yaml | 4 +- src/servers/gmail/main.py | 271 ++++++++---------- .../gmail/test_list_emails_structured.py | 76 +++-- 3 files changed, 180 insertions(+), 171 deletions(-) diff --git a/src/servers/gmail/config.yaml b/src/servers/gmail/config.yaml index 1f30e347..c9466153 100644 --- a/src/servers/gmail/config.yaml +++ b/src/servers/gmail/config.yaml @@ -4,7 +4,7 @@ description: "Interact with Gmail emails and messages" documentation_path: "README.md" tools: - name: "read_emails" - description: "Search and read emails in Gmail with full text body and attachment information" + description: "Search and read emails in Gmail with full text body and attachment information. Supports structured JSON output via output_format parameter." - name: "send_email" description: "Send an email through Gmail with optional attachments and delivery tracking" - name: "forward_email" @@ -13,6 +13,4 @@ tools: description: "Update email labels (mark as read/unread, move to folders)" - name: "get_attachment" description: "Get a temporary download URL for an email attachment" - - name: "list_emails_structured" - description: "List emails as structured JSON for programmatic consumption by triggers and automation workflows" diff --git a/src/servers/gmail/main.py b/src/servers/gmail/main.py index 80b44379..7eecb765 100644 --- a/src/servers/gmail/main.py +++ b/src/servers/gmail/main.py @@ -364,7 +364,7 @@ async def handle_list_tools() -> list[Tool]: return [ Tool( name="read_emails", - description="Search and read emails in Gmail with full text body and attachment information", + description="Search and read emails in Gmail with full text body and attachment information. Supports structured JSON output for programmatic consumption.", inputSchema={ "type": "object", "properties": { @@ -374,7 +374,7 @@ async def handle_list_tools() -> list[Tool]: }, "max_results": { "type": "integer", - "description": "Maximum number of emails to return (default: 10)", + "description": "Maximum number of emails to return (default: 10, max: 100)", }, "include_body": { "type": "boolean", @@ -384,6 +384,21 @@ async def handle_list_tools() -> list[Tool]: "type": "boolean", "description": "Include attachment information in results (default: true)", }, + "output_format": { + "type": "string", + "enum": ["text", "structured"], + "description": "Output format: 'text' for human-readable (default), 'structured' for machine-readable JSON with typed fields (id, threadId, historyId, from, to, cc, bcc, subject, date, snippet, labels, isUnread, messageId, body, attachments)", + }, + "label_ids": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by Gmail label IDs (e.g., ['INBOX', 'UNREAD']). Applied in addition to query. Only used with output_format 'structured'.", + }, + "include_headers": { + "type": "array", + "items": {"type": "string"}, + "description": "Additional headers to include in structured output (e.g., ['Reply-To', 'In-Reply-To']). From, To, Cc, Bcc, Subject, Date are always included. Only used with output_format 'structured'.", + }, }, "required": ["query"], }, @@ -525,42 +540,6 @@ async def handle_list_tools() -> list[Tool]: "required": ["email_id", "attachment_id", "filename"], }, ), - Tool( - name="list_emails_structured", - description="List emails as structured JSON for programmatic consumption. Returns machine-readable email objects with metadata, body, and attachment info. Ideal for triggers and automation workflows.", - inputSchema={ - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "Gmail search query (e.g., 'is:unread', 'from:someone@example.com', 'newer_than:1d'). Default: 'in:inbox'", - }, - "label_ids": { - "type": "array", - "items": {"type": "string"}, - "description": "Filter by Gmail label IDs (e.g., ['INBOX', 'UNREAD']). Applied in addition to query.", - }, - "max_results": { - "type": "integer", - "description": "Maximum number of emails to return (default: 10, max: 100)", - }, - "include_body": { - "type": "boolean", - "description": "Include email body text/html in results (default: true)", - }, - "include_attachments_info": { - "type": "boolean", - "description": "Include attachment metadata in results (default: true)", - }, - "include_headers": { - "type": "array", - "items": {"type": "string"}, - "description": "Additional headers to include (e.g., ['Reply-To', 'Message-ID', 'In-Reply-To']). From, To, Cc, Bcc, Subject, Date are always included.", - }, - }, - "required": [], - }, - ), ] @server.call_tool() @@ -581,18 +560,113 @@ async def handle_call_tool( raise ValueError("Missing query parameter") query = arguments["query"] - max_results = int(arguments.get("max_results", 10)) + max_results = min(int(arguments.get("max_results", 10)), 100) include_body = arguments.get("include_body", True) include_attachments_info = arguments.get("include_attachments_info", True) + output_format = arguments.get("output_format", "text") - results = ( - gmail_service.users() - .messages() - .list(userId="me", q=query, maxResults=max_results) - .execute() - ) + # Build the list request + list_kwargs = {"userId": "me", "q": query, "maxResults": max_results} + label_ids = arguments.get("label_ids") + if label_ids: + list_kwargs["labelIds"] = label_ids + + results = gmail_service.users().messages().list(**list_kwargs).execute() messages = results.get("messages", []) + + # --- Structured JSON output --- + if output_format == "structured": + if not messages: + result_data = { + "emails": [], + "resultCount": 0, + "query": query, + } + return [TextContent(type="text", text=json.dumps(result_data))] + + extra_headers = arguments.get("include_headers", []) + standard_headers = [ + "From", + "To", + "Cc", + "Bcc", + "Subject", + "Date", + "Message-ID", + ] + all_headers = list( + dict.fromkeys(standard_headers + [h for h in extra_headers]) + ) + + email_objects = [] + for message in messages: + msg = ( + gmail_service.users() + .messages() + .get(userId="me", id=message["id"], format="full") + .execute() + ) + + headers = {} + for header in msg.get("payload", {}).get("headers", []): + if header["name"] in all_headers: + headers[header["name"]] = header["value"] + + labels = msg.get("labelIds", []) + + email_obj = { + "id": message["id"], + "threadId": msg.get("threadId", ""), + "historyId": msg.get("historyId", ""), + "from": headers.get("From", ""), + "to": headers.get("To", ""), + "cc": headers.get("Cc", ""), + "bcc": headers.get("Bcc", ""), + "subject": headers.get("Subject", ""), + "date": headers.get("Date", ""), + "snippet": msg.get("snippet", ""), + "labels": labels, + "isUnread": "UNREAD" in labels, + "messageId": headers.get("Message-ID", ""), + } + + if extra_headers: + email_obj["extraHeaders"] = { + h: headers.get(h, "") for h in extra_headers + } + + if include_body: + payload = msg.get("payload", {}) + body = parse_email_body(payload) + email_obj["body"] = { + "text": body.get("text", ""), + "html": body.get("html", ""), + } + + if include_attachments_info: + payload = msg.get("payload", {}) + attachments = get_attachments_info(payload) + email_obj["attachments"] = [ + { + "filename": att["filename"], + "mimeType": att["mimeType"], + "size": att.get("size", 0), + "attachmentId": att.get("attachmentId", ""), + } + for att in attachments + ] + + email_objects.append(email_obj) + + result_data = { + "emails": email_objects, + "resultCount": len(email_objects), + "query": query, + } + return [TextContent(type="text", text=json.dumps(result_data))] + + # --- Default text output (unchanged) --- if not messages: return [ TextContent( @@ -971,113 +1045,6 @@ async def handle_call_tool( ) ] - elif name == "list_emails_structured": - query = (arguments or {}).get("query", "in:inbox") - label_ids = (arguments or {}).get("label_ids", None) - max_results = min(int((arguments or {}).get("max_results", 10)), 100) - include_body = (arguments or {}).get("include_body", True) - include_attachments_info_flag = (arguments or {}).get( - "include_attachments_info", True - ) - extra_headers = (arguments or {}).get("include_headers", []) - - # Build the list request - list_kwargs = {"userId": "me", "q": query, "maxResults": max_results} - if label_ids: - list_kwargs["labelIds"] = label_ids - - results = gmail_service.users().messages().list(**list_kwargs).execute() - - messages = results.get("messages", []) - if not messages: - result_data = {"emails": [], "resultCount": 0, "query": query} - return [TextContent(type="text", text=json.dumps(result_data))] - - # Standard headers always extracted - standard_headers = [ - "From", - "To", - "Cc", - "Bcc", - "Subject", - "Date", - "Message-ID", - ] - all_headers = list( - dict.fromkeys(standard_headers + [h for h in extra_headers]) - ) - - email_objects = [] - for message in messages: - msg = ( - gmail_service.users() - .messages() - .get(userId="me", id=message["id"], format="full") - .execute() - ) - - # Extract all requested headers into a dict - headers = {} - for header in msg.get("payload", {}).get("headers", []): - if header["name"] in all_headers: - headers[header["name"]] = header["value"] - - labels = msg.get("labelIds", []) - - email_obj = { - "id": message["id"], - "threadId": msg.get("threadId", ""), - "historyId": msg.get("historyId", ""), - "from": headers.get("From", ""), - "to": headers.get("To", ""), - "cc": headers.get("Cc", ""), - "bcc": headers.get("Bcc", ""), - "subject": headers.get("Subject", ""), - "date": headers.get("Date", ""), - "snippet": msg.get("snippet", ""), - "labels": labels, - "isUnread": "UNREAD" in labels, - "messageId": headers.get("Message-ID", ""), - } - - # Add any extra requested headers - if extra_headers: - email_obj["extraHeaders"] = { - h: headers.get(h, "") for h in extra_headers - } - - # Include body if requested - if include_body: - payload = msg.get("payload", {}) - body = parse_email_body(payload) - email_obj["body"] = { - "text": body.get("text", ""), - "html": body.get("html", ""), - } - - # Include attachment info if requested - if include_attachments_info_flag: - payload = msg.get("payload", {}) - attachments = get_attachments_info(payload) - email_obj["attachments"] = [ - { - "filename": att["filename"], - "mimeType": att["mimeType"], - "size": att.get("size", 0), - "attachmentId": att.get("attachmentId", ""), - } - for att in attachments - ] - - email_objects.append(email_obj) - - result_data = { - "emails": email_objects, - "resultCount": len(email_objects), - "query": query, - } - return [TextContent(type="text", text=json.dumps(result_data))] - raise ValueError(f"Unknown tool: {name}") return server diff --git a/tests/servers/gmail/test_list_emails_structured.py b/tests/servers/gmail/test_list_emails_structured.py index c38c82b0..4474b10f 100644 --- a/tests/servers/gmail/test_list_emails_structured.py +++ b/tests/servers/gmail/test_list_emails_structured.py @@ -1,6 +1,6 @@ -"""Unit tests for the list_emails_structured Gmail tool. +"""Unit tests for read_emails with output_format='structured'. -These tests mock the Gmail API to verify structured output format, +These tests mock the Gmail API to verify structured JSON output format, filtering, and edge cases without requiring real credentials. """ @@ -11,6 +11,9 @@ from mcp.types import CallToolRequest, CallToolRequestParams +# Default structured arguments — query is required for read_emails +_STRUCTURED_DEFAULTS = {"query": "in:inbox", "output_format": "structured"} + def _make_gmail_message( msg_id="msg_001", @@ -92,16 +95,13 @@ def _build_mock_gmail_service(messages_list_response, message_get_responses): """Build a mock Gmail service with list and get responses.""" mock_service = MagicMock() - # Chain: gmail_service.users().messages().list(...).execute() mock_list = MagicMock() mock_list.execute.return_value = messages_list_response mock_messages = MagicMock() mock_messages.list.return_value = mock_list - # Chain: gmail_service.users().messages().get(...).execute() mock_get = MagicMock() - # If multiple messages, return them in order if isinstance(message_get_responses, list): mock_get.execute.side_effect = message_get_responses else: @@ -186,7 +186,10 @@ def email_with_attachments_service(): async def _invoke_tool(mock_service, arguments=None): - """Call the list_emails_structured tool via the MCP server and return parsed JSON.""" + """Call read_emails with output_format=structured and return parsed JSON.""" + # Merge caller args on top of structured defaults + merged = {**_STRUCTURED_DEFAULTS, **(arguments or {})} + with patch( "src.servers.gmail.main.create_gmail_service", new_callable=AsyncMock, @@ -195,23 +198,47 @@ async def _invoke_tool(mock_service, arguments=None): from src.servers.gmail.main import create_server server_instance = create_server("test_user", api_key="test_key") - - # Look up the CallToolRequest handler by class key handler = server_instance.request_handlers[CallToolRequest] request = CallToolRequest( method="tools/call", params=CallToolRequestParams( - name="list_emails_structured", - arguments=arguments or {}, + name="read_emails", + arguments=merged, ), ) result = await handler(request) - # result is a ServerResult; .root is CallToolResult with .content list text = result.root.content[0].text return json.loads(text) +async def _invoke_tool_raw(mock_service, arguments=None): + """Call read_emails and return the raw text (for testing text vs structured).""" + merged = {**(arguments or {})} + if "query" not in merged: + merged["query"] = "in:inbox" + + with patch( + "src.servers.gmail.main.create_gmail_service", + new_callable=AsyncMock, + return_value=mock_service, + ): + from src.servers.gmail.main import create_server + + server_instance = create_server("test_user", api_key="test_key") + handler = server_instance.request_handlers[CallToolRequest] + + request = CallToolRequest( + method="tools/call", + params=CallToolRequestParams( + name="read_emails", + arguments=merged, + ), + ) + result = await handler(request) + return result.root.content[0].text + + @pytest.mark.asyncio async def test_basic_structured_output(single_email_service): """Structured output contains expected top-level keys and email fields.""" @@ -352,10 +379,8 @@ async def test_max_results_capped_at_100(single_email_service): @pytest.mark.asyncio async def test_extra_headers(single_email_service): """Extra headers requested via include_headers appear in extraHeaders.""" - # Add a Reply-To header to the mock message msg = single_email_service.users().messages().get().execute() msg["payload"]["headers"].append({"name": "Reply-To", "value": "reply@example.com"}) - # Reset the mock to return the updated message single_email_service.users().messages().get().execute.return_value = msg single_email_service.users().messages().get.return_value.execute.return_value = msg @@ -385,13 +410,32 @@ async def test_output_is_valid_json(single_email_service): request = CallToolRequest( method="tools/call", params=CallToolRequestParams( - name="list_emails_structured", - arguments={}, + name="read_emails", + arguments={"query": "in:inbox", "output_format": "structured"}, ), ) result = await handler(request) raw_text = result.root.content[0].text - # Must not throw parsed = json.loads(raw_text) assert isinstance(parsed, dict) assert isinstance(parsed["emails"], list) + + +@pytest.mark.asyncio +async def test_text_format_is_default(single_email_service): + """Without output_format, read_emails returns human-readable text (not JSON).""" + raw = await _invoke_tool_raw(single_email_service) + + # Text format starts with "Found N emails:" + assert raw.startswith("Found 1 emails:") + # Should NOT be valid JSON + with pytest.raises(json.JSONDecodeError): + json.loads(raw) + + +@pytest.mark.asyncio +async def test_text_format_explicit(single_email_service): + """output_format='text' returns human-readable text.""" + raw = await _invoke_tool_raw(single_email_service, {"output_format": "text"}) + assert "Found 1 emails:" in raw + assert "From: alice@example.com" in raw