Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Git
.git/
.github/

# Environment and secrets
.env
.env.*
.envprod.yaml
.envstage.yaml

# Local credentials
local_auth/
credentials.json
oauth.keys.json

# IDE
.vscode/
.cursor/
.idea/

# Development
scratch/
tests/
*.log
__pycache__/
*.py[cod]
.pytest_cache/

# Documentation
*.md
!README.md
6 changes: 5 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ NANGO_HOST=https://api.nango.dev

STORAGE_PROVIDER=local # "local" for development, "gcs" for production
LOCAL_STORAGE_DIR=/tmp/pfmcp-attachments # local provider only
GCS_BUCKET_NAME=your-bucket-name # gcs provider only
GCS_BUCKET_NAME=your-bucket-name # gcs provider only

# Delivery tracking (optional — watch/webhook only set up when defined)
GMAIL_PUBSUB_TOPIC=projects/your-project/topics/gmail-notifications
OUTLOOK_WEBHOOK_URL=https://your-domain.com/api/outlook/webhook
2 changes: 2 additions & 0 deletions .github/workflows/production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ jobs:
echo "NANGO_SECRET_KEY: ${{ secrets.PROD_NANGO_SECRET_KEY }}" >> .envprod.yaml
echo 'PEAKFLO_API_BASE_URL: "https://api.peakflo.co/v1"' >> .envprod.yaml
echo "GCS_BUCKET_NAME: ${{ vars.PROD_GCS_BUCKET_NAME }}" >> .envprod.yaml
echo "GMAIL_PUBSUB_TOPIC: ${{ vars.PROD_GMAIL_PUBSUB_TOPIC }}" >> .envprod.yaml
echo "OUTLOOK_WEBHOOK_URL: ${{ vars.PROD_OUTLOOK_WEBHOOK_URL }}" >> .envprod.yaml

- name: GCP Auth
uses: google-github-actions/auth@v2.1.2
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/stage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ jobs:
echo "NANGO_SECRET_KEY: ${{ secrets.INT_NANGO_SECRET_KEY }}" >> .envstage.yaml
echo 'PEAKFLO_API_BASE_URL: "https://stage-api.peakflo.co/v1"' >> .envstage.yaml
echo "GCS_BUCKET_NAME: ${{ vars.INT_GCS_BUCKET_NAME }}" >> .envstage.yaml
echo "GMAIL_PUBSUB_TOPIC: ${{ vars.INT_GMAIL_PUBSUB_TOPIC }}" >> .envstage.yaml
echo "OUTLOOK_WEBHOOK_URL: ${{ vars.INT_OUTLOOK_WEBHOOK_URL }}" >> .envstage.yaml

- name: GCP Auth
uses: google-github-actions/auth@v2.1.2
Expand Down
2 changes: 1 addition & 1 deletion src/servers/gmail/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ tools:
- name: "read_emails"
description: "Search and read emails in Gmail with full text body and attachment information"
- name: "send_email"
description: "Send an email through Gmail with optional attachments"
description: "Send an email through Gmail with optional attachments and delivery tracking"
- name: "forward_email"
description: "Forward an email to recipients, preserving original content and attachments"
- name: "update_email"
Expand Down
49 changes: 48 additions & 1 deletion src/servers/gmail/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
import sys
from typing import Optional, Iterable
Expand Down Expand Up @@ -432,6 +433,10 @@ async def handle_list_tools() -> list[Tool]:
"required": ["filename", "content", "mimeType"],
},
},
"track_delivery": {
"type": "boolean",
"description": "When true, returns structured JSON with channelMessageId and conversationId for delivery tracking",
},
},
"required": ["to", "subject", "body"],
},
Expand Down Expand Up @@ -660,15 +665,57 @@ async def handle_call_tool(
.execute()
)

track_delivery = arguments.get("track_delivery", False)

attachment_info = ""
if arguments.get("attachments"):
num_attachments = len(arguments["attachments"])
attachment_info = f" with {num_attachments} attachment(s)"

if track_delivery:
# Ensure Gmail push notifications are active so delivery
# events (bounces, reads, etc.) are forwarded via Pub/Sub.
# users().watch() is idempotent — safe to call on every send.
pubsub_topic = os.environ.get("GMAIL_PUBSUB_TOPIC")
if pubsub_topic:
try:
watch_response = (
gmail_service.users()
.watch(
userId="me",
body={
"topicName": pubsub_topic,
"labelIds": ["SENT", "INBOX"],
"labelFilterBehavior": "INCLUDE",
},
)
.execute()
)
logger.info(
f"Gmail watch active — historyId={watch_response.get('historyId')}, "
f"expiration={watch_response.get('expiration')}"
)
except Exception as watch_err:
logger.warning(
f"Failed to set Gmail watch (non-blocking): {watch_err}"
)
else:
logger.debug("GMAIL_PUBSUB_TOPIC not set, skipping watch setup")

# Always return structured JSON with tracking fields.
# channelMessageId maps to workflo's messages.channel_message_id column
# for matching delivery events back to the sent message.
# conversationId maps to Gmail's threadId — used to group messages
# in the same email thread.
result_data = {
"status": "sent",
"channelMessageId": sent_message.get("id", ""),
"conversationId": sent_message.get("threadId", ""),
}
return [
TextContent(
type="text",
text=f"Email sent successfully to {arguments['to']}{attachment_info}. Message ID: {sent_message['id']}",
text=json.dumps(result_data),
)
]
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion src/servers/outlook/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ tools:
- name: "read_emails"
description: "Read emails from Outlook. Fetches emails based on specified filters."
- name: "send_email"
description: "Send an email using Outlook"
description: "Send an email using Outlook with optional delivery tracking"
- name: "move_email"
description: "Move an email to a different folder like inbox, sentitems, drafts using Outlook"
- name: "forward_email"
Expand Down
187 changes: 157 additions & 30 deletions src/servers/outlook/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import json
import logging
from datetime import datetime, timedelta, timezone
from html import unescape
from pathlib import Path

Expand Down Expand Up @@ -62,6 +63,67 @@ async def create_outlook_client(user_id, api_key=None):
raise


def _ensure_outlook_subscription(headers: dict, webhook_url: str) -> None:
"""Ensure an active Microsoft Graph subscription exists for mail messages.

Outlook subscriptions are NOT idempotent — we check for an existing one
before creating. The subscription covers created/updated messages so we
can track delivery events (bounces, read receipts, etc.).

Max subscription lifetime for mail resources is 4230 minutes (~2.9 days).
"""
MAIL_RESOURCE = "me/messages"
MAX_EXPIRY_MINUTES = 4230 # Graph API maximum for mail resources

# Check for existing subscriptions on the mail resource
list_resp = requests.get(
"https://graph.microsoft.com/v1.0/subscriptions",
headers=headers,
)

if list_resp.status_code == 200:
for sub in list_resp.json().get("value", []):
if (
sub.get("resource") == MAIL_RESOURCE
and sub.get("notificationUrl") == webhook_url
):
# Already have an active subscription for this resource + webhook
logger.info(
f"Outlook subscription already active — id={sub.get('id')}, "
f"expires={sub.get('expirationDateTime')}"
)
return

# No matching subscription found — create one
expiration = (
datetime.now(timezone.utc) + timedelta(minutes=MAX_EXPIRY_MINUTES)
).strftime("%Y-%m-%dT%H:%M:%S.0000000Z")

subscription_body = {
"changeType": "created,updated",
"notificationUrl": webhook_url,
"resource": MAIL_RESOURCE,
"expirationDateTime": expiration,
"clientState": "delivery-tracking",
}

create_resp = requests.post(
"https://graph.microsoft.com/v1.0/subscriptions",
headers=headers,
data=json.dumps(subscription_body),
)

if create_resp.status_code == 201:
sub_data = create_resp.json()
logger.info(
f"Outlook subscription created — id={sub_data.get('id')}, "
f"expires={sub_data.get('expirationDateTime')}"
)
else:
error_msg = create_resp.json().get("error", {}).get("message", create_resp.text)
logger.warning(f"Failed to create Outlook subscription: {error_msg}")


def create_server(user_id, api_key=None):
"""Create a new server instance with optional user context"""
server = Server("outlook-server")
Expand Down Expand Up @@ -229,6 +291,10 @@ async def handle_list_tools() -> list[Tool]:
"type": "string",
"description": "BCC email addresses (comma-separated)",
},
"track_delivery": {
"type": "boolean",
"description": "When true, uses draft-then-send to return structured JSON with channelMessageId and conversationId for delivery tracking",
},
},
"required": ["to", "subject", "body"],
},
Expand Down Expand Up @@ -520,56 +586,117 @@ async def handle_call_tool(
if email.strip()
]

# Prepare the email payload
email_payload = {
"message": {
"subject": subject,
"body": {"contentType": "Text", "content": body},
"toRecipients": to_list,
"ccRecipients": cc_list,
"bccRecipients": bcc_list,
"internetMessageHeaders": [
{"name": "X-Mailer", "value": "Microsoft Graph API"}
],
},
"saveToSentItems": "true",
}
track_delivery = arguments.get("track_delivery", False)

headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}

# Log the request details
logger.info(f"Sending email with payload: {email_payload}")
message_body = {
"subject": subject,
"body": {"contentType": "Text", "content": body},
"toRecipients": to_list,
"ccRecipients": cc_list,
"bccRecipients": bcc_list,
"internetMessageHeaders": [
{"name": "X-Mailer", "value": "Microsoft Graph API"}
],
}

# Always use draft-then-send flow to capture message IDs.
# /me/sendMail returns 202 with empty body (no IDs available),
# so draft-then-send is required to get channelMessageId and
# conversationId regardless of whether tracking is enabled.
logger.info("Using draft-then-send flow to capture message IDs")

response = requests.post(
"https://graph.microsoft.com/v1.0/me/sendMail",
# Step 1: Create draft
draft_response = requests.post(
"https://graph.microsoft.com/v1.0/me/messages",
headers=headers,
data=json.dumps(email_payload),
data=json.dumps(message_body),
)

# Log the response
logger.info(f"Response status code: {response.status_code}")
logger.info(f"Response content: {response.content}")

if response.status_code in [200, 202]:
if draft_response.status_code != 201:
error_message = (
draft_response.json()
.get("error", {})
.get("message", "Unknown error")
)
return [
TextContent(
type="text",
text=f"Email sent successfully to {', '.join(to_recipients)}",
text=f"Failed to create draft: {error_message}",
)
]
else:
error_message = (
response.json().get("error", {}).get("message", "Unknown error")
)

draft = draft_response.json()
draft_id = draft.get("id", "")
# internetMessageId is the RFC 2822 Message-ID header — it is
# stable across draft→send (unlike the Graph object ID which
# changes when the message moves from Drafts to Sent Items).
# This is the value we store as channelMessageId so that
# webhook notifications can be matched back to this message.
internet_message_id = draft.get("internetMessageId", "")
conversation_id = draft.get("conversationId", "")

# Step 2: Send the draft
send_response = requests.post(
f"https://graph.microsoft.com/v1.0/me/messages/{draft_id}/send",
headers=headers,
)

if send_response.status_code not in [200, 202]:
error_message = "Failed to send draft"
try:
error_message = (
send_response.json()
.get("error", {})
.get("message", error_message)
)
except Exception:
pass
return [
TextContent(
type="text", text=f"Failed to send email: {error_message}"
type="text",
text=f"Failed to send email: {error_message}",
)
]

if track_delivery:
# Ensure Outlook change-notification subscription is active
# so delivery events are forwarded to our webhook endpoint.
webhook_url = os.environ.get("OUTLOOK_WEBHOOK_URL")
if webhook_url:
try:
_ensure_outlook_subscription(headers, webhook_url)
except Exception as sub_err:
logger.warning(
f"Failed to set Outlook subscription (non-blocking): {sub_err}"
)
else:
logger.debug(
"OUTLOOK_WEBHOOK_URL not set, skipping subscription setup"
)

# Always return structured JSON with tracking fields.
# channelMessageId uses internetMessageId (RFC 2822 Message-ID)
# which is stable across draft→send, unlike the Graph object ID
# that changes when the message moves to Sent Items.
# This maps to workflo's messages.channel_message_id column
# for matching webhook delivery events back to the sent message.
result_data = {
"status": "sent",
"channelMessageId": internet_message_id,
"conversationId": conversation_id,
}
return [
TextContent(
type="text",
text=json.dumps(result_data),
)
]

except Exception as e:
logger.error(f"Error in send_email: {str(e)}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
Expand Down
Loading
Loading