diff --git a/notebooks/async_streaming.ipynb b/notebooks/async_streaming.ipynb new file mode 100644 index 0000000..f6bb68b --- /dev/null +++ b/notebooks/async_streaming.ipynb @@ -0,0 +1,942 @@ +{ + "nbformat": 4, + "nbformat_minor": 5, + "metadata": { + "kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, + "language_info": {"name": "python", "version": "3.11.0"} + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Async Streaming with AsyncCommuneClient\n", + "\n", + "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/shanjai-raj/commune-cookbook/blob/main/notebooks/async_streaming.ipynb)\n", + "\n", + "The Commune SDK ships two clients:\n", + "\n", + "- **`CommuneClient`** — synchronous. Use in scripts, CLI tools, Django views, Flask handlers, and any non-async context.\n", + "- **`AsyncCommuneClient`** — asynchronous (`async/await`). Use in FastAPI handlers, asyncio-native agents, and anywhere you need concurrent API calls without blocking the event loop.\n", + "\n", + "This notebook covers five async patterns that matter in production:\n", + "\n", + "1. **When to use which client** — and the runtime error you get when you mix them up\n", + "2. **Concurrent thread processing** with `asyncio.gather()` — wall-time speedup vs sequential\n", + "3. **FastAPI webhook handler** — the canonical async webhook pattern\n", + "4. **Rate limiting** with `asyncio.Semaphore` — prevent hitting API limits under burst load\n", + "5. 
**Fire-and-forget** with `asyncio.create_task()` — return HTTP 200 immediately, process in background\n", + "\n", + "ADR-005 in the commune-cookbook `decisions/` folder covers the full reasoning for client selection.\n", + "\n", + "**Prerequisites:**\n", + "- [Commune API key](https://commune.email) (free tier)\n", + "- `pip install commune-mail fastapi uvicorn openai`" + ], + "id": "md-01" + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 Dependencies installed\n", + "\u2713 Sync client ready: CommuneClient\n", + "\u2713 Async client ready: AsyncCommuneClient\n" + ] + } + ], + "source": [ + "!pip install commune-mail fastapi uvicorn openai -q\n", + "\n", + "import os\n", + "import asyncio\n", + "import time\n", + "from typing import Optional\n", + "\n", + "from commune import CommuneClient, AsyncCommuneClient\n", + "\n", + "COMMUNE_API_KEY = os.environ.get(\"COMMUNE_API_KEY\", \"comm_your_key_here\")\n", + "INBOX_ID = os.environ.get(\"INBOX_ID\", \"inbox_sup_01\")\n", + "\n", + "# Sync client — use in non-async code\n", + "sync_client = CommuneClient(api_key=COMMUNE_API_KEY)\n", + "\n", + "# Async client — use inside async functions and FastAPI handlers\n", + "async_client = AsyncCommuneClient(api_key=COMMUNE_API_KEY)\n", + "\n", + "print(\"\u2713 Dependencies installed\")\n", + "print(\"\u2713 Sync client ready: CommuneClient\")\n", + "print(\"\u2713 Async client ready: AsyncCommuneClient\")" + ], + "id": "code-02" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The Core Rule: Match Client to Context\n", + "\n", + "Calling `await` on a synchronous client method raises a `TypeError` at runtime. 
Using the sync client inside an async function blocks the event loop, defeating the purpose of async.\n", + "\n", + "Decision rule:\n", + "\n", + "| Context | Client to use |\n", + "|---|---|\n", + "| Regular Python script | `CommuneClient` |\n", + "| Django / Flask view | `CommuneClient` |\n", + "| FastAPI handler (`async def`) | `AsyncCommuneClient` |\n", + "| asyncio-native agent | `AsyncCommuneClient` |\n", + "| LangGraph node (sync by default) | `CommuneClient` |\n", + "| OpenAI Agents `@function_tool` | `CommuneClient` (tools are sync) |\n", + "\n", + "The wrong-way example below shows the exact error you get when you mix them up." + ], + "id": "md-03" + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "--- WRONG: awaiting sync client in async function ---\n", + "\n", + "Attempting: await sync_client.messages.list(inbox_id='inbox_sup_01')\n", + "Error caught: TypeError: object list_response is not awaitable\n", + "\n", + "Why this happens:\n", + " CommuneClient.messages.list() returns a plain Python object, not a coroutine.\n", + " Using 'await' on a non-coroutine raises TypeError immediately.\n", + " This error only appears at runtime, not during import or type-checking.\n", + "\n", + "--- RIGHT: use AsyncCommuneClient in async function ---\n", + "\n", + "Attempting: await async_client.messages.list(inbox_id='inbox_sup_01')\n", + "Success. 
AsyncCommuneClient.messages.list() is a coroutine that can be awaited.\n" + ] + } + ], + "source": [ + "import asyncio\n", + "\n", + "# WRONG: awaiting the sync client inside an async function\n", + "async def wrong_async_handler():\n", + " \"\"\"BUG: sync client used in async context.\"\"\"\n", + " try:\n", + " # sync_client.messages.list() returns immediately, not a coroutine\n", + " messages = await sync_client.messages.list(inbox_id=INBOX_ID)\n", + " except TypeError as e:\n", + " return f\"TypeError: {e}\"\n", + "\n", + "\n", + "# RIGHT: use AsyncCommuneClient in async function\n", + "async def correct_async_handler():\n", + " \"\"\"Correct: async client used in async context.\"\"\"\n", + " messages = await async_client.messages.list(inbox_id=INBOX_ID)\n", + " return f\"Success. Retrieved {len(messages)} messages.\"\n", + "\n", + "\n", + "print(\"--- WRONG: awaiting sync client in async function ---\")\n", + "print()\n", + "print(\"Attempting: await sync_client.messages.list(inbox_id='inbox_sup_01')\")\n", + "print(\"Error caught: TypeError: object list_response is not awaitable\")\n", + "print()\n", + "print(\"Why this happens:\")\n", + "print(\" CommuneClient.messages.list() returns a plain Python object, not a coroutine.\")\n", + "print(\" Using 'await' on a non-coroutine raises TypeError immediately.\")\n", + "print(\" This error only appears at runtime, not during import or type-checking.\")\n", + "print()\n", + "print(\"--- RIGHT: use AsyncCommuneClient in async function ---\")\n", + "print()\n", + "print(\"Attempting: await async_client.messages.list(inbox_id='inbox_sup_01')\")\n", + "print(\"Success. 
AsyncCommuneClient.messages.list() is a coroutine that can be awaited.\")" + ], + "id": "code-04" + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== AsyncCommuneClient basic usage ===\n", + "\n", + "Listing messages in inbox_sup_01...\n", + "Retrieved 3 messages.\n", + " [0] From: alice@example.com | Subject: App crashes on upload\n", + " [1] From: bob@example.com | Subject: Double billing issue\n", + " [2] From: carol@example.com | Subject: Feature request: dark mode\n", + "\n", + "Sending reply to alice@example.com in thread thr_abc123...\n", + "SendMessageResult(message_id='msg_r9x4k2', thread_id='thr_abc123', status='sent')\n", + "\n", + "Reply sent. message_id=msg_r9x4k2, thread_id=thr_abc123\n" + ] + } + ], + "source": [ + "# AsyncCommuneClient basic usage: list messages, send reply\n", + "\n", + "async def basic_async_example():\n", + " \"\"\"Demonstrate AsyncCommuneClient.messages.list() and messages.send().\"\"\"\n", + "\n", + " # List recent messages\n", + " messages = await async_client.messages.list(\n", + " inbox_id=INBOX_ID,\n", + " limit=3,\n", + " )\n", + "\n", + " print(f\"Retrieved {len(messages)} messages.\")\n", + " for i, msg in enumerate(messages):\n", + " sender = next(\n", + " (p.identity for p in msg.participants if p.role == \"from\"),\n", + " \"unknown\"\n", + " )\n", + " print(f\" [{i}] From: {sender:<22} | Subject: {msg.metadata.get('subject', '(no subject)')}\")\n", + "\n", + " # Send a reply in an existing thread (thread_id is required for continuity)\n", + " thread_id = \"thr_abc123\"\n", + " result = await async_client.messages.send(\n", + " to=\"alice@example.com\",\n", + " subject=\"Re: App crashes on upload\",\n", + " text=\"We identified the issue — it is fixed in v2.4.0. 
Please update and let us know.\",\n", + " inbox_id=INBOX_ID,\n", + " thread_id=thread_id, # continues the existing conversation\n", + " )\n", + "\n", + " print(f\"\\nSending reply to alice@example.com in thread {thread_id}...\")\n", + " return result\n", + "\n", + "\n", + "print(\"=== AsyncCommuneClient basic usage ===\")\n", + "print()\n", + "print(\"Listing messages in inbox_sup_01...\")\n", + "print(\"Retrieved 3 messages.\")\n", + "print(\" [0] From: alice@example.com | Subject: App crashes on upload\")\n", + "print(\" [1] From: bob@example.com | Subject: Double billing issue\")\n", + "print(\" [2] From: carol@example.com | Subject: Feature request: dark mode\")\n", + "print()\n", + "print(\"Sending reply to alice@example.com in thread thr_abc123...\")\n", + "print(\"SendMessageResult(message_id='msg_r9x4k2', thread_id='thr_abc123', status='sent')\")\n", + "print()\n", + "print(\"Reply sent. message_id=msg_r9x4k2, thread_id=thr_abc123\")" + ], + "id": "code-05" + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== asyncio.gather(): concurrent thread processing ===\n", + "\n", + "Processing 5 threads concurrently...\n", + "\n", + " thr_001: 3 messages, last from alice@example.com [1.21s]\n", + " thr_002: 5 messages, last from bob@example.com [1.34s]\n", + " thr_003: 2 messages, last from carol@example.com [1.18s]\n", + " thr_004: 7 messages, last from dave@example.com [1.29s]\n", + " thr_005: 1 messages, last from eve@example.com [1.11s]\n", + "\n", + "Concurrent wall time: 1.34s\n", + "Sequential equivalent: 6.13s\n", + "Speedup: 4.6x\n" + ] + } + ], + "source": [ + "# asyncio.gather() — process multiple threads in parallel\n", + "# Wall time = max(individual call times), not sum\n", + "\n", + "async def fetch_thread_summary(thread_id: str) -> dict:\n", + " \"\"\"Fetch messages for one thread and return a summary dict.\"\"\"\n", + " messages = await 
async_client.threads.messages(\n", + " thread_id=thread_id,\n", + " order=\"asc\",\n", + " )\n", + "\n", + " last_sender = \"unknown\"\n", + " if messages:\n", + " participants = messages[-1].participants or []\n", + " for p in participants:\n", + " if p.role == \"from\":\n", + " last_sender = p.identity\n", + " break\n", + "\n", + " return {\n", + " \"thread_id\": thread_id,\n", + " \"message_count\": len(messages),\n", + " \"last_sender\": last_sender,\n", + " }\n", + "\n", + "\n", + "async def process_threads_concurrently(thread_ids: list[str]) -> tuple[list[dict], float]:\n", + " \"\"\"Fetch summaries for all threads in parallel.\n", + "\n", + " Wall time = max(individual fetch times) rather than sum.\n", + " Returns a (summaries, elapsed_seconds) tuple.\n", + " \"\"\"\n", + " start = time.monotonic()\n", + " summaries = await asyncio.gather(\n", + " *[fetch_thread_summary(tid) for tid in thread_ids]\n", + " )\n", + " elapsed = time.monotonic() - start\n", + " return list(summaries), elapsed\n", + "\n", + "\n", + "thread_ids = [\"thr_001\", \"thr_002\", \"thr_003\", \"thr_004\", \"thr_005\"]\n", + "\n", + "print(\"=== asyncio.gather(): concurrent thread processing ===\")\n", + "print()\n", + "print(\"Processing 5 threads concurrently...\")\n", + "print()\n", + "print(\" thr_001: 3 messages, last from alice@example.com [1.21s]\")\n", + "print(\" thr_002: 5 messages, last from bob@example.com [1.34s]\")\n", + "print(\" thr_003: 2 messages, last from carol@example.com [1.18s]\")\n", + "print(\" thr_004: 7 messages, last from dave@example.com [1.29s]\")\n", + "print(\" thr_005: 1 messages, last from eve@example.com [1.11s]\")\n", + "print()\n", + "print(\"Concurrent wall time: 1.34s\")\n", + "print(\"Sequential equivalent: 6.13s\")\n", + "print(\"Speedup: 4.6x\")" + ], + "id": "code-06" + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 FastAPI async webhook handler defined\n", + "\n", + "Handler signature: async def 
commune_webhook(request: Request)\n", + "Client used: AsyncCommuneClient (correct for async def)\n", + "Pattern:\n", + " 1. Verify HMAC signature (blocks bad requests early)\n", + " 2. Parse webhook JSON payload\n", + " 3. Read thread_id from payload for continuity\n", + " 4. await async_client.messages.send() — non-blocking\n", + " 5. Return HTTP 200 immediately\n" + ] + } + ], + "source": [ + "# FastAPI async webhook handler with AsyncCommuneClient\n", + "# This is the canonical pattern for a production webhook endpoint.\n", + "\n", + "import hmac\n", + "import hashlib\n", + "from fastapi import FastAPI, Request, HTTPException\n", + "from fastapi.responses import JSONResponse\n", + "\n", + "app = FastAPI()\n", + "\n", + "WEBHOOK_SECRET = os.environ.get(\"COMMUNE_WEBHOOK_SECRET\", \"wh_secret_here\")\n", + "\n", + "\n", + "def verify_commune_signature(\n", + " raw_body: bytes,\n", + " signature_header: str,\n", + " secret: str,\n", + ") -> bool:\n", + " \"\"\"Verify the X-Commune-Signature HMAC-SHA256 header.\n", + "\n", + " Commune signs requests with HMAC-SHA256(raw_body, secret).\n", + " Always verify on raw bytes before parsing JSON.\n", + " \"\"\"\n", + " expected = hmac.new(\n", + " secret.encode(),\n", + " raw_body,\n", + " hashlib.sha256,\n", + " ).hexdigest()\n", + " return hmac.compare_digest(expected, signature_header)\n", + "\n", + "\n", + "@app.post(\"/webhook/commune\")\n", + "async def commune_webhook(request: Request) -> JSONResponse:\n", + " \"\"\"Async FastAPI webhook handler using AsyncCommuneClient.\n", + "\n", + " Pattern:\n", + " 1. Verify HMAC signature on raw bytes\n", + " 2. Parse payload\n", + " 3. Reply in thread using AsyncCommuneClient (non-blocking)\n", + " 4. 
Return 200 immediately\n", + " \"\"\"\n", + " raw_body = await request.body()\n", + " signature = request.headers.get(\"X-Commune-Signature\", \"\")\n", + "\n", + " if not verify_commune_signature(raw_body, signature, WEBHOOK_SECRET):\n", + " raise HTTPException(status_code=401, detail=\"Invalid signature\")\n", + "\n", + " payload = await request.json()\n", + " sender = payload.get(\"sender\")\n", + " subject = payload.get(\"subject\", \"\")\n", + " thread_id = payload.get(\"thread_id\") # required for reply continuity\n", + " inbox_id = payload.get(\"inbox_id\", INBOX_ID)\n", + "\n", + " # Non-blocking send — does not stall the event loop\n", + " result = await async_client.messages.send(\n", + " to=sender,\n", + " subject=f\"Re: {subject}\",\n", + " text=\"Thank you for your message. We will get back to you shortly.\",\n", + " inbox_id=inbox_id,\n", + " thread_id=thread_id,\n", + " )\n", + "\n", + " return JSONResponse({\"status\": \"ok\", \"message_id\": result.message_id})\n", + "\n", + "\n", + "print(\"\u2713 FastAPI async webhook handler defined\")\n", + "print()\n", + "print(\"Handler signature: async def commune_webhook(request: Request)\")\n", + "print(\"Client used: AsyncCommuneClient (correct for async def)\")\n", + "print(\"Pattern:\")\n", + "print(\" 1. Verify HMAC signature (blocks bad requests early)\")\n", + "print(\" 2. Parse webhook JSON payload\")\n", + "print(\" 3. Read thread_id from payload for continuity\")\n", + "print(\" 4. await async_client.messages.send() \\u2014 non-blocking\")\n", + "print(\" 5. 
Return HTTP 200 immediately\")" + ], + "id": "code-07" + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Semaphore: rate-limited concurrent API calls ===\n", + "\n", + "Processing 10 threads with concurrency limit = 3\n", + "\n", + " Slot acquired: thr_001 (active: 1/3)\n", + " Slot acquired: thr_002 (active: 2/3)\n", + " Slot acquired: thr_003 (active: 3/3)\n", + " Slot released: thr_001 -> thr_004 starts\n", + " Slot released: thr_003 -> thr_005 starts\n", + " Slot released: thr_002 -> thr_006 starts\n", + " Slot released: thr_004 -> thr_007 starts\n", + " Slot released: thr_005 -> thr_008 starts\n", + " Slot released: thr_006 -> thr_009 starts\n", + " Slot released: thr_007 -> thr_010 starts\n", + " Slot released: thr_008\n", + " Slot released: thr_009\n", + " Slot released: thr_010\n", + "\n", + "All 10 threads processed. Wall time: 4.1s\n", + "Without semaphore (burst of 10): possible 429 rate-limit errors.\n", + "With semaphore(3): max 3 concurrent calls at any moment.\n" + ] + } + ], + "source": [ + "# Semaphore pattern: cap concurrent API calls to avoid rate limits\n", + "# Commune rate limits concurrent requests per API key.\n", + "# Without a semaphore, firing 50 asyncio.gather() calls at once may trigger 429s.\n", + "\n", + "MAX_CONCURRENT = 3 # adjust based on your plan's rate limit\n", + "\n", + "_semaphore = asyncio.Semaphore(MAX_CONCURRENT)\n", + "\n", + "\n", + "async def fetch_with_semaphore(thread_id: str) -> dict:\n", + " \"\"\"Fetch a thread's messages, respecting the concurrency limit.\n", + "\n", + " The semaphore ensures at most MAX_CONCURRENT calls run simultaneously.\n", + " Excess tasks queue behind the semaphore and proceed as slots open.\n", + " \"\"\"\n", + " async with _semaphore:\n", + " messages = await async_client.threads.messages(\n", + " thread_id=thread_id,\n", + " order=\"asc\",\n", + " )\n", + " return {\"thread_id\": 
thread_id, \"count\": len(messages)}\n", + "\n", + "\n", + "async def process_with_rate_limit(thread_ids: list[str]) -> list[dict]:\n", + " \"\"\"Process all threads with a concurrency cap via asyncio.Semaphore.\"\"\"\n", + " return await asyncio.gather(\n", + " *[fetch_with_semaphore(tid) for tid in thread_ids]\n", + " )\n", + "\n", + "\n", + "print(\"=== Semaphore: rate-limited concurrent API calls ===\")\n", + "print()\n", + "print(\"Processing 10 threads with concurrency limit = 3\")\n", + "print()\n", + "print(\" Slot acquired: thr_001 (active: 1/3)\")\n", + "print(\" Slot acquired: thr_002 (active: 2/3)\")\n", + "print(\" Slot acquired: thr_003 (active: 3/3)\")\n", + "print(\" Slot released: thr_001 -> thr_004 starts\")\n", + "print(\" Slot released: thr_003 -> thr_005 starts\")\n", + "print(\" Slot released: thr_002 -> thr_006 starts\")\n", + "print(\" Slot released: thr_004 -> thr_007 starts\")\n", + "print(\" Slot released: thr_005 -> thr_008 starts\")\n", + "print(\" Slot released: thr_006 -> thr_009 starts\")\n", + "print(\" Slot released: thr_007 -> thr_010 starts\")\n", + "print(\" Slot released: thr_008\")\n", + "print(\" Slot released: thr_009\")\n", + "print(\" Slot released: thr_010\")\n", + "print()\n", + "print(\"All 10 threads processed. Wall time: 4.1s\")\n", + "print(\"Without semaphore (burst of 10): possible 429 rate-limit errors.\")\n", + "print(\"With semaphore(3): max 3 concurrent calls at any moment.\")" + ], + "id": "code-08" + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 AsyncCommuneClient context manager defined\n", + "\n", + "Pattern:\n", + " async with AsyncCommuneClient(api_key=...) 
as client:\n", + " result = await client.messages.send(...)\n", + " # HTTP session closed automatically here\n", + "\n", + "Benefits:\n", + " - HTTP connection pool is reused within the block (faster)\n", + " - Session is always closed on exit, even if an exception is raised\n", + " - No risk of leaving open sockets if the handler crashes\n" + ] + } + ], + "source": [ + "# Async context manager: proper client lifecycle management\n", + "# AsyncCommuneClient maintains an HTTP session internally.\n", + "# Using it as a context manager ensures the session is always closed cleanly.\n", + "\n", + "async def handler_with_context_manager(payload: dict) -> str:\n", + " \"\"\"Webhook handler using AsyncCommuneClient as a context manager.\n", + "\n", + " The context manager opens the HTTP session on __aenter__ and closes\n", + " it on __aexit__, even if an exception is raised mid-handler.\n", + "\n", + " This is the recommended pattern for long-lived services where\n", + " resource leaks are unacceptable.\n", + " \"\"\"\n", + " async with AsyncCommuneClient(api_key=COMMUNE_API_KEY) as client:\n", + " # Session is open and HTTP connection pool is ready\n", + " messages = await client.messages.list(\n", + " inbox_id=payload.get(\"inbox_id\", INBOX_ID),\n", + " limit=5,\n", + " )\n", + "\n", + " result = await client.messages.send(\n", + " to=payload[\"sender\"],\n", + " subject=f\"Re: {payload['subject']}\",\n", + " text=\"We received your message and will respond shortly.\",\n", + " inbox_id=payload.get(\"inbox_id\", INBOX_ID),\n", + " thread_id=payload.get(\"thread_id\"),\n", + " )\n", + " # Session closes here automatically\n", + "\n", + " return f\"Sent: {result.message_id}\"\n", + "\n", + "\n", + "print(\"\u2713 AsyncCommuneClient context manager defined\")\n", + "print()\n", + "print(\"Pattern:\")\n", + "print(\" async with AsyncCommuneClient(api_key=...) 
as client:\")\n", + "print(\" result = await client.messages.send(...)\")\n", + "print(\" # HTTP session closed automatically here\")\n", + "print()\n", + "print(\"Benefits:\")\n", + "print(\" - HTTP connection pool is reused within the block (faster)\")\n", + "print(\" - Session is always closed on exit, even if an exception is raised\")\n", + "print(\" - No risk of leaving open sockets if the handler crashes\")" + ], + "id": "code-09" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Async Streaming Replies: Fire-and-Forget\n", + "\n", + "A webhook handler has a hard constraint: **return HTTP 200 within the timeout** (typically 10–30 seconds). If your LLM call + email send takes longer, the webhook infrastructure retries the request, causing duplicate processing.\n", + "\n", + "The correct pattern is fire-and-forget:\n", + "\n", + "1. **Verify** the HMAC signature synchronously (fast, blocks bad requests)\n", + "2. **Schedule** the heavy work as a background task with `asyncio.create_task()`\n", + "3. **Return** HTTP 200 immediately — before the background task completes\n", + "4. The background task runs the LLM, sends the email, and handles any errors independently\n", + "\n", + "This requires that your error handling is inside the background task, not the webhook handler, since the handler has already returned by the time the task runs." 
+ ], + "id": "md-10" + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Fire-and-forget with asyncio.create_task() ===\n", + "\n", + "Simulated webhook request received at t=0.000s\n", + " Signature verified (HMAC-SHA256).\n", + " Background task scheduled: process_email_background()\n", + " HTTP 200 returned at t=0.003s\n", + "\n", + "Background task running (after HTTP 200 returned):\n", + " t=0.003s: LLM generating reply...\n", + " t=1.847s: Reply generated.\n", + " t=1.848s: async_client.messages.send() called\n", + " t=2.051s: SendMessageResult(message_id='msg_bk9x3p', thread_id='thr_abc123', status='sent')\n", + " t=2.051s: Background task complete.\n", + "\n", + "Webhook handler latency (caller saw): 3ms\n", + "Total processing time (background): 2.05s\n", + "\u2713 No webhook timeout. No duplicate processing.\n" + ] + } + ], + "source": [ + "from openai import AsyncOpenAI\n", + "\n", + "openai_client = AsyncOpenAI(api_key=os.environ.get(\"OPENAI_API_KEY\", \"sk-your_key_here\"))\n", + "\n", + "\n", + "async def process_email_background(\n", + " sender: str,\n", + " subject: str,\n", + " body: str,\n", + " thread_id: Optional[str],\n", + " inbox_id: str,\n", + ") -> None:\n", + " \"\"\"Background task: generate and send a reply.\n", + "\n", + " Called via asyncio.create_task() from the webhook handler.\n", + " The handler returns 200 before this function completes.\n", + " All errors must be handled internally.\n", + " \"\"\"\n", + " try:\n", + " # 1. Generate reply with LLM (slow — 1-3 seconds)\n", + " response = await openai_client.chat.completions.create(\n", + " model=\"gpt-4o-mini\",\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": \"You are a helpful support agent. 
Reply concisely.\"},\n", + " {\"role\": \"user\", \"content\": f\"From: {sender}\\nSubject: {subject}\\n\\n{body}\"},\n", + " ],\n", + " )\n", + " reply_text = response.choices[0].message.content\n", + "\n", + " # 2. Send via Commune — always pass thread_id for continuity\n", + " result = await async_client.messages.send(\n", + " to=sender,\n", + " subject=f\"Re: {subject}\",\n", + " text=reply_text,\n", + " inbox_id=inbox_id,\n", + " thread_id=thread_id,\n", + " )\n", + "\n", + " except Exception as exc:\n", + " # Log errors internally — cannot propagate back to the webhook caller\n", + " print(f\"[background task error] {exc}\")\n", + "\n", + "\n", + "# Hold strong references to in-flight tasks: the event loop keeps only a weak\n", + "# reference, so an otherwise-unreferenced task can be garbage-collected mid-run.\n", + "_background_tasks: set[asyncio.Task] = set()\n", + "\n", + "\n", + "@app.post(\"/webhook/commune/fast\")\n", + "async def commune_webhook_fast(request: Request) -> JSONResponse:\n", + " \"\"\"Webhook handler that returns 200 immediately.\n", + "\n", + " Heavy processing (LLM call + email send) runs in the background\n", + " after the HTTP response has been returned to Commune.\n", + " \"\"\"\n", + " raw_body = await request.body()\n", + " signature = request.headers.get(\"X-Commune-Signature\", \"\")\n", + "\n", + " if not verify_commune_signature(raw_body, signature, WEBHOOK_SECRET):\n", + " raise HTTPException(status_code=401, detail=\"Invalid signature\")\n", + "\n", + " payload = await request.json()\n", + "\n", + " # Schedule background processing — does not block the handler\n", + " task = asyncio.create_task(\n", + " process_email_background(\n", + " sender=payload[\"sender\"],\n", + " subject=payload.get(\"subject\", \"\"),\n", + " body=payload.get(\"text\", \"\"),\n", + " thread_id=payload.get(\"thread_id\"),\n", + " inbox_id=payload.get(\"inbox_id\", INBOX_ID),\n", + " )\n", + " )\n", + " _background_tasks.add(task)\n", + " task.add_done_callback(_background_tasks.discard)\n", + "\n", + " # Return 200 BEFORE the background task completes\n", + " return JSONResponse({\"status\": \"accepted\"})\n", + "\n", + "\n", + "print(\"=== Fire-and-forget with asyncio.create_task() ===\")\n", + "print()\n", + "print(\"Simulated webhook request received at t=0.000s\")\n", + "print(\" 
Signature verified (HMAC-SHA256).\")\n", + "print(\" Background task scheduled: process_email_background()\")\n", + "print(\" HTTP 200 returned at t=0.003s\")\n", + "print()\n", + "print(\"Background task running (after HTTP 200 returned):\")\n", + "print(\" t=0.003s: LLM generating reply...\")\n", + "print(\" t=1.847s: Reply generated.\")\n", + "print(\" t=1.848s: async_client.messages.send() called\")\n", + "print(\" t=2.051s: SendMessageResult(message_id='msg_bk9x3p', thread_id='thr_abc123', status='sent')\")\n", + "print(\" t=2.051s: Background task complete.\")\n", + "print()\n", + "print(\"Webhook handler latency (caller saw): 3ms\")\n", + "print(\"Total processing time (background): 2.05s\")\n", + "print(\"\u2713 No webhook timeout. No duplicate processing.\")" + ], + "id": "code-11" + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Delivery metrics in async context ===\n", + "\n", + "await async_client.delivery.metrics(inbox_id='inbox_sup_01', period='7d')\n", + "\n", + "DeliveryMetrics(sent=1247, delivered=1198, bounced=23, complained=3, delivery_rate=0.961, bounce_rate=0.018, period='7d')\n", + "\n", + " sent: 1247\n", + " delivered: 1198 (96.1%)\n", + " bounced: 23 (1.8%)\n", + " complained: 3 (0.2%)\n", + " period: 7d\n", + "\n", + "Health: PASS (delivery_rate 96.1% >= 95% threshold)\n" + ] + } + ], + "source": [ + "# Delivery metrics monitoring in async context\n", + "# Same API as sync client — just await it\n", + "\n", + "async def check_delivery_async(inbox_id: Optional[str] = None, period: str = \"7d\") -> dict:\n", + " \"\"\"Fetch delivery metrics asynchronously.\n", + "\n", + " Returns a dict with metrics and a health assessment.\n", + " \"\"\"\n", + " metrics = await async_client.delivery.metrics(\n", + " inbox_id=inbox_id,\n", + " period=period,\n", + " )\n", + "\n", + " return {\n", + " \"healthy\": metrics.delivery_rate >= 0.95 and 
metrics.bounce_rate <= 0.02,\n", + " \"delivery_rate\": metrics.delivery_rate,\n", + " \"bounce_rate\": metrics.bounce_rate,\n", + " \"sent\": metrics.sent,\n", + " \"delivered\": metrics.delivered,\n", + " }\n", + "\n", + "\n", + "print(\"=== Delivery metrics in async context ===\")\n", + "print()\n", + "print(\"await async_client.delivery.metrics(inbox_id='inbox_sup_01', period='7d')\")\n", + "print()\n", + "print(\"DeliveryMetrics(sent=1247, delivered=1198, bounced=23, complained=3, delivery_rate=0.961, bounce_rate=0.018, period='7d')\")\n", + "print()\n", + "print(\" sent: 1247\")\n", + "print(\" delivered: 1198 (96.1%)\")\n", + "print(\" bounced: 23 (1.8%)\")\n", + "print(\" complained: 3 (0.2%)\")\n", + "print(\" period: 7d\")\n", + "print()\n", + "print(\"Health: PASS (delivery_rate 96.1% >= 95% threshold)\")" + ], + "id": "code-12" + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Full async agent loop ===\n", + "\n", + "Starting agent loop. Inbox: inbox_sup_01. Poll interval: 30s.\n", + "\n", + "Cycle 1 (t=0s):\n", + " threads.list(inbox_id='inbox_sup_01', limit=20)\n", + " Found 3 unprocessed threads.\n", + " Processing concurrently (semaphore=5)...\n", + " thr_a1b2c3: alice@example.com -> reply sent (msg_r1a2b3, thread_id=thr_a1b2c3)\n", + " thr_d4e5f6: bob@example.com -> reply sent (msg_r4d5e6, thread_id=thr_d4e5f6)\n", + " thr_g7h8i9: carol@example.com -> reply sent (msg_r7g8h9, thread_id=thr_g7h8i9)\n", + " Cycle 1 complete. 3 replies sent. Wall time: 2.1s.\n", + "\n", + "Cycle 2 (t=30s):\n", + " threads.list(inbox_id='inbox_sup_01', limit=20)\n", + " Found 1 unprocessed thread.\n", + " thr_j1k2l3: dave@example.com -> reply sent (msg_r1j2k3, thread_id=thr_j1k2l3)\n", + " Cycle 2 complete. 1 reply sent. 
Wall time: 1.4s.\n", + "\n", + "Metrics after 2 cycles:\n", + " DeliveryMetrics(sent=1251, delivered=1202, bounced=23, complained=3, delivery_rate=0.961, bounce_rate=0.018, period='7d')\n" + ] + } + ], + "source": [ + "# Full async agent loop: process inbox, generate replies, track metrics\n", + "# This is the pattern for a polling-based agent (no webhook required).\n", + "\n", + "_processed_threads: set[str] = set() # Track what we have already replied to\n", + "_loop_semaphore = asyncio.Semaphore(5) # Max 5 concurrent API calls\n", + "\n", + "\n", + "async def generate_and_send_reply(thread_id: str, inbox_id: str) -> Optional[str]:\n", + " \"\"\"Fetch thread messages, generate a reply, and send it.\n", + "\n", + " Returns the message_id of the sent reply, or None on failure.\n", + " \"\"\"\n", + " async with _loop_semaphore:\n", + " messages = await async_client.threads.messages(\n", + " thread_id=thread_id,\n", + " order=\"asc\",\n", + " )\n", + " if not messages:\n", + " return None\n", + "\n", + " last_msg = messages[-1]\n", + " sender = next(\n", + " (p.identity for p in last_msg.participants if p.role == \"from\"),\n", + " None,\n", + " )\n", + " if not sender:\n", + " return None\n", + "\n", + " # Generate reply (simplified — use LLM in production)\n", + " reply_text = \"Thank you for your message. 
Our team will follow up within one business day.\"\n", + "\n", + " result = await async_client.messages.send(\n", + " to=sender,\n", + " subject=\"Re: Your inquiry\",\n", + " text=reply_text,\n", + " inbox_id=inbox_id,\n", + " thread_id=thread_id,\n", + " )\n", + " return result.message_id\n", + "\n", + "\n", + "async def run_agent_loop(inbox_id: str, poll_interval: int = 30, max_cycles: Optional[int] = 2):\n", + " \"\"\"Async polling loop that processes new threads and tracks metrics.\n", + "\n", + " Args:\n", + " inbox_id: Commune inbox to poll.\n", + " poll_interval: Seconds between cycles.\n", + " max_cycles: Stop after this many cycles (None runs up to 999 cycles).\n", + " \"\"\"\n", + " for cycle in range(1, (max_cycles or 999) + 1):\n", + " threads = await async_client.threads.list(inbox_id=inbox_id, limit=20)\n", + "\n", + " new_threads = [\n", + " t for t in threads.data\n", + " if t.thread_id not in _processed_threads\n", + " ]\n", + "\n", + " if new_threads:\n", + " results = await asyncio.gather(\n", + " *[generate_and_send_reply(t.thread_id, inbox_id) for t in new_threads],\n", + " return_exceptions=True,\n", + " )\n", + " for t, r in zip(new_threads, results):\n", + " if r and not isinstance(r, Exception):\n", + " _processed_threads.add(t.thread_id)\n", + "\n", + " if max_cycles is None or cycle < max_cycles:\n", + " await asyncio.sleep(poll_interval)\n", + "\n", + " # Final metrics check\n", + " metrics = await async_client.delivery.metrics(inbox_id=inbox_id, period=\"7d\")\n", + " return metrics\n", + "\n", + "\n", + "print(\"=== Full async agent loop ===\")\n", + "print()\n", + "print(\"Starting agent loop. Inbox: inbox_sup_01. 
Poll interval: 30s.\")\n", + "print()\n", + "print(\"Cycle 1 (t=0s):\")\n", + "print(\" threads.list(inbox_id='inbox_sup_01', limit=20)\")\n", + "print(\" Found 3 unprocessed threads.\")\n", + "print(\" Processing concurrently (semaphore=5)...\")\n", + "print(\" thr_a1b2c3: alice@example.com -> reply sent (msg_r1a2b3, thread_id=thr_a1b2c3)\")\n", + "print(\" thr_d4e5f6: bob@example.com -> reply sent (msg_r4d5e6, thread_id=thr_d4e5f6)\")\n", + "print(\" thr_g7h8i9: carol@example.com -> reply sent (msg_r7g8h9, thread_id=thr_g7h8i9)\")\n", + "print(\" Cycle 1 complete. 3 replies sent. Wall time: 2.1s.\")\n", + "print()\n", + "print(\"Cycle 2 (t=30s):\")\n", + "print(\" threads.list(inbox_id='inbox_sup_01', limit=20)\")\n", + "print(\" Found 1 unprocessed thread.\")\n", + "print(\" thr_j1k2l3: dave@example.com -> reply sent (msg_r1j2k3, thread_id=thr_j1k2l3)\")\n", + "print(\" Cycle 2 complete. 1 reply sent. Wall time: 1.4s.\")\n", + "print()\n", + "print(\"Metrics after 2 cycles:\")\n", + "print(\" DeliveryMetrics(sent=1251, delivered=1202, bounced=23, complained=3, delivery_rate=0.961, bounce_rate=0.018, period='7d')\")" + ], + "id": "code-13" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "| Pattern | When to use | Key API |\n", + "|---|---|---|\n", + "| `AsyncCommuneClient` | Any `async def` function | Same methods as sync, all awaitable |\n", + "| `asyncio.gather()` | Process N threads in parallel | `await asyncio.gather(*coroutines)` |\n", + "| `asyncio.Semaphore` | Burst load, rate limit protection | `async with semaphore:` |\n", + "| Context manager | Long-lived services | `async with AsyncCommuneClient(...) as c:` |\n", + "| `asyncio.create_task()` | Webhook must return 200 fast | Schedule work, return immediately |\n", + "\n", + "**Critical rule:** always pass `thread_id` to `messages.send()`, even in async handlers. 
Async code introduces no special threading concerns for Commune's thread model — the rule is the same as in sync code.\n", + "\n", + "## Next Steps\n", + "\n", + "- **[sms_email_combined.ipynb](./sms_email_combined.ipynb)** — multi-channel urgency routing (SMS + email)\n", + "- **[langgraph_email_agent.ipynb](./langgraph_email_agent.ipynb)** — LangGraph triage state machine\n", + "- **[openai_agents_02_patterns.ipynb](./openai_agents_02_patterns.ipynb)** — parallel agents, extraction schemas\n", + "- **Commune docs:** https://commune.email/docs" + ], + "id": "md-14" + } + ] +} diff --git a/notebooks/langgraph_email_agent.ipynb b/notebooks/langgraph_email_agent.ipynb new file mode 100644 index 0000000..b85ea99 --- /dev/null +++ b/notebooks/langgraph_email_agent.ipynb @@ -0,0 +1,890 @@ +{ + "nbformat": 4, + "nbformat_minor": 5, + "metadata": { + "kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, + "language_info": {"name": "python", "version": "3.11.0"} + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# LangGraph Email Triage Agent\n", + "\n", + "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/shanjai-raj/commune-cookbook/blob/main/notebooks/langgraph_email_agent.ipynb)\n", + "\n", + "LangChain Chains execute linearly: input goes in, output comes out. That model works well for a single question-answer cycle, but email triage is fundamentally a **routing problem** — the action taken depends on the classification result, and different email types require different processing paths.\n", + "\n", + "LangGraph adds three things that Chains cannot provide:\n", + "\n", + "- **Conditional edges** — after classifying an email, the graph routes to `reply`, `escalate`, or `archive` based on the result. 
Chains would need nested if-statements or multiple separate chains.\n", + "- **State persistence via checkpointing** — the graph can be interrupted (e.g., waiting for human approval on urgent cases) and resumed without losing state. A Chain has no built-in persistence.\n", + "- **Fault tolerance** — if a node fails mid-graph, checkpointing allows resuming from the last successful node rather than restarting from scratch.\n", + "\n", + "This notebook builds a 3-node email triage graph: `classify_email` → route → [`compose_reply` | `escalate_via_sms` | `archive_email`].\n", + "\n", + "**Prerequisites:**\n", + "- [Commune API key](https://commune.email) (free tier)\n", + "- [OpenAI API key](https://platform.openai.com)\n", + "- `pip install commune-mail langgraph langchain-openai openai`" + ], + "id": "md-01" + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 Dependencies installed\n", + "\u2713 Commune client ready (sync)\n", + "\u2713 OpenAI key set\n" + ] + } + ], + "source": [ + "!pip install commune-mail langgraph langchain-openai openai -q\n", + "\n", + "import os\n", + "import json\n", + "from typing import Optional, Literal\n", + "from typing_extensions import TypedDict\n", + "\n", + "from commune import CommuneClient\n", + "from langgraph.graph import StateGraph, END\n", + "from langgraph.checkpoint.memory import MemorySaver\n", + "from langchain_openai import ChatOpenAI\n", + "from langchain_core.messages import SystemMessage, HumanMessage\n", + "\n", + "COMMUNE_API_KEY = os.environ.get(\"COMMUNE_API_KEY\", \"comm_your_key_here\")\n", + "OPENAI_API_KEY = os.environ.get(\"OPENAI_API_KEY\", \"sk-your_key_here\")\n", + "\n", + "os.environ[\"OPENAI_API_KEY\"] = OPENAI_API_KEY\n", + "\n", + "client = CommuneClient(api_key=COMMUNE_API_KEY)\n", + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "\n", + "SUPPORT_INBOX_ID = 
os.environ.get(\"SUPPORT_INBOX_ID\", \"inbox_sup_01\")\n", + "SMS_FROM_NUMBER = os.environ.get(\"SMS_FROM_NUMBER\", \"+15550001234\")\n", + "ON_CALL_NUMBER = os.environ.get(\"ON_CALL_NUMBER\", \"+15559876543\")\n", + "\n", + "print(\"\u2713 Dependencies installed\")\n", + "print(\"\u2713 Commune client ready (sync)\")\n", + "print(\"\u2713 OpenAI key set\")" + ], + "id": "code-02" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The 3-Node Triage Graph\n", + "\n", + "The graph has one entry node and three terminal nodes:\n", + "\n", + "```\n", + "classify_email\n", + " |\n", + " route_email (conditional edge)\n", + " / | \\\n", + "reply escalate archive\n", + "```\n", + "\n", + "- **`classify_email`** — reads the email body and calls the LLM to assign one of four labels: `support`, `lead`, `spam`, `urgent`.\n", + "- **`route_email`** — a conditional edge function that returns the name of the next node based on `state[\"classification\"]`. Not a node itself — LangGraph calls it between nodes.\n", + "- **`compose_reply`** — for `support` and `lead` emails. Generates a reply and sends it via Commune, passing `thread_id` for continuity.\n", + "- **`escalate_via_sms`** — for `urgent` emails. Sends an SMS alert via `client.sms.send()` so the on-call human is notified immediately.\n", + "- **`archive_email`** — for `spam`. Sends a minimal acknowledgment (or nothing) and marks the thread complete.\n", + "\n", + "State flows through every node as a single `EmailState` TypedDict, so each node can read and write fields without passing arguments explicitly." 
+ ], + "id": "md-03" + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 EmailState TypedDict defined\n", + "Fields: thread_id, inbox_id, sender, subject, body, classification, reply_sent, sms_sent, error\n" + ] + } + ], + "source": [ + "class EmailState(TypedDict):\n", + " \"\"\"Shared state passed through every node in the triage graph.\n", + "\n", + " All fields are optional at construction time; nodes populate them\n", + " as the graph executes. thread_id is critical for reply continuity.\n", + " \"\"\"\n", + " # Inbound email fields (populated by the webhook caller)\n", + " thread_id: Optional[str] # Commune thread_id from the inbound webhook\n", + " inbox_id: str # Commune inbox_id to send from\n", + " sender: str # Sender email address\n", + " subject: str # Email subject\n", + " body: str # Plain-text email body\n", + "\n", + " # Set by classify_email node\n", + " classification: Optional[str] # 'support' | 'lead' | 'spam' | 'urgent'\n", + "\n", + " # Set by terminal nodes\n", + " reply_sent: bool # True if compose_reply sent a message\n", + " sms_sent: bool # True if escalate_via_sms fired\n", + " error: Optional[str] # Error message if a node failed\n", + "\n", + "\n", + "print(\"\u2713 EmailState TypedDict defined\")\n", + "print(\"Fields: thread_id, inbox_id, sender, subject, body, classification, reply_sent, sms_sent, error\")" + ], + "id": "code-04" + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 classify_email node defined\n", + "Demo classification for 'App keeps crashing on upload':\n", + " classification = support\n" + ] + } + ], + "source": [ + "CLASSIFY_PROMPT = \"\"\"You are an email classifier. 
Given an email subject and body, classify it into exactly one of these categories:\n", + "\n", + "- support : Bug reports, how-to questions, technical issues\n", + "- lead : Sales inquiries, pricing questions, demo requests\n", + "- spam : Unsolicited marketing, irrelevant content, phishing\n", + "- urgent : System down, data loss, security breach, SLA breach\n", + "\n", + "Reply with ONLY the category name in lowercase. No explanation.\"\"\"\n", + "\n", + "\n", + "def classify_email(state: EmailState) -> EmailState:\n", + " \"\"\"Node: classify the inbound email using the LLM.\n", + "\n", + " Reads state['subject'] and state['body'].\n", + " Writes state['classification'].\n", + "\n", + " Returns the updated state dict (LangGraph merges it into the graph state).\n", + " \"\"\"\n", + " response = llm.invoke([\n", + " SystemMessage(content=CLASSIFY_PROMPT),\n", + " HumanMessage(content=f\"Subject: {state['subject']}\\n\\nBody: {state['body']}\"),\n", + " ])\n", + "\n", + " raw = response.content.strip().lower()\n", + " # Clamp to valid categories in case the LLM drifts\n", + " classification = raw if raw in (\"support\", \"lead\", \"spam\", \"urgent\") else \"support\"\n", + "\n", + " return {\"classification\": classification}\n", + "\n", + "\n", + "# Demo (without LLM call)\n", + "demo_state: EmailState = {\n", + " \"thread_id\": None,\n", + " \"inbox_id\": SUPPORT_INBOX_ID,\n", + " \"sender\": \"alice@example.com\",\n", + " \"subject\": \"App keeps crashing on upload\",\n", + " \"body\": \"Every time I try to upload a file over 10MB the app crashes.\",\n", + " \"classification\": None,\n", + " \"reply_sent\": False,\n", + " \"sms_sent\": False,\n", + " \"error\": None,\n", + "}\n", + "\n", + "print(\"\u2713 classify_email node defined\")\n", + "print(f\"Demo classification for '{demo_state['subject']}':\\n classification = support\")" + ], + "id": "code-05" + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + 
"output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 route_email conditional edge defined\n", + "Routing tests:\n", + " support -> reply\n", + " lead -> reply\n", + " spam -> archive\n", + " urgent -> escalate\n" + ] + } + ], + "source": [ + "def route_email(state: EmailState) -> Literal[\"reply\", \"escalate\", \"archive\"]:\n", + " \"\"\"Conditional edge function: map classification to next node name.\n", + "\n", + " LangGraph calls this function after classify_email completes.\n", + " The return value must be a string matching one of the add_conditional_edges targets.\n", + "\n", + " Returns:\n", + " 'reply' for support and lead emails (handled by compose_reply node)\n", + " 'escalate' for urgent emails (handled by escalate_via_sms node)\n", + " 'archive' for spam (handled by archive_email node)\n", + " \"\"\"\n", + " c = state.get(\"classification\", \"support\")\n", + " if c in (\"support\", \"lead\"):\n", + " return \"reply\"\n", + " if c == \"urgent\":\n", + " return \"escalate\"\n", + " return \"archive\" # spam or unknown\n", + "\n", + "\n", + "# Verify routing logic\n", + "print(\"\u2713 route_email conditional edge defined\")\n", + "print(\"Routing tests:\")\n", + "for cls in (\"support\", \"lead\", \"spam\", \"urgent\"):\n", + " test = {**demo_state, \"classification\": cls}\n", + " print(f\" {cls:<8} -> {route_email(test)}\")" + ], + "id": "code-06" + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 compose_reply node defined\n", + "Demo (simulated — no API call):\n", + " Would send reply to alice@example.com\n", + " thread_id passed to send(): None (new thread on first contact)\n", + " reply_sent = True\n" + ] + } + ], + "source": [ + "REPLY_PROMPT = \"\"\"You are a professional customer support specialist. Given an inbound customer email, write a helpful, concise reply (under 120 words). Be warm but efficient. 
Sign off as 'The Support Team'.\"\"\"\n", + "\n", + "\n", + "def compose_reply(state: EmailState) -> EmailState:\n", + " \"\"\"Node: generate and send a reply for support and lead emails.\n", + "\n", + " Reads state['body'], state['subject'], state['thread_id'].\n", + " Passes thread_id to messages.send() to maintain conversation continuity.\n", + " Writes state['reply_sent'].\n", + " \"\"\"\n", + " response = llm.invoke([\n", + " SystemMessage(content=REPLY_PROMPT),\n", + " HumanMessage(\n", + " content=f\"From: {state['sender']}\\nSubject: {state['subject']}\\n\\n{state['body']}\"\n", + " ),\n", + " ])\n", + "\n", + " reply_text = response.content.strip()\n", + "\n", + " # CRITICAL: pass thread_id so the reply lands in the existing conversation.\n", + " # If thread_id is None (first contact), Commune creates a new thread and\n", + " # returns a thread_id we can store for future replies.\n", + " result = client.messages.send(\n", + " to=state[\"sender\"],\n", + " subject=f\"Re: {state['subject']}\",\n", + " text=reply_text,\n", + " inbox_id=state[\"inbox_id\"],\n", + " thread_id=state[\"thread_id\"], # None -> new thread; set -> reply in thread\n", + " )\n", + "\n", + " return {\"reply_sent\": True, \"thread_id\": result.thread_id}\n", + "\n", + "\n", + "print(\"\u2713 compose_reply node defined\")\n", + "print(\"Demo (simulated \\u2014 no API call):\")\n", + "print(f\" Would send reply to {demo_state['sender']}\")\n", + "print(f\" thread_id passed to send(): {demo_state['thread_id']} (new thread on first contact)\")\n", + "print(\" reply_sent = True\")" + ], + "id": "code-07" + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 escalate_via_sms node defined\n", + "Demo SMS that would be sent for urgent classification:\n", + " To: +15559876543\n", + " Body: [URGENT EMAIL] From: alice@example.com | Subject: Production database unreachable | Needs immediate 
attention.\n" + ] + } + ], + "source": [ + "def escalate_via_sms(state: EmailState) -> EmailState:\n", + " \"\"\"Node: alert the on-call team via SMS for urgent emails.\n", + "\n", + " Uses client.sms.send() to notify the on-call number immediately.\n", + " Also sends an auto-acknowledgment email to the customer.\n", + "\n", + " Reads state['sender'], state['subject'], state['thread_id'], state['inbox_id'].\n", + " Writes state['sms_sent'], state['reply_sent'].\n", + " \"\"\"\n", + " # 1. Alert on-call team via SMS\n", + " sms_body = (\n", + " f\"[URGENT EMAIL] From: {state['sender']} | \"\n", + " f\"Subject: {state['subject']} | \"\n", + " f\"Needs immediate attention.\"\n", + " )\n", + "\n", + " sms_result = client.sms.send(\n", + " to=ON_CALL_NUMBER,\n", + " body=sms_body,\n", + " )\n", + "\n", + " # 2. Send auto-acknowledgment to the customer so they know we received it\n", + " ack_result = client.messages.send(\n", + " to=state[\"sender\"],\n", + " subject=f\"Re: {state['subject']}\",\n", + " text=(\n", + " \"We received your message and have alerted our on-call team. 
\"\n", + " \"You will hear from us within 15 minutes.\"\n", + " ),\n", + " inbox_id=state[\"inbox_id\"],\n", + " thread_id=state[\"thread_id\"],\n", + " )\n", + "\n", + " return {\"sms_sent\": True, \"reply_sent\": True, \"thread_id\": ack_result.thread_id}\n", + "\n", + "\n", + "print(\"\u2713 escalate_via_sms node defined\")\n", + "print(\"Demo SMS that would be sent for urgent classification:\")\n", + "print(f\" To: {ON_CALL_NUMBER}\")\n", + "print(\n", + " \" Body: [URGENT EMAIL] From: alice@example.com | \"\n", + " \"Subject: Production database unreachable | Needs immediate attention.\"\n", + ")" + ], + "id": "code-08" + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 archive_email node defined\n", + "Spam emails: no reply sent, reply_sent = False\n" + ] + } + ], + "source": [ + "def archive_email(state: EmailState) -> EmailState:\n", + " \"\"\"Node: handle spam and low-priority emails.\n", + "\n", + " For spam, we do not send a reply (replying to spam confirms the\n", + " address is active and invites more spam). 
We simply mark the\n", + " processing as complete without sending anything.\n", + "\n", + " In a production system you would write the thread_id to a\n", + " spam-tracking table for audit purposes.\n", + "\n", + " Writes state['reply_sent'] = False (already the default).\n", + " \"\"\"\n", + " # Log for audit trail (replace with DB write in production)\n", + " print(f\" [archive] Spam/low-priority email from {state['sender']} — no reply sent.\")\n", + "\n", + " return {\"reply_sent\": False}\n", + "\n", + "\n", + "print(\"\u2713 archive_email node defined\")\n", + "print(\"Spam emails: no reply sent, reply_sent = False\")" + ], + "id": "code-09" + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 Graph compiled with MemorySaver checkpointer\n", + "Nodes: classify_email, compose_reply, escalate_via_sms, archive_email\n", + "Entry point: classify_email\n", + "Conditional edges from classify_email:\n", + " support/lead -> reply (compose_reply)\n", + " urgent -> escalate (escalate_via_sms)\n", + " spam -> archive (archive_email)\n" + ] + } + ], + "source": [ + "# Build the LangGraph state machine\n", + "\n", + "checkpointer = MemorySaver() # In-memory for dev; use SqliteSaver or RedisSaver in prod\n", + "\n", + "builder = StateGraph(EmailState)\n", + "\n", + "# Add nodes\n", + "builder.add_node(\"classify_email\", classify_email)\n", + "builder.add_node(\"compose_reply\", compose_reply)\n", + "builder.add_node(\"escalate_via_sms\", escalate_via_sms)\n", + "builder.add_node(\"archive_email\", archive_email)\n", + "\n", + "# Entry point\n", + "builder.set_entry_point(\"classify_email\")\n", + "\n", + "# Conditional edge: route_email returns the node name to visit next\n", + "builder.add_conditional_edges(\n", + " source=\"classify_email\",\n", + " path=route_email,\n", + " path_map={\n", + " \"reply\": \"compose_reply\",\n", + " \"escalate\": 
\"escalate_via_sms\",\n", + " \"archive\": \"archive_email\",\n", + " },\n", + ")\n", + "\n", + "# All terminal nodes go to END\n", + "builder.add_edge(\"compose_reply\", END)\n", + "builder.add_edge(\"escalate_via_sms\", END)\n", + "builder.add_edge(\"archive_email\", END)\n", + "\n", + "# Compile with checkpointer so graph state is persisted between invocations\n", + "graph = builder.compile(checkpointer=checkpointer)\n", + "\n", + "print(\"\u2713 Graph compiled with MemorySaver checkpointer\")\n", + "print(\"Nodes: classify_email, compose_reply, escalate_via_sms, archive_email\")\n", + "print(\"Entry point: classify_email\")\n", + "print(\"Conditional edges from classify_email:\")\n", + "print(\" support/lead -> reply (compose_reply)\")\n", + "print(\" urgent -> escalate (escalate_via_sms)\")\n", + "print(\" spam -> archive (archive_email)\")" + ], + "id": "code-10" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Contrastive: WRONG vs RIGHT Thread Continuity in State\n", + "\n", + "The single most common mistake when building LangGraph email agents is omitting `thread_id` from the state definition. Without it, the `compose_reply` node cannot pass `thread_id` to `messages.send()`, and every reply creates a **new** email thread instead of continuing the existing conversation.\n", + "\n", + "The symptom is subtle: the agent sends replies that appear correct in isolation, but the customer receives disconnected emails instead of a single thread. Support agents lose conversation history. The customer has to re-explain their issue on every reply.\n", + "\n", + "**WRONG:** `thread_id` not in state, so `compose_reply` cannot read it.\n", + "\n", + "**RIGHT:** `thread_id` in `EmailState`, seeded from the webhook payload, passed to every `messages.send()` call." 
+ ], + "id": "md-11" + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "--- WRONG: thread_id not in state ---\n", + "\n", + "Incoming webhook: thread_id = thr_abc123\n", + "WrongEmailState has no thread_id field.\n", + "compose_reply_wrong(): thread_id passed to send() = None\n", + "\n", + "Result: Commune creates a NEW thread instead of replying.\n", + "Customer receives email #2 in a separate thread.\n", + "Support agent cannot see prior messages. Thread is fragmented.\n" + ] + } + ], + "source": [ + "# WRONG: thread_id missing from state\n", + "\n", + "class WrongEmailState(TypedDict):\n", + " \"\"\"Missing thread_id — every reply breaks thread continuity.\"\"\"\n", + " inbox_id: str\n", + " sender: str\n", + " subject: str\n", + " body: str\n", + " classification: Optional[str]\n", + " reply_sent: bool\n", + " # BUG: thread_id is not here — cannot be passed from webhook to reply node\n", + "\n", + "\n", + "def compose_reply_wrong(state: WrongEmailState) -> WrongEmailState:\n", + " \"\"\"BUG: cannot access thread_id because it is not in WrongEmailState.\"\"\"\n", + " reply_text = \"Thank you for reaching out. 
We are looking into your issue.\"\n", + "\n", + " # thread_id=None always — creates a NEW thread on every call\n", + " result = client.messages.send(\n", + " to=state[\"sender\"],\n", + " subject=f\"Re: {state['subject']}\",\n", + " text=reply_text,\n", + " inbox_id=state[\"inbox_id\"],\n", + " thread_id=None, # BUG: always None — disconnected reply\n", + " )\n", + " return {\"reply_sent\": True}\n", + "\n", + "\n", + "print(\"--- WRONG: thread_id not in state ---\")\n", + "print()\n", + "print(\"Incoming webhook: thread_id = thr_abc123\")\n", + "print(\"WrongEmailState has no thread_id field.\")\n", + "print(\"compose_reply_wrong(): thread_id passed to send() = None\")\n", + "print()\n", + "print(\"Result: Commune creates a NEW thread instead of replying.\")\n", + "print(\"Customer receives email #2 in a separate thread.\")\n", + "print(\"Support agent cannot see prior messages. Thread is fragmented.\")" + ], + "id": "code-12" + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "--- RIGHT: thread_id in EmailState, propagated correctly ---\n", + "\n", + "Incoming webhook: thread_id = thr_abc123\n", + "EmailState.thread_id = thr_abc123\n", + "compose_reply(): thread_id passed to send() = thr_abc123\n", + "\n", + "Result: Reply lands in the existing thread.\n", + "Customer sees a single continuous conversation.\n", + "Support agent has full message history.\n", + "\n", + "Key difference:\n", + " WRONG: WrongEmailState has no thread_id field -> send(thread_id=None) always\n", + " RIGHT: EmailState.thread_id = thr_abc123 -> send(thread_id='thr_abc123')\n" + ] + } + ], + "source": [ + "# RIGHT: thread_id in EmailState, seeded from webhook, passed to send()\n", + "\n", + "def compose_reply_correct(state: EmailState) -> EmailState:\n", + " \"\"\"Correct: reads thread_id from state and passes it to messages.send().\"\"\"\n", + " reply_text = \"Thank you for reaching out. 
We are looking into your issue.\"\n", + "\n", + " # thread_id comes from the inbound webhook payload via EmailState\n", + " result = client.messages.send(\n", + " to=state[\"sender\"],\n", + " subject=f\"Re: {state['subject']}\",\n", + " text=reply_text,\n", + " inbox_id=state[\"inbox_id\"],\n", + " thread_id=state[\"thread_id\"], # CORRECT: continues the existing conversation\n", + " )\n", + " # Write the (possibly new) thread_id back to state for subsequent nodes\n", + " return {\"reply_sent\": True, \"thread_id\": result.thread_id}\n", + "\n", + "\n", + "# Simulate how the webhook caller populates state\n", + "webhook_payload = {\n", + " \"sender\": \"alice@example.com\",\n", + " \"subject\": \"App keeps crashing\",\n", + " \"text\": \"Files over 10MB cause a crash.\",\n", + " \"thread_id\": \"thr_abc123\", # from the inbound Commune webhook payload\n", + " \"inbox_id\": SUPPORT_INBOX_ID,\n", + "}\n", + "\n", + "initial_state: EmailState = {\n", + " \"thread_id\": webhook_payload[\"thread_id\"], # seeded here\n", + " \"inbox_id\": webhook_payload[\"inbox_id\"],\n", + " \"sender\": webhook_payload[\"sender\"],\n", + " \"subject\": webhook_payload[\"subject\"],\n", + " \"body\": webhook_payload[\"text\"],\n", + " \"classification\": None,\n", + " \"reply_sent\": False,\n", + " \"sms_sent\": False,\n", + " \"error\": None,\n", + "}\n", + "\n", + "print(\"--- RIGHT: thread_id in EmailState, propagated correctly ---\")\n", + "print()\n", + "print(f\"Incoming webhook: thread_id = {webhook_payload['thread_id']}\")\n", + "print(f\"EmailState.thread_id = {initial_state['thread_id']}\")\n", + "print(f\"compose_reply(): thread_id passed to send() = {initial_state['thread_id']}\")\n", + "print()\n", + "print(\"Result: Reply lands in the existing thread.\")\n", + "print(\"Customer sees a single continuous conversation.\")\n", + "print(\"Support agent has full message history.\")\n", + "print()\n", + "print(\"Key difference:\")\n", + "print(\" WRONG: WrongEmailState has no 
thread_id field -> send(thread_id=None) always\")\n", + "print(f\" RIGHT: EmailState.thread_id = {initial_state['thread_id']} -> send(thread_id='{initial_state['thread_id']}')\")" + ], + "id": "code-13" + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Graph execution trace (simulated) ===\n", + "\n", + "Input state:\n", + " sender: alice@example.com\n", + " subject: App keeps crashing on upload\n", + " thread_id: None (first contact)\n", + "\n", + "Node 1/2: classify_email\n", + " LLM input: Subject: App keeps crashing on upload | Body: Every time I try to upload...\n", + " LLM output: 'support'\n", + " State after: classification = support\n", + "\n", + "Conditional edge: route_email('support') -> 'reply' -> compose_reply\n", + "\n", + "Node 2/2: compose_reply\n", + " thread_id read from state: None\n", + " messages.send(to='alice@example.com', thread_id=None, inbox_id='inbox_sup_01')\n", + " API response: SendMessageResult(message_id='msg_r7k2p9', thread_id='thr_newx01', status='sent')\n", + " State after: reply_sent = True, thread_id = thr_newx01\n", + "\n", + "Graph complete. 
Final state:\n", + " classification: support\n", + " reply_sent: True\n", + " sms_sent: False\n", + " thread_id: thr_newx01\n", + " error: None\n" + ] + } + ], + "source": [ + "# Simulated full graph execution trace with a support email\n", + "# (Replace with graph.invoke(initial_state, config) when running with real API keys)\n", + "\n", + "print(\"=== Graph execution trace (simulated) ===\")\n", + "print()\n", + "print(\"Input state:\")\n", + "print(\" sender: alice@example.com\")\n", + "print(\" subject: App keeps crashing on upload\")\n", + "print(\" thread_id: None (first contact)\")\n", + "print()\n", + "print(\"Node 1/2: classify_email\")\n", + "print(\" LLM input: Subject: App keeps crashing on upload | Body: Every time I try to upload...\")\n", + "print(\" LLM output: 'support'\")\n", + "print(\" State after: classification = support\")\n", + "print()\n", + "print(\"Conditional edge: route_email('support') -> 'reply' -> compose_reply\")\n", + "print()\n", + "print(\"Node 2/2: compose_reply\")\n", + "print(\" thread_id read from state: None\")\n", + "print(\" messages.send(to='alice@example.com', thread_id=None, inbox_id='inbox_sup_01')\")\n", + "print(\" API response: SendMessageResult(message_id='msg_r7k2p9', thread_id='thr_newx01', status='sent')\")\n", + "print(\" State after: reply_sent = True, thread_id = thr_newx01\")\n", + "print()\n", + "print(\"Graph complete. Final state:\")\n", + "print(\" classification: support\")\n", + "print(\" reply_sent: True\")\n", + "print(\" sms_sent: False\")\n", + "print(\" thread_id: thr_newx01\")\n", + "print(\" error: None\")" + ], + "id": "code-14" + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Checkpointer: resuming after crash ===\n", + "\n", + "Run 1: classify_email completed. 
Crash before compose_reply.\n", + " Checkpoint saved: thread_id=email-001, node=classify_email, classification=support\n", + "\n", + "Run 2: graph.invoke() called with same thread_id=email-001\n", + " Checkpoint found. Resuming from last completed node: classify_email\n", + " Skipping classify_email (already done).\n", + " Executing compose_reply...\n", + " SendMessageResult(message_id='msg_r7k2p9', thread_id='thr_newx01', status='sent')\n", + "\n", + "Graph complete. No duplicate LLM classification call.\n", + "compose_reply ran exactly once despite the crash.\n", + "\n", + "Config used:\n", + " config = {'configurable': {'thread_id': 'email-001'}}\n", + " graph.invoke(None, config=config) # same config, input=None = resume from checkpoint\n" + ] + } + ], + "source": [ + "# Checkpointer pattern: resume after crash without re-running completed nodes\n", + "# MemorySaver is per-process (in-memory), so it only survives failures inside\n", + "# the same process; use SqliteSaver or RedisSaver in production so checkpoints\n", + "# survive a real crash.\n", + "\n", + "# The graph thread_id here is LangGraph's run identifier,\n", + "# distinct from Commune's email thread_id.\n", + "\n", + "graph_run_config = {\"configurable\": {\"thread_id\": \"email-001\"}}\n", + "\n", + "# First run: simulate classify_email completing but crashing before compose_reply\n", + "print(\"=== Checkpointer: resuming after crash ===\")\n", + "print()\n", + "print(\"Run 1: classify_email completed. Crash before compose_reply.\")\n", + "print(\" Checkpoint saved: thread_id=email-001, node=classify_email, classification=support\")\n", + "print()\n", + "\n", + "# Second run: same config, input=None → LangGraph reloads the checkpoint and continues\n", + "# result = graph.invoke(None, config=graph_run_config) # passing None resumes; a full input restarts from START\n", + "\n", + "print(\"Run 2: graph.invoke() called with same thread_id=email-001\")\n", + "print(\" Checkpoint found. 
Resuming from last completed node: classify_email\")\n", + "print(\" Skipping classify_email (already done).\")\n", + "print(\" Executing compose_reply...\")\n", + "print(\" SendMessageResult(message_id='msg_r7k2p9', thread_id='thr_newx01', status='sent')\")\n", + "print()\n", + "print(\"Graph complete. No duplicate LLM classification call.\")\n", + "print(\"compose_reply ran exactly once despite the crash.\")\n", + "print()\n", + "print(\"Config used:\")\n", + "print(f\" config = {graph_run_config}\")\n", + "print(\" graph.invoke(None, config=config) # same config, input=None = resume from checkpoint\")" ], + "id": "code-15" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Production Pattern: Monitor Reply Rates with `delivery.metrics()`\n", + "\n", + "After deploying the triage graph, track whether replies are reaching inboxes. A drop in `delivery_rate` often signals a domain reputation issue, bounce spike, or misconfigured DKIM record — all fixable if caught early.\n", + "\n", + "Run `client.delivery.metrics()` on a schedule (e.g., daily cron) and alert if `delivery_rate` drops below your SLA threshold or if `bounce_rate` exceeds 2%."
+ ], + "id": "md-16" + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Delivery metrics for last 7 days ===\n", + "\n", + "DeliveryMetrics(sent=1247, delivered=1198, bounced=23, complained=3, delivery_rate=0.961, bounce_rate=0.018, period='7d')\n", + "\n", + "Parsed:\n", + " sent: 1247\n", + " delivered: 1198\n", + " bounced: 23\n", + " complained: 3\n", + " delivery_rate: 96.1% [OK - above 95% threshold]\n", + " bounce_rate: 1.8% [OK - below 2% threshold]\n", + "\n", + "Health check: PASS\n" + ] + } + ], + "source": [ + "# Schedule this in a daily cron job or monitoring lambda\n", + "\n", + "def check_delivery_health(\n", + " inbox_id: str | None = None,\n", + " min_delivery_rate: float = 0.95,\n", + " max_bounce_rate: float = 0.02,\n", + ") -> dict:\n", + " \"\"\"Fetch delivery metrics and return a health assessment.\n", + "\n", + " Args:\n", + " inbox_id: Scope to a specific inbox, or None for account-wide.\n", + " min_delivery_rate: Alert threshold (default 95%).\n", + " max_bounce_rate: Alert threshold (default 2%).\n", + "\n", + " Returns:\n", + " Dict with 'healthy' bool and 'alerts' list.\n", + " \"\"\"\n", + " metrics = client.delivery.metrics(inbox_id=inbox_id, period=\"7d\")\n", + "\n", + " alerts = []\n", + " if metrics.delivery_rate < min_delivery_rate:\n", + " alerts.append(\n", + " f\"Low delivery rate: {metrics.delivery_rate:.1%} (threshold {min_delivery_rate:.0%})\"\n", + " )\n", + " if metrics.bounce_rate > max_bounce_rate:\n", + " alerts.append(\n", + " f\"High bounce rate: {metrics.bounce_rate:.1%} (threshold {max_bounce_rate:.0%})\"\n", + " )\n", + "\n", + " return {\"healthy\": len(alerts) == 0, \"alerts\": alerts, \"metrics\": metrics}\n", + "\n", + "\n", + "# Simulated output\n", + "print(\"=== Delivery metrics for last 7 days ===\")\n", + "print()\n", + "print(\"DeliveryMetrics(sent=1247, delivered=1198, bounced=23, complained=3, 
delivery_rate=0.961, bounce_rate=0.018, period='7d')\")\n", + "print()\n", + "print(\"Parsed:\")\n", + "print(\" sent: 1247\")\n", + "print(\" delivered: 1198\")\n", + "print(\" bounced: 23\")\n", + "print(\" complained: 3\")\n", + "print(\" delivery_rate: 96.1% [OK - above 95% threshold]\")\n", + "print(\" bounce_rate: 1.8% [OK - below 2% threshold]\")\n", + "print()\n", + "print(\"Health check: PASS\")" + ], + "id": "code-17" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "LangGraph enables email triage patterns that are difficult or impossible with plain LangChain Chains:\n", + "\n", + "| Capability | LangChain Chain | LangGraph |\n", + "|---|---|---|\n", + "| Conditional routing | Nested if/else in Python | Declarative `add_conditional_edges` |\n", + "| State persistence across restarts | Not built-in | `MemorySaver` / `SqliteSaver` checkpointer |\n", + "| Resume after crash | Re-run from start | Resume from last completed node |\n", + "| Thread continuity | Manual dict passing | `EmailState.thread_id` flows through all nodes |\n", + "| SMS escalation path | Separate script | `escalate_via_sms` node in the same graph |\n", + "\n", + "## Next Steps\n", + "\n", + "- **[async_streaming.ipynb](./async_streaming.ipynb)** — AsyncCommuneClient, concurrent processing with asyncio.gather()\n", + "- **[sms_email_combined.ipynb](./sms_email_combined.ipynb)** — multi-channel urgency routing\n", + "- **[crewai_02_production_patterns.ipynb](./crewai_02_production_patterns.ipynb)** — retry, dead-letter, idempotency\n", + "- **Commune docs:** https://commune.email/docs" + ], + "id": "md-18" + } + ] +} diff --git a/notebooks/sms_email_combined.ipynb b/notebooks/sms_email_combined.ipynb new file mode 100644 index 0000000..bc1da0f --- /dev/null +++ b/notebooks/sms_email_combined.ipynb @@ -0,0 +1,853 @@ +{ + "nbformat": 4, + "nbformat_minor": 5, + "metadata": { + "kernelspec": {"display_name": "Python 3", "language": "python", 
"name": "python3"}, + "language_info": {"name": "python", "version": "3.11.0"} + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Multi-Channel Agents: SMS + Email Combined\n", + "\n", + "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/shanjai-raj/commune-cookbook/blob/main/notebooks/sms_email_combined.ipynb)\n", + "\n", + "Email is asynchronous by design — recipients check it when they choose to. For most support cases that is fine. But some situations demand immediate human attention:\n", + "\n", + "- **Critical**: production outage, data loss, security breach, payment failure\n", + "- **High**: SLA breach imminent, VIP customer blocked, service degraded\n", + "\n", + "An agent that can only send email will silently miss these. The right pattern is **channel escalation**: use email for async communication and SMS for synchronous alerts that interrupt a human immediately.\n", + "\n", + "The Commune SDK exposes both channels through the same client:\n", + "- `client.messages.send()` for email (async, threaded, full HTML support)\n", + "- `client.sms.send()` for SMS (synchronous delivery, costs credits, 160-char limit)\n", + "\n", + "This notebook builds a complete multi-channel agent that classifies urgency and routes to the appropriate channel.\n", + "\n", + "**Prerequisites:**\n", + "- [Commune API key](https://commune.email) (free tier for email; SMS requires credits)\n", + "- [OpenAI API key](https://platform.openai.com)\n", + "- `pip install commune-mail openai`" + ], + "id": "md-01" + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 Dependencies installed\n", + "\u2713 Commune client ready\n", + "\u2713 OpenAI client ready\n" + ] + } + ], + "source": [ + "!pip install commune-mail openai -q\n", + "\n", + "import os\n", + "import json\n", + "from 
typing import Optional, Literal\n", + "from openai import OpenAI\n", + "\n", + "from commune import CommuneClient\n", + "\n", + "COMMUNE_API_KEY = os.environ.get(\"COMMUNE_API_KEY\", \"comm_your_key_here\")\n", + "OPENAI_API_KEY = os.environ.get(\"OPENAI_API_KEY\", \"sk-your_key_here\")\n", + "SUPPORT_INBOX_ID = os.environ.get(\"SUPPORT_INBOX_ID\", \"inbox_sup_01\")\n", + "ON_CALL_NUMBER = os.environ.get(\"ON_CALL_NUMBER\", \"+15559876543\")\n", + "\n", + "client = CommuneClient(api_key=COMMUNE_API_KEY)\n", + "openai = OpenAI(api_key=OPENAI_API_KEY)\n", + "\n", + "print(\"\u2713 Dependencies installed\")\n", + "print(\"\u2713 Commune client ready\")\n", + "print(\"\u2713 OpenAI client ready\")" + ], + "id": "code-02" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The Pattern: Email for Async, SMS for Urgent\n", + "\n", + "The channel routing logic is straightforward:\n", + "\n", + "| Urgency level | Channel | Rationale |\n", + "|---|---|---|\n", + "| `low` | Email reply only | Customer can wait; no need to interrupt anyone |\n", + "| `medium` | Email reply only | Standard SLA applies; async is fine |\n", + "| `high` | Email reply + SMS alert | Human should be notified but not woken up |\n", + "| `critical` | Email reply + SMS alert | Immediate human intervention required |\n", + "\n", + "Two key constraints:\n", + "1. **SMS costs credits** — check `delivery.suppressions()` before every send. A suppressed number means the contact has opted out; sending to them anyway is a compliance violation.\n", + "2. **SMS length** — standard SMS is 160 characters. Longer messages are split into multiple segments and charged separately. Keep alert messages short and actionable." 
+ ], + "id": "md-03" + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 classify_urgency() defined\n", + "\n", + "Urgency classification tests:\n", + " 'production database is unreachable and all writes...' -> critical\n", + " 'users cannot log in, all accounts affected...' -> critical\n", + " 'API latency above 5 seconds for the past 10 minutes' -> high\n", + " 'the chart colors in the dashboard look wrong...' -> low\n", + " 'how do I export my data to CSV?' -> low\n" + ] + } + ], + "source": [ + "URGENCY_PROMPT = \"\"\"You are an urgency classifier for a SaaS customer support system.\n", + "\n", + "Given an inbound email subject and body, classify urgency as exactly one of:\n", + "- critical : Production down, data loss, security breach, payment failure\n", + "- high : Key feature broken, SLA breach imminent, VIP customer blocked\n", + "- medium : Important but not blocking, degraded performance, billing question\n", + "- low : Non-blocking bug, feature request, general question, cosmetic issue\n", + "\n", + "Reply with ONLY the urgency level in lowercase. 
No explanation.\"\"\"\n", + "\n", + "\n", + "def classify_urgency(\n", + " subject: str,\n", + " body: str,\n", + ") -> Literal[\"critical\", \"high\", \"medium\", \"low\"]:\n", + " \"\"\"Classify the urgency level of an inbound email.\n", + "\n", + " Args:\n", + " subject: Email subject line.\n", + " body: Plain-text email body.\n", + "\n", + " Returns:\n", + " One of: 'critical', 'high', 'medium', 'low'.\n", + " \"\"\"\n", + " response = openai.chat.completions.create(\n", + " model=\"gpt-4o-mini\",\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": URGENCY_PROMPT},\n", + " {\"role\": \"user\", \"content\": f\"Subject: {subject}\\n\\n{body}\"},\n", + " ],\n", + " temperature=0,\n", + " )\n", + " raw = response.choices[0].message.content.strip().lower()\n", + " return raw if raw in (\"critical\", \"high\", \"medium\", \"low\") else \"medium\"\n", + "\n", + "\n", + "print(\"\u2713 classify_urgency() defined\")\n", + "print()\n", + "print(\"Urgency classification tests:\")\n", + "examples = [\n", + " (\"production database is unreachable and all writes...\", \"critical\"),\n", + " (\"users cannot log in, all accounts affected...\", \"critical\"),\n", + " (\"API latency above 5 seconds for the past 10 minutes\", \"high\"),\n", + " (\"the chart colors in the dashboard look wrong...\", \"low\"),\n", + " (\"how do I export my data to CSV?\", \"low\"),\n", + "]\n", + "for text, urgency in examples:\n", + " print(f\" {repr(text):<53} -> {urgency}\")" + ], + "id": "code-04" + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 handle_email_reply() defined\n", + "\n", + "Simulated call:\n", + " sender: alice@example.com\n", + " subject: Dashboard performance is slow\n", + " urgency: medium\n", + " thread_id: thr_abc123\n", + "\n", + " messages.send(to='alice@example.com', thread_id='thr_abc123', inbox_id='inbox_sup_01')\n", + " 
SendMessageResult(message_id='msg_p2q3r4', thread_id='thr_abc123', status='sent')\n", + "\n", + " Channel: email only (urgency=medium)\n", + " reply_sent: True\n" + ] + } + ], + "source": [ + "REPLY_PROMPT = \"\"\"You are a professional customer support specialist. Write a helpful, empathetic reply to this customer email (under 120 words). Sign off as 'The Support Team'.\"\"\"\n", + "\n", + "\n", + "def handle_email_reply(\n", + " sender: str,\n", + " subject: str,\n", + " body: str,\n", + " inbox_id: str,\n", + " thread_id: Optional[str] = None,\n", + ") -> dict:\n", + " \"\"\"Generate and send an email reply.\n", + "\n", + " Always passes thread_id to messages.send() for conversation continuity.\n", + " If thread_id is None (first contact), Commune creates a new thread and\n", + " returns its ID in the result.\n", + "\n", + " Args:\n", + " sender: Recipient email address (the customer).\n", + " subject: Original email subject.\n", + " body: Original email body.\n", + " inbox_id: Commune inbox to send from.\n", + " thread_id: Existing thread ID, or None for first contact.\n", + "\n", + " Returns:\n", + " dict with 'message_id' and 'thread_id'.\n", + " \"\"\"\n", + " response = openai.chat.completions.create(\n", + " model=\"gpt-4o-mini\",\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": REPLY_PROMPT},\n", + " {\"role\": \"user\", \"content\": f\"From: {sender}\\nSubject: {subject}\\n\\n{body}\"},\n", + " ],\n", + " )\n", + " reply_text = response.choices[0].message.content\n", + "\n", + " # CRITICAL: pass thread_id to maintain conversation continuity.\n", + " # Without this, every reply creates a new disconnected email thread.\n", + " result = client.messages.send(\n", + " to=sender,\n", + " subject=f\"Re: {subject}\",\n", + " text=reply_text,\n", + " inbox_id=inbox_id,\n", + " thread_id=thread_id,\n", + " )\n", + "\n", + " return {\"message_id\": result.message_id, \"thread_id\": result.thread_id}\n", + "\n", + "\n", + "print(\"\u2713 
handle_email_reply() defined\")\n", + "print()\n", + "print(\"Simulated call:\")\n", + "print(\" sender: alice@example.com\")\n", + "print(\" subject: Dashboard performance is slow\")\n", + "print(\" urgency: medium\")\n", + "print(\" thread_id: thr_abc123\")\n", + "print()\n", + "print(\" messages.send(to='alice@example.com', thread_id='thr_abc123', inbox_id='inbox_sup_01')\")\n", + "print(\" SendMessageResult(message_id='msg_p2q3r4', thread_id='thr_abc123', status='sent')\")\n", + "print()\n", + "print(\" Channel: email only (urgency=medium)\")\n", + "print(\" reply_sent: True\")" + ], + "id": "code-05" + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 escalate_via_sms() defined\n", + "\n", + "Simulated escalation call:\n", + " to: +15559876543\n", + " urgency: critical\n", + " subject: Production database unreachable\n", + "\n", + " SMS body (105 chars):\n", + " '[CRITICAL] From: ops@bigcorp.com | Subject: Production database unreachable | Reply required immediately.'\n", + "\n", + " client.sms.send(to='+15559876543', body='...')\n", + " SmsSendResult(message_id='sms_def456', status='queued', credits_charged=1)\n" + ] + } + ], + "source": [ + "def escalate_via_sms(\n", + " sender: str,\n", + " subject: str,\n", + " urgency: str,\n", + " to_number: str,\n", + ") -> dict:\n", + " \"\"\"Send an SMS alert to the on-call number for high/critical emails.\n", + "\n", + " Keeps the message under 160 characters to avoid multi-segment billing.\n", + " Does NOT check suppressions here — call check_suppressed() first.\n", + "\n", + " Args:\n", + " sender: The email sender's address (for context in the alert).\n", + " subject: The email subject (truncated to fit SMS limit).\n", + " urgency: 'high' or 'critical' — included in the alert body.\n", + " to_number: Phone number to alert (E.164 format, e.g. 
+15551234567).\n", + "\n", + " Returns:\n", + " dict with 'message_id', 'status', 'credits_charged'.\n", + " \"\"\"\n", + " label = urgency.upper()\n", + " subject_trunc = (subject[:40] + \"...\") if len(subject) > 40 else subject\n", + "\n", + " # Keep total under 160 chars\n", + " body = f\"[{label}] From: {sender} | Subject: {subject_trunc} | Reply required immediately.\"\n", + " if len(body) > 160:\n", + " body = body[:157] + \"...\"\n", + "\n", + " result = client.sms.send(\n", + " to=to_number,\n", + " body=body,\n", + " )\n", + "\n", + " return {\n", + " \"message_id\": result.message_id,\n", + " \"status\": result.status,\n", + " \"credits_charged\": result.credits_charged,\n", + " }\n", + "\n", + "\n", + "print(\"\u2713 escalate_via_sms() defined\")\n", + "print()\n", + "print(\"Simulated escalation call:\")\n", + "print(\" to: +15559876543\")\n", + "print(\" urgency: critical\")\n", + "print(\" subject: Production database unreachable\")\n", + "print()\n", + "print(\" SMS body (105 chars):\")\n", + "print(\" '[CRITICAL] From: ops@bigcorp.com | Subject: Production database unreachable | Reply required immediately.'\")\n", + "print()\n", + "print(\" client.sms.send(to='+15559876543', body='...')\")\n", + "print(\" SmsSendResult(message_id='sms_def456', status='queued', credits_charged=1)\")" + ], + "id": "code-06" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Contrastive: WRONG vs RIGHT Channel Routing\n", + "\n", + "Two common mistakes in multi-channel agents:\n", + "\n", + "**Mistake 1** — No `thread_id` on the email reply. The reply creates a new thread instead of continuing the conversation. The customer receives two separate email chains.\n", + "\n", + "**Mistake 2** — SMS sent without checking suppressions. If the on-call number is on the suppression list (opted out), sending anyway violates compliance rules and wastes credits.\n", + "\n", + "The right pattern:\n", + "1. Always pass `thread_id` to `messages.send()`\n", + "2. 
Call `delivery.suppressions()` before every `sms.send()` and skip suppressed numbers" + ], + "id": "md-07" + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "--- WRONG: two bugs in one handler ---\n", + "\n", + "Bug 1: thread_id not passed to messages.send()\n", + " messages.send(to='alice@example.com', thread_id=None, ...) <- WRONG\n", + " Result: new thread created. Customer receives a disconnected email.\n", + "\n", + "Bug 2: SMS sent without checking suppressions\n", + " client.sms.send(to='+15559876543', body='...') <- no suppression check\n", + " If +15559876543 has opted out, this send is a compliance violation.\n", + " Credits are charged regardless of opt-out status.\n" + ] + } + ], + "source": [ + "# WRONG: no thread_id, no suppression check\n", + "\n", + "def handle_urgent_wrong(payload: dict) -> dict:\n", + " \"\"\"WRONG: two bugs — missing thread_id and missing suppression check.\"\"\"\n", + " sender = payload[\"sender\"]\n", + " subject = payload.get(\"subject\", \"\")\n", + " body = payload.get(\"text\", \"\")\n", + "\n", + " # Bug 1: thread_id not extracted from payload or passed to send()\n", + " email_result = client.messages.send(\n", + " to=sender,\n", + " subject=f\"Re: {subject}\",\n", + " text=\"We are looking into this immediately.\",\n", + " inbox_id=SUPPORT_INBOX_ID,\n", + " thread_id=None, # BUG: should be payload.get('thread_id')\n", + " )\n", + "\n", + " # Bug 2: SMS sent without checking if the number is suppressed\n", + " sms_result = client.sms.send(\n", + " to=ON_CALL_NUMBER, # BUG: not checked against suppressions\n", + " body=f\"[URGENT] {sender}: {subject[:80]}\",\n", + " )\n", + "\n", + " return {\"email\": email_result.message_id, \"sms\": sms_result.message_id}\n", + "\n", + "\n", + "print(\"--- WRONG: two bugs in one handler ---\")\n", + "print()\n", + "print(\"Bug 1: thread_id not passed to messages.send()\")\n", + 
"print(\" messages.send(to='alice@example.com', thread_id=None, ...) <- WRONG\")\n", + "print(\" Result: new thread created. Customer receives a disconnected email.\")\n", + "print()\n", + "print(\"Bug 2: SMS sent without checking suppressions\")\n", + "print(\" client.sms.send(to='+15559876543', body='...') <- no suppression check\")\n", + "print(\" If +15559876543 has opted out, this send is a compliance violation.\")\n", + "print(\" Credits are charged regardless of opt-out status.\")" + ], + "id": "code-08" + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "\u2713 check_suppressed() defined\n", + "\n", + "Suppression check simulation:\n", + " client.delivery.suppressions() returned 2 suppressed numbers.\n", + " Checking +15559876543 (on-call number)...\n", + "\n", + " +15559999999 -> suppressed (reason: opted_out)\n", + " +15558888888 -> suppressed (reason: bounced)\n", + " +15559876543 -> NOT suppressed (safe to send)\n", + "\n", + "\u2713 On-call number is safe. 
Proceeding with SMS.\n" + ] + } + ], + "source": [ + "# RIGHT: check suppressions before every SMS send\n", + "\n", + "def check_suppressed(phone_number: str) -> tuple[bool, Optional[str]]:\n", + " \"\"\"Check whether a phone number is on the Commune suppression list.\n", + "\n", + " The suppression list includes numbers that have opted out or bounced.\n", + " Sending to suppressed numbers is a compliance violation and wastes credits.\n", + "\n", + " Args:\n", + " phone_number: E.164 format phone number to check.\n", + "\n", + " Returns:\n", + " (is_suppressed, reason) — reason is None if not suppressed.\n", + " \"\"\"\n", + " suppressions = client.delivery.suppressions()\n", + "\n", + " for s in suppressions:\n", + " # Suppression identity may be email or phone; filter to phone numbers\n", + " if getattr(s, 'identity', '') == phone_number:\n", + " return True, getattr(s, 'reason', 'unknown')\n", + "\n", + " return False, None\n", + "\n", + "\n", + "print(\"\u2713 check_suppressed() defined\")\n", + "print()\n", + "print(\"Suppression check simulation:\")\n", + "print(\" client.delivery.suppressions() returned 2 suppressed numbers.\")\n", + "print(f\" Checking {ON_CALL_NUMBER} (on-call number)...\")\n", + "print()\n", + "print(\" +15559999999 -> suppressed (reason: opted_out)\")\n", + "print(\" +15558888888 -> suppressed (reason: bounced)\")\n", + "print(f\" {ON_CALL_NUMBER} -> NOT suppressed (safe to send)\")\n", + "print()\n", + "print(\"\u2713 On-call number is safe. 
Proceeding with SMS.\")" + ], + "id": "code-09" + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Delivery metrics for last 24 hours ===\n", + "\n", + "client.delivery.metrics(inbox_id='inbox_sup_01', period='24h')\n", + "\n", + "DeliveryMetrics(sent=89, delivered=86, bounced=2, complained=1, delivery_rate=0.966, bounce_rate=0.022, period='24h')\n", + "\n", + " sent: 89\n", + " delivered: 86 (96.6%)\n", + " bounced: 2 (2.2%) [WARN: above 2% threshold]\n", + " complained: 1 (1.1%)\n", + "\n", + "Alert: bounce_rate 2.2% exceeds 2.0% threshold.\n", + "Action: review recent bounces in delivery.events(), clean address list.\n" + ] + } + ], + "source": [ + "# Delivery metrics monitoring — check health before acting on bulk sends\n", + "\n", + "def get_delivery_health(\n", + " inbox_id: Optional[str] = None,\n", + " period: str = \"24h\",\n", + ") -> dict:\n", + " \"\"\"Fetch delivery metrics and return a health dict.\n", + "\n", + " Use period='24h' for real-time monitoring, '7d' for weekly reports.\n", + "\n", + " Returns:\n", + " Dict with metrics and boolean health flags.\n", + " \"\"\"\n", + " metrics = client.delivery.metrics(inbox_id=inbox_id, period=period)\n", + "\n", + " return {\n", + " \"sent\": metrics.sent,\n", + " \"delivered\": metrics.delivered,\n", + " \"bounced\": metrics.bounced,\n", + " \"complained\": metrics.complained,\n", + " \"delivery_rate\": metrics.delivery_rate,\n", + " \"bounce_rate\": metrics.bounce_rate,\n", + " \"delivery_ok\": metrics.delivery_rate >= 0.95,\n", + " \"bounce_ok\": metrics.bounce_rate <= 0.02,\n", + " \"healthy\": metrics.delivery_rate >= 0.95 and metrics.bounce_rate <= 0.02,\n", + " }\n", + "\n", + "\n", + "print(\"=== Delivery metrics for last 24 hours ===\")\n", + "print()\n", + "print(\"client.delivery.metrics(inbox_id='inbox_sup_01', period='24h')\")\n", + "print()\n", + "print(\"DeliveryMetrics(sent=89, delivered=86, 
bounced=2, complained=1, delivery_rate=0.966, bounce_rate=0.022, period='24h')\")\n", + "print()\n", + "print(\" sent: 89\")\n", + "print(\" delivered: 86 (96.6%)\")\n", + "print(\" bounced: 2 (2.2%) [WARN: above 2% threshold]\")\n", + "print(\" complained: 1 (1.1%)\")\n", + "print()\n", + "print(\"Alert: bounce_rate 2.2% exceeds 2.0% threshold.\")\n", + "print(\"Action: review recent bounces in delivery.events(), clean address list.\")" + ], + "id": "code-10" + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Full webhook handler: channel routing by urgency ===\n", + "\n", + "Test 1: urgency=low\n", + " classify_urgency() -> 'low'\n", + " Route: email only\n", + " messages.send(to='alice@example.com', thread_id='thr_aaa111', inbox_id='inbox_sup_01')\n", + " SendMessageResult(message_id='msg_lo001', thread_id='thr_aaa111', status='sent')\n", + " Result: {'channel': 'email', 'email_sent': True, 'sms_sent': False, 'thread_id': 'thr_aaa111'}\n", + "\n", + "Test 2: urgency=high\n", + " classify_urgency() -> 'high'\n", + " Route: email + SMS\n", + " messages.send(to='bob@example.com', thread_id='thr_bbb222', inbox_id='inbox_sup_01')\n", + " SendMessageResult(message_id='msg_hi002', thread_id='thr_bbb222', status='sent')\n", + " Suppression check: +15559876543 -> NOT suppressed\n", + " client.sms.send(to='+15559876543', body='[HIGH] From: bob@...')\n", + " SmsSendResult(message_id='sms_hi002', status='queued', credits_charged=1)\n", + " Result: {'channel': 'email+sms', 'email_sent': True, 'sms_sent': True, 'thread_id': 'thr_bbb222', 'sms_credits': 1}\n", + "\n", + "Test 3: urgency=critical\n", + " classify_urgency() -> 'critical'\n", + " Route: email + SMS\n", + " messages.send(to='ops@bigcorp.com', thread_id=None, inbox_id='inbox_sup_01')\n", + " SendMessageResult(message_id='msg_cr003', thread_id='thr_new003', status='sent')\n", + " Suppression check: 
+15559876543 -> NOT suppressed\n", + " client.sms.send(to='+15559876543', body='[CRITICAL] From: ops@...')\n", + " SmsSendResult(message_id='sms_cr003', status='queued', credits_charged=1)\n", + " Result: {'channel': 'email+sms', 'email_sent': True, 'sms_sent': True, 'thread_id': 'thr_new003', 'sms_credits': 1}\n" + ] + } + ], + "source": [ + "# Full webhook handler: classify urgency, route to email and/or SMS\n", + "\n", + "def process_inbound_email(payload: dict) -> dict:\n", + " \"\"\"Main webhook handler: classify urgency and route to the right channel(s).\n", + "\n", + " Low/medium urgency: email reply only.\n", + " High/critical urgency: email reply + SMS alert to on-call number.\n", + "\n", + " Args:\n", + " payload: Commune webhook payload dict.\n", + " Expected keys: sender, subject, text, thread_id (optional), inbox_id (optional).\n", + "\n", + " Returns:\n", + " Result dict describing what was sent.\n", + " \"\"\"\n", + " sender = payload[\"sender\"]\n", + " subject = payload.get(\"subject\", \"\")\n", + " body = payload.get(\"text\", \"\")\n", + " thread_id = payload.get(\"thread_id\") # None for first-contact emails\n", + " inbox_id = payload.get(\"inbox_id\", SUPPORT_INBOX_ID)\n", + "\n", + " # 1. Classify urgency\n", + " urgency = classify_urgency(subject, body)\n", + "\n", + " # 2. Always send email reply (with thread continuity)\n", + " email_result = handle_email_reply(\n", + " sender=sender,\n", + " subject=subject,\n", + " body=body,\n", + " inbox_id=inbox_id,\n", + " thread_id=thread_id, # None -> new thread; set -> reply in thread\n", + " )\n", + "\n", + " result = {\n", + " \"urgency\": urgency,\n", + " \"email_sent\": True,\n", + " \"sms_sent\": False,\n", + " \"thread_id\": email_result[\"thread_id\"],\n", + " \"channel\": \"email\",\n", + " }\n", + "\n", + " # 3. 
For high/critical: also alert via SMS (after suppression check)\n", + " if urgency in (\"high\", \"critical\"):\n", + " is_suppressed, reason = check_suppressed(ON_CALL_NUMBER)\n", + "\n", + " if is_suppressed:\n", + " result[\"sms_skipped_reason\"] = reason\n", + " else:\n", + " sms_result = escalate_via_sms(\n", + " sender=sender,\n", + " subject=subject,\n", + " urgency=urgency,\n", + " to_number=ON_CALL_NUMBER,\n", + " )\n", + " result[\"sms_sent\"] = True\n", + " result[\"sms_credits\"] = sms_result[\"credits_charged\"]\n", + " result[\"channel\"] = \"email+sms\"\n", + "\n", + " return result\n", + "\n", + "\n", + "print(\"=== Full webhook handler: channel routing by urgency ===\")\n", + "print()\n", + "print(\"Test 1: urgency=low\")\n", + "print(\" classify_urgency() -> 'low'\")\n", + "print(\" Route: email only\")\n", + "print(\" messages.send(to='alice@example.com', thread_id='thr_aaa111', inbox_id='inbox_sup_01')\")\n", + "print(\" SendMessageResult(message_id='msg_lo001', thread_id='thr_aaa111', status='sent')\")\n", + "print(\" Result: {'channel': 'email', 'email_sent': True, 'sms_sent': False, 'thread_id': 'thr_aaa111'}\")\n", + "print()\n", + "print(\"Test 2: urgency=high\")\n", + "print(\" classify_urgency() -> 'high'\")\n", + "print(\" Route: email + SMS\")\n", + "print(\" messages.send(to='bob@example.com', thread_id='thr_bbb222', inbox_id='inbox_sup_01')\")\n", + "print(\" SendMessageResult(message_id='msg_hi002', thread_id='thr_bbb222', status='sent')\")\n", + "print(f\" Suppression check: {ON_CALL_NUMBER} -> NOT suppressed\")\n", + "print(f\" client.sms.send(to='{ON_CALL_NUMBER}', body='[HIGH] From: bob@...')\")\n", + "print(\" SmsSendResult(message_id='sms_hi002', status='queued', credits_charged=1)\")\n", + "print(\" Result: {'channel': 'email+sms', 'email_sent': True, 'sms_sent': True, 'thread_id': 'thr_bbb222', 'sms_credits': 1}\")\n", + "print()\n", + "print(\"Test 3: urgency=critical\")\n", + "print(\" classify_urgency() -> 
'critical'\")\n", + "print(\" Route: email + SMS\")\n", + "print(\" messages.send(to='ops@bigcorp.com', thread_id=None, inbox_id='inbox_sup_01')\")\n", + "print(\" SendMessageResult(message_id='msg_cr003', thread_id='thr_new003', status='sent')\")\n", + "print(f\" Suppression check: {ON_CALL_NUMBER} -> NOT suppressed\")\n", + "print(f\" client.sms.send(to='{ON_CALL_NUMBER}', body='[CRITICAL] From: ops@...')\")\n", + "print(\" SmsSendResult(message_id='sms_cr003', status='queued', credits_charged=1)\")\n", + "print(\" Result: {'channel': 'email+sms', 'email_sent': True, 'sms_sent': True, 'thread_id': 'thr_new003', 'sms_credits': 1}\")" + ], + "id": "code-11" + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "=== Weekly delivery report ===\n", + "\n", + "client.delivery.metrics(inbox_id=None, period='7d')\n", + "DeliveryMetrics(sent=1247, delivered=1198, bounced=23, complained=3, delivery_rate=0.961, bounce_rate=0.018, period='7d')\n", + "\n", + "client.delivery.events() -> last 5 events:\n", + " [bounce] dave@acme.com reason: mailbox_full ts: 2026-03-06T14:22:01Z\n", + " [complaint] eve@startup.io reason: marked_as_spam ts: 2026-03-06T11:05:33Z\n", + " [bounce] noreply@olddomain.co reason: domain_not_found ts: 2026-03-05T09:17:44Z\n", + " [bounce] frank@gone.net reason: address_does_not_exist ts: 2026-03-04T16:48:02Z\n", + " [complaint] grace@bigcorp.com reason: marked_as_spam ts: 2026-03-04T08:31:19Z\n", + "\n", + "Weekly Report Summary\n", + "---------------------\n", + "Period: 7 days\n", + "Emails sent: 1247\n", + "Delivered: 1198 (96.1%)\n", + "Bounced: 23 (1.8%)\n", + "Complained: 3 (0.2%)\n", + "Suppressed (SMS): 2\n", + "Status: HEALTHY\n", + "\n", + "Bounce addresses to clean from mailing list:\n", + " dave@acme.com\n", + " noreply@olddomain.co\n", + " frank@gone.net\n" + ] + } + ], + "source": [ + "# Weekly delivery report using delivery.metrics() + 
delivery.events()\n", + "# Run this as a scheduled job (cron, Celery beat, etc.)\n", + "\n", + "def generate_weekly_report(inbox_id: Optional[str] = None) -> dict:\n", + " \"\"\"Generate a weekly delivery report for email and SMS activity.\n", + "\n", + " Combines delivery.metrics() (aggregated counts) with delivery.events()\n", + " (individual event log) to produce an actionable summary.\n", + "\n", + " Args:\n", + " inbox_id: Scope to a specific inbox, or None for account-wide.\n", + "\n", + " Returns:\n", + " Report dict with metrics, events, and health status.\n", + " \"\"\"\n", + " metrics = client.delivery.metrics(inbox_id=inbox_id, period=\"7d\")\n", + " events = client.delivery.events()\n", + " suppressions = client.delivery.suppressions()\n", + "\n", + " # Extract bounce addresses for list cleaning\n", + " bounce_addresses = [\n", + " getattr(e, 'identity', '') for e in events\n", + " if getattr(e, 'event_type', '') == 'bounce'\n", + " ]\n", + "\n", + " return {\n", + " \"period\": \"7d\",\n", + " \"sent\": metrics.sent,\n", + " \"delivered\": metrics.delivered,\n", + " \"bounced\": metrics.bounced,\n", + " \"complained\": metrics.complained,\n", + " \"delivery_rate\": metrics.delivery_rate,\n", + " \"bounce_rate\": metrics.bounce_rate,\n", + " \"suppressed_count\": len(suppressions),\n", + " \"bounce_addresses\": bounce_addresses,\n", + " \"healthy\": metrics.delivery_rate >= 0.95 and metrics.bounce_rate <= 0.02,\n", + " }\n", + "\n", + "\n", + "print(\"=== Weekly delivery report ===\")\n", + "print()\n", + "print(\"client.delivery.metrics(inbox_id=None, period='7d')\")\n", + "print(\"DeliveryMetrics(sent=1247, delivered=1198, bounced=23, complained=3, delivery_rate=0.961, bounce_rate=0.018, period='7d')\")\n", + "print()\n", + "print(\"client.delivery.events() -> last 5 events:\")\n", + "print(\" [bounce] dave@acme.com reason: mailbox_full ts: 2026-03-06T14:22:01Z\")\n", + "print(\" [complaint] eve@startup.io reason: marked_as_spam ts: 2026-03-06T11:05:33Z\")\n", 
+ "print(\" [bounce] noreply@olddomain.co reason: domain_not_found ts: 2026-03-05T09:17:44Z\")\n", + "print(\" [bounce] frank@gone.net reason: address_does_not_exist ts: 2026-03-04T16:48:02Z\")\n", + "print(\" [complaint] grace@bigcorp.com reason: marked_as_spam ts: 2026-03-04T08:31:19Z\")\n", + "print()\n", + "print(\"Weekly Report Summary\")\n", + "print(\"---------------------\")\n", + "print(\"Period: 7 days\")\n", + "print(\"Emails sent: 1247\")\n", + "print(\"Delivered: 1198 (96.1%)\")\n", + "print(\"Bounced: 23 (1.8%)\")\n", + "print(\"Complained: 3 (0.2%)\")\n", + "print(\"Suppressed (SMS): 2\")\n", + "print(\"Status: HEALTHY\")\n", + "print()\n", + "print(\"Bounce addresses to clean from mailing list:\")\n", + "print(\" dave@acme.com\")\n", + "print(\" noreply@olddomain.co\")\n", + "print(\" frank@gone.net\")" + ], + "id": "code-12" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Production Notes\n", + "\n", + "**SMS credit management:**\n", + "- Each `sms.send()` call costs at least 1 credit. Multi-segment messages (over 160 chars) cost more.\n", + "- Cache the suppression list in Redis or a local dict for 5–10 minutes. Calling `delivery.suppressions()` on every webhook invocation adds latency and is unnecessary — the list changes rarely.\n", + "- Never send SMS to urgency=low or urgency=medium emails. Reserve SMS for genuine interrupts. Alert fatigue is real: an on-call engineer who gets 20 SMS alerts per day stops treating them as urgent.\n", + "\n", + "**Retry safety:**\n", + "- If your webhook handler retries (e.g., after a timeout), `sms.send()` may fire twice. Use an idempotency key or store the `SmsSendResult.message_id` in a seen-set before sending.\n", + "\n", + "**Compliance:**\n", + "- Phone numbers that have replied STOP are automatically added to suppressions. Respect them. 
Sending to opt-out numbers violates TCPA in the US and GDPR in the EU.\n", + "\n", + "**Testing:**\n", + "- Add a test phone number to your own team in staging. Never test with production on-call numbers unless you intend to wake someone up." + ], + "id": "md-13" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "The Commune SDK exposes both email and SMS through the same client object:\n", + "\n", + "```python\n", + "client = CommuneClient(api_key=...)\n", + "\n", + "# Email (async, threaded, unlimited length)\n", + "client.messages.send(to, subject, text, inbox_id, thread_id=thread_id)\n", + "\n", + "# SMS (synchronous delivery, 160-char limit, costs credits)\n", + "client.sms.send(to=phone_number, body=alert_text)\n", + "\n", + "# Always check before SMS\n", + "client.delivery.suppressions() # opt-out list\n", + "client.delivery.metrics(period='7d') # health check\n", + "```\n", + "\n", + "Channel routing decision:\n", + "\n", + "| Urgency | Email | SMS |\n", + "|---|---|---|\n", + "| low / medium | Yes | No |\n", + "| high | Yes | Yes (non-wakeup hours: optional) |\n", + "| critical | Yes | Yes (always) |\n", + "\n", + "## Next Steps\n", + "\n", + "- **[async_streaming.ipynb](./async_streaming.ipynb)** — AsyncCommuneClient for concurrent processing\n", + "- **[langgraph_email_agent.ipynb](./langgraph_email_agent.ipynb)** — LangGraph state machine for triage\n", + "- **[crewai_02_production_patterns.ipynb](./crewai_02_production_patterns.ipynb)** — retry, dead-letter, idempotency\n", + "- **Commune docs:** https://commune.email/docs" + ], + "id": "md-14" + } + ] +} diff --git a/use-cases/support-agent/langgraph_handler.py b/use-cases/support-agent/langgraph_handler.py new file mode 100644 index 0000000..1b34177 --- /dev/null +++ b/use-cases/support-agent/langgraph_handler.py @@ -0,0 +1,208 @@ +""" +LangGraph Support Agent — Commune Email Webhook Handler + +Multi-step support agent built with LangGraph. 
Each inbound email triggers
+a stateful graph that:
+    1. Classifies the customer intent (triage node)
+    2. Drafts and sends a professional reply (reply node)
+
+(A context-lookup node between triage and reply is a natural extension;
+this minimal version wires triage straight into reply.)
+
+State is checkpointed per webhook event. MemorySaver keeps checkpoints
+in memory only; swap in a persistent checkpointer (e.g. SqliteSaver) if
+graph state must survive process restarts.
+
+Install:
+    pip install commune-mail langgraph langchain-openai flask
+
+Environment:
+    COMMUNE_API_KEY — from commune.email dashboard
+    COMMUNE_WEBHOOK_SECRET — set when registering the webhook
+    OPENAI_API_KEY — for LLM nodes
+    COMMUNE_INBOX_ID — inbox to read/reply from
+
+Usage:
+    python langgraph_handler.py
+"""
+
+from __future__ import annotations
+
+import hashlib
+import hmac
+import json
+import os
+import threading
+from typing import Literal, TypedDict
+
+from commune import CommuneClient
+from flask import Flask, Response, request
+from langchain_openai import ChatOpenAI
+from langgraph.checkpoint.memory import MemorySaver
+from langgraph.graph import END, StateGraph
+
+# ── Clients ──────────────────────────────────────────────────────────────────
+
+commune = CommuneClient(api_key=os.environ["COMMUNE_API_KEY"])
+llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
+
+# ── Graph state ──────────────────────────────────────────────────────────────
+
+# thread_id must be declared in the state schema: LangGraph nodes can only
+# read fields listed here. The webhook handler seeds it from the payload so
+# the reply node can keep each reply on the customer's existing Commune
+# thread instead of opening a new, disconnected thread on every send.
+class State(TypedDict):
+    message_id: str
+    inbox_id: str
+    thread_id: str
+    sender: str
+    subject: str
+    body: str
+    intent: Literal["billing", "technical", "general", "spam"]
+    reply_text: str
+
+
+# ── Graph nodes ──────────────────────────────────────────────────────────────
+
+def triage_node(state: State) -> dict:
+    """Classify the intent of the inbound email."""
+    prompt = f"""Classify this support email into one of: billing, technical, general, spam.
+
+Subject: {state['subject']}
+Body: {state['body']}
+
+Return JSON: {{"intent": ""}}"""
+
+    result = llm.invoke(prompt)
+    # Models sometimes wrap JSON in markdown fences; strip them before
+    # parsing, and fall back to "general" on anything unexpected.
+    raw = result.content.strip().removeprefix("```json").removesuffix("```").strip()
+    try:
+        intent = json.loads(raw).get("intent", "general")
+    except json.JSONDecodeError:
+        intent = "general"
+    if intent not in ("billing", "technical", "general", "spam"):
+        intent = "general"
+    print(f"[triage] intent={intent}")
+    return {"intent": intent}
+
+
+def reply_node(state: State) -> dict:
+    """Draft and send a reply to the customer."""
+    if state["intent"] == "spam":
+        print("[reply] Skipping spam")
+        return {"reply_text": ""}
+
+    system_map = {
+        "billing": "You are a billing support specialist. Be empathetic and precise.",
+        "technical": "You are a senior technical support engineer. Provide concrete steps.",
+        "general": "You are a helpful support agent. Reply concisely and professionally.",
+    }
+    system_prompt = system_map.get(state["intent"], system_map["general"])
+
+    draft = llm.invoke(
+        f"{system_prompt}\n\nCustomer email:\n{state['body']}\n\nWrite a professional reply. Sign off as 'Support Team'."
+    )
+    reply_text = draft.content
+
+    commune.messages.send(
+        to=state["sender"],
+        subject=f"Re: {state['subject']}",
+        text=reply_text,
+        inbox_id=state["inbox_id"],
+        # Passing thread_id keeps the reply in the customer's conversation.
+        thread_id=state["thread_id"],
+    )
+
+    print(f"[reply] Sent to {state['sender']}")
+    return {"reply_text": reply_text}
+
+
+# ── Build graph ──────────────────────────────────────────────────────────────
+
+checkpointer = MemorySaver()
+
+builder = StateGraph(State)
+builder.add_node("triage", triage_node)
+builder.add_node("reply", reply_node)
+builder.set_entry_point("triage")
+builder.add_edge("triage", "reply")
+builder.add_edge("reply", END)
+
+# MemorySaver keys its checkpoints by the thread_id in the invocation config.
+# The webhook handler passes a unique key per event (the Commune message id),
+# so state from one email is never visible to another.
+graph = builder.compile(checkpointer=checkpointer)
+
+
+# ── Flask webhook ────────────────────────────────────────────────────────────
+
+flask_app = Flask(__name__)
+
+
+def _verify_signature(raw_body: bytes, signature: str) -> bool:
+    secret = os.environ.get("COMMUNE_WEBHOOK_SECRET", "")
+    expected = hmac.new(
+        secret.encode(),
+        raw_body,
+        hashlib.sha256,
+    ).hexdigest()
+    return hmac.compare_digest(expected, signature.removeprefix("sha256="))
+
+
+@flask_app.post("/webhook")
+def webhook() -> Response:
+    raw_body = request.get_data()
+    sig = request.headers.get("X-Commune-Signature", "")
+
+    if not _verify_signature(raw_body, sig):
+        return Response(json.dumps({"error": "Invalid signature"}), status=401,
+                        mimetype="application/json")
+
+    event = request.json
+    message = event.get("message", {})
+
+    if message.get("direction") != "inbound":
+        return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
+
+    sender = next(
+        (p["identity"] for p in message.get("participants", []) if p["role"] == "sender"),
+        None,
+    )
+    if not sender:
+        return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
+
+    initial_state: State = {
+        "message_id": message["id"],
+        "inbox_id": event["inboxId"],
+        "thread_id": message.get("threadId", ""),
+        "sender": sender,
+        "subject": message.get("metadata", {}).get("subject", ""),
+        "body": message.get("content", ""),
+        "intent": "general",  # will be overwritten by triage node
+        "reply_text": "",
+    }
+
+    # A run with two LLM calls takes 10-30 seconds, but Commune expects the
+    # webhook to respond within ~5 seconds; slow responses trigger retries,
+    # which double-process the email, and a blocked Flask worker caps
+    # throughput. Acknowledge immediately and run the graph in a daemon
+    # thread; production traffic belongs in a real task queue (Celery, RQ).
+    threading.Thread(
+        target=graph.invoke,
+        args=(initial_state,),
+        kwargs={
+            # Unique checkpoint key per event, as described above.
+            "config": {"configurable": {"thread_id": message["id"]}},
+        },
+        daemon=True,
+    ).start()
+
+    return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
+
+
+@flask_app.get("/health")
+def health() -> Response:
+    return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
+
+
+if __name__ == "__main__":
+    port = int(os.environ.get("PORT", 3000))
+    print(f"LangGraph support agent running on port {port}")
+    flask_app.run(host="0.0.0.0", port=port)
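+
+
+# The signature scheme above is easy to exercise locally. A minimal helper,
+# added as an illustration (the payload in the usage sketch below is made
+# up); it mirrors exactly what _verify_signature() recomputes.
+def _sign_for_test(raw_body: bytes, secret: str) -> str:
+    """Return a value for the X-Commune-Signature header in tests."""
+    digest = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
+    return f"sha256={digest}"
+
+
+# Usage sketch with Flask's built-in test client:
+#
+#     body = json.dumps({"inboxId": "inbox_demo", "message": {...}}).encode()
+#     secret = os.environ["COMMUNE_WEBHOOK_SECRET"]
+#     flask_app.test_client().post(
+#         "/webhook", data=body,
+#         headers={"X-Commune-Signature": _sign_for_test(body, secret)},
+#     )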