diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8d60f1a --- /dev/null +++ b/.dockerignore @@ -0,0 +1,31 @@ +# Git +.git/ +.github/ + +# Environment and secrets +.env +.env.* +.envprod.yaml +.envstage.yaml + +# Local credentials +local_auth/ +credentials.json +oauth.keys.json + +# IDE +.vscode/ +.cursor/ +.idea/ + +# Development +scratch/ +tests/ +*.log +__pycache__/ +*.py[cod] +.pytest_cache/ + +# Documentation +*.md +!README.md diff --git a/.env.example b/.env.example index 9154354..c15c182 100644 --- a/.env.example +++ b/.env.example @@ -6,4 +6,8 @@ NANGO_HOST=https://api.nango.dev STORAGE_PROVIDER=local # "local" for development, "gcs" for production LOCAL_STORAGE_DIR=/tmp/pfmcp-attachments # local provider only -GCS_BUCKET_NAME=your-bucket-name # gcs provider only \ No newline at end of file +GCS_BUCKET_NAME=your-bucket-name # gcs provider only + +# Delivery tracking (optional — watch/webhook only set up when defined) +GMAIL_PUBSUB_TOPIC=projects/your-project/topics/gmail-notifications +OUTLOOK_WEBHOOK_URL=https://your-domain.com/api/outlook/webhook \ No newline at end of file diff --git a/.github/workflows/production.yml b/.github/workflows/production.yml index 42c866d..1640e28 100644 --- a/.github/workflows/production.yml +++ b/.github/workflows/production.yml @@ -30,6 +30,8 @@ jobs: echo "NANGO_SECRET_KEY: ${{ secrets.PROD_NANGO_SECRET_KEY }}" >> .envprod.yaml echo 'PEAKFLO_API_BASE_URL: "https://api.peakflo.co/v1"' >> .envprod.yaml echo "GCS_BUCKET_NAME: ${{ vars.PROD_GCS_BUCKET_NAME }}" >> .envprod.yaml + echo "GMAIL_PUBSUB_TOPIC: ${{ vars.PROD_GMAIL_PUBSUB_TOPIC }}" >> .envprod.yaml + echo "OUTLOOK_WEBHOOK_URL: ${{ vars.PROD_OUTLOOK_WEBHOOK_URL }}" >> .envprod.yaml - name: GCP Auth uses: google-github-actions/auth@v2.1.2 diff --git a/.github/workflows/stage.yml b/.github/workflows/stage.yml index f47a86e..82b829c 100644 --- a/.github/workflows/stage.yml +++ b/.github/workflows/stage.yml @@ -30,6 +30,8 @@ jobs: echo "NANGO_SECRET_KEY: ${{ secrets.INT_NANGO_SECRET_KEY }}" >> .envstage.yaml echo 'PEAKFLO_API_BASE_URL: "https://stage-api.peakflo.co/v1"' >> .envstage.yaml echo "GCS_BUCKET_NAME: ${{ vars.INT_GCS_BUCKET_NAME }}" >> .envstage.yaml + echo "GMAIL_PUBSUB_TOPIC: ${{ vars.INT_GMAIL_PUBSUB_TOPIC }}" >> .envstage.yaml + echo "OUTLOOK_WEBHOOK_URL: ${{ vars.INT_OUTLOOK_WEBHOOK_URL }}" >> .envstage.yaml - name: GCP Auth uses: google-github-actions/auth@v2.1.2 diff --git a/src/servers/gmail/config.yaml b/src/servers/gmail/config.yaml index 3d46b37..f46a9d2 100644 --- a/src/servers/gmail/config.yaml +++ b/src/servers/gmail/config.yaml @@ -6,7 +6,7 @@ tools: - name: "read_emails" description: "Search and read emails in Gmail with full text body and attachment information" - name: "send_email" - description: "Send an email through Gmail with optional attachments" + description: "Send an email through Gmail with optional attachments and delivery tracking" - name: "forward_email" description: "Forward an email to recipients, preserving original content and attachments" - name: "update_email" diff --git a/src/servers/gmail/main.py b/src/servers/gmail/main.py index adc72af..db1b663 100644 --- a/src/servers/gmail/main.py +++ b/src/servers/gmail/main.py @@ -1,3 +1,4 @@ +import json import os import sys from typing import Optional, Iterable @@ -432,6 +433,10 @@ async def handle_list_tools() -> list[Tool]: "required": ["filename", "content", "mimeType"], }, }, + "track_delivery": { + "type": "boolean", + "description": "When true, returns structured JSON with channelMessageId and conversationId for delivery tracking", + }, }, "required": ["to", "subject", "body"], }, @@ -660,15 +665,57 @@ async def handle_call_tool( .execute() ) + track_delivery = arguments.get("track_delivery", False) + attachment_info = "" if arguments.get("attachments"): num_attachments = len(arguments["attachments"]) attachment_info = f" with {num_attachments} attachment(s)" + if track_delivery: + # Ensure Gmail push notifications are active so delivery + # events (bounces, reads, etc.) are forwarded via Pub/Sub. + # users().watch() is idempotent — safe to call on every send. + pubsub_topic = os.environ.get("GMAIL_PUBSUB_TOPIC") + if pubsub_topic: + try: + watch_response = ( + gmail_service.users() + .watch( + userId="me", + body={ + "topicName": pubsub_topic, + "labelIds": ["SENT", "INBOX"], + "labelFilterBehavior": "INCLUDE", + }, + ) + .execute() + ) + logger.info( + f"Gmail watch active — historyId={watch_response.get('historyId')}, " + f"expiration={watch_response.get('expiration')}" + ) + except Exception as watch_err: + logger.warning( + f"Failed to set Gmail watch (non-blocking): {watch_err}" + ) + else: + logger.debug("GMAIL_PUBSUB_TOPIC not set, skipping watch setup") + + # Always return structured JSON with tracking fields. + # channelMessageId maps to workflo's messages.channel_message_id column + # for matching delivery events back to the sent message. + # conversationId maps to Gmail's threadId — used to group messages + # in the same email thread. + result_data = { + "status": "sent", + "channelMessageId": sent_message.get("id", ""), + "conversationId": sent_message.get("threadId", ""), + } return [ TextContent( type="text", - text=f"Email sent successfully to {arguments['to']}{attachment_info}. Message ID: {sent_message['id']}", + text=json.dumps(result_data), ) ] except Exception as e: diff --git a/src/servers/outlook/config.yaml b/src/servers/outlook/config.yaml index 4254565..856380f 100644 --- a/src/servers/outlook/config.yaml +++ b/src/servers/outlook/config.yaml @@ -6,7 +6,7 @@ tools: - name: "read_emails" description: "Read emails from Outlook. Fetches emails based on specified filters." - name: "send_email" - description: "Send an email using Outlook" + description: "Send an email using Outlook with optional delivery tracking" - name: "move_email" description: "Move an email to a different folder like inbox, sentitems, drafts using Outlook" - name: "forward_email" diff --git a/src/servers/outlook/main.py b/src/servers/outlook/main.py index 9c6a079..907c25a 100644 --- a/src/servers/outlook/main.py +++ b/src/servers/outlook/main.py @@ -12,6 +12,7 @@ import json import logging +from datetime import datetime, timedelta, timezone from html import unescape from pathlib import Path @@ -62,6 +63,67 @@ async def create_outlook_client(user_id, api_key=None): raise +def _ensure_outlook_subscription(headers: dict, webhook_url: str) -> None: + """Ensure an active Microsoft Graph subscription exists for mail messages. + + Outlook subscriptions are NOT idempotent — we check for an existing one + before creating. The subscription covers created/updated messages so we + can track delivery events (bounces, read receipts, etc.). + + Max subscription lifetime for mail resources is 4230 minutes (~2.9 days). + """ + MAIL_RESOURCE = "me/messages" + MAX_EXPIRY_MINUTES = 4230 # Graph API maximum for mail resources + + # Check for existing subscriptions on the mail resource + list_resp = requests.get( + "https://graph.microsoft.com/v1.0/subscriptions", + headers=headers, + ) + + if list_resp.status_code == 200: + for sub in list_resp.json().get("value", []): + if ( + sub.get("resource") == MAIL_RESOURCE + and sub.get("notificationUrl") == webhook_url + ): + # Already have an active subscription for this resource + webhook + logger.info( + f"Outlook subscription already active — id={sub.get('id')}, " + f"expires={sub.get('expirationDateTime')}" + ) + return + + # No matching subscription found — create one + expiration = ( + datetime.now(timezone.utc) + timedelta(minutes=MAX_EXPIRY_MINUTES) + ).strftime("%Y-%m-%dT%H:%M:%S.0000000Z") + + subscription_body = { + "changeType": "created,updated", + "notificationUrl": webhook_url, + "resource": MAIL_RESOURCE, + "expirationDateTime": expiration, + "clientState": "delivery-tracking", + } + + create_resp = requests.post( + "https://graph.microsoft.com/v1.0/subscriptions", + headers=headers, + data=json.dumps(subscription_body), + ) + + if create_resp.status_code == 201: + sub_data = create_resp.json() + logger.info( + f"Outlook subscription created — id={sub_data.get('id')}, " + f"expires={sub_data.get('expirationDateTime')}" + ) + else: + error_msg = create_resp.json().get("error", {}).get("message", create_resp.text) + logger.warning(f"Failed to create Outlook subscription: {error_msg}") + + def create_server(user_id, api_key=None): """Create a new server instance with optional user context""" server = Server("outlook-server") @@ -229,6 +291,10 @@ async def handle_list_tools() -> list[Tool]: "type": "string", "description": "BCC email addresses (comma-separated)", }, + "track_delivery": { + "type": "boolean", + "description": "When true, uses draft-then-send to return structured JSON with channelMessageId and conversationId for delivery tracking", + }, }, "required": ["to", "subject", "body"], }, @@ -520,56 +586,117 @@ async def handle_call_tool( if email.strip() ] - # Prepare the email payload - email_payload = { - "message": { - "subject": subject, - "body": {"contentType": "Text", "content": body}, - "toRecipients": to_list, - "ccRecipients": cc_list, - "bccRecipients": bcc_list, - "internetMessageHeaders": [ - {"name": "X-Mailer", "value": "Microsoft Graph API"} - ], - }, - "saveToSentItems": "true", - } + track_delivery = arguments.get("track_delivery", False) headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", } - # Log the request details - logger.info(f"Sending email with payload: {email_payload}") + message_body = { + "subject": subject, + "body": {"contentType": "Text", "content": body}, + "toRecipients": to_list, + "ccRecipients": cc_list, + "bccRecipients": bcc_list, + "internetMessageHeaders": [ + {"name": "X-Mailer", "value": "Microsoft Graph API"} + ], + } + + # Always use draft-then-send flow to capture message IDs. + # /me/sendMail returns 202 with empty body (no IDs available), + # so draft-then-send is required to get channelMessageId and + # conversationId regardless of whether tracking is enabled. + logger.info("Using draft-then-send flow to capture message IDs") - response = requests.post( - "https://graph.microsoft.com/v1.0/me/sendMail", + # Step 1: Create draft + draft_response = requests.post( + "https://graph.microsoft.com/v1.0/me/messages", headers=headers, - data=json.dumps(email_payload), + data=json.dumps(message_body), ) - # Log the response - logger.info(f"Response status code: {response.status_code}") - logger.info(f"Response content: {response.content}") - - if response.status_code in [200, 202]: + if draft_response.status_code != 201: + error_message = ( + draft_response.json() + .get("error", {}) + .get("message", "Unknown error") + ) return [ TextContent( type="text", - text=f"Email sent successfully to {', '.join(to_recipients)}", + text=f"Failed to create draft: {error_message}", ) ] - else: - error_message = ( - response.json().get("error", {}).get("message", "Unknown error") - ) + + draft = draft_response.json() + draft_id = draft.get("id", "") + # internetMessageId is the RFC 2822 Message-ID header — it is + # stable across draft→send (unlike the Graph object ID which + # changes when the message moves from Drafts to Sent Items). + # This is the value we store as channelMessageId so that + # webhook notifications can be matched back to this message. + internet_message_id = draft.get("internetMessageId", "") + conversation_id = draft.get("conversationId", "") + + # Step 2: Send the draft + send_response = requests.post( + f"https://graph.microsoft.com/v1.0/me/messages/{draft_id}/send", + headers=headers, + ) + + if send_response.status_code not in [200, 202]: + error_message = "Failed to send draft" + try: + error_message = ( + send_response.json() + .get("error", {}) + .get("message", error_message) + ) + except Exception: + pass return [ TextContent( - type="text", text=f"Failed to send email: {error_message}" + type="text", + text=f"Failed to send email: {error_message}", ) ] + if track_delivery: + # Ensure Outlook change-notification subscription is active + # so delivery events are forwarded to our webhook endpoint. + webhook_url = os.environ.get("OUTLOOK_WEBHOOK_URL") + if webhook_url: + try: + _ensure_outlook_subscription(headers, webhook_url) + except Exception as sub_err: + logger.warning( + f"Failed to set Outlook subscription (non-blocking): {sub_err}" + ) + else: + logger.debug( + "OUTLOOK_WEBHOOK_URL not set, skipping subscription setup" + ) + + # Always return structured JSON with tracking fields. + # channelMessageId uses internetMessageId (RFC 2822 Message-ID) + # which is stable across draft→send, unlike the Graph object ID + # that changes when the message moves to Sent Items. + # This maps to workflo's messages.channel_message_id column + # for matching webhook delivery events back to the sent message. + result_data = { + "status": "sent", + "channelMessageId": internet_message_id, + "conversationId": conversation_id, + } + return [ + TextContent( + type="text", + text=json.dumps(result_data), + ) + ] + except Exception as e: logger.error(f"Error in send_email: {str(e)}") return [TextContent(type="text", text=f"Error: {str(e)}")] diff --git a/src/servers/remote.py b/src/servers/remote.py index 0baa4bf..9b3b103 100644 --- a/src/servers/remote.py +++ b/src/servers/remote.py @@ -1,4 +1,5 @@ import logging +import os import uvicorn import argparse import importlib.util @@ -14,6 +15,9 @@ from starlette.types import Receive, Scope, Send from prometheus_client import Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST +# Production mode: set DEBUG=true in environment to enable debug mode +DEBUG_MODE = os.environ.get("DEBUG", "false").lower() == "true" + from mcp.server.lowlevel import Server from mcp.server import streamable_http_manager @@ -111,7 +115,7 @@ async def metrics_endpoint(request): routes = [Route("/metrics", endpoint=metrics_endpoint)] app = Starlette( - debug=True, + debug=DEBUG_MODE, routes=routes, ) @@ -214,15 +218,15 @@ async def handle_server_request( logger.info(f"Added stateless routes for server: {server_name}") - # Health checks + # Health checks — do not expose server list in unauthenticated endpoints async def root_handler(request): """Root endpoint that returns a simple 200 OK response""" return JSONResponse( { "status": "ok", - "message": "guMCP stateless server running", - "servers": list(servers.keys()), + "message": "pfMCP server running", "mode": "stateless", + "server_count": len(servers), } ) @@ -231,7 +235,7 @@ async def root_handler(request): async def health_check(request): """Health check endpoint""" return JSONResponse( - {"status": "ok", "servers": list(servers.keys()), "mode": "stateless"} + {"status": "ok", "mode": "stateless", "server_count": len(servers)} ) routes.append(Route("/health_check", endpoint=health_check)) @@ -246,7 +250,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]: logger.info("Application shutting down...") app = Starlette( - debug=True, + debug=DEBUG_MODE, routes=routes, lifespan=lifespan, )