Skip to content

Commit 40d83f3

Browse files
bokelleyclaude
andauthored
fix: improve MCP adapter cleanup on connection failures (#19)
* fix: improve MCP adapter cleanup on connection failures Extract duplicate cleanup logic into _cleanup_failed_connection() method to follow DRY principle. Properly handle anyio task group errors without masking original connection failures. Errors like "Attempted to exit cancel scope in a different task" are now logged at debug level rather than raised, allowing the actual connection error to propagate correctly. Also update TaskResult config to use Pydantic v2 ConfigDict pattern, eliminating deprecation warnings. Add comprehensive test coverage for connection failure scenarios and async context manager usage. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * docs: add resource management section and context manager examples Demonstrate the recommended async context manager pattern for proper connection cleanup. Update Quick Start example to use context manager, ensuring new users adopt best practices from the start. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * docs: improve resource management documentation and examples Address code review feedback: - Update examples/ to use async context managers (critical fix) - Add 'Why use context managers?' explanation with clear benefits - Add complete imports to all code examples for copy-paste usability - Expand manual cleanup section with use cases and guidance - Improve comments to clarify what gets cleaned up All examples now consistently demonstrate the recommended pattern for proper resource cleanup, making it easier for new users to adopt best practices. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * fix: line length in cleanup error logging Split long warning message across multiple lines to comply with 100 character line limit. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 813df8a commit 40d83f3

File tree

7 files changed

+359
-97
lines changed

7 files changed

+359
-97
lines changed

README.md

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pip install adcp
2727
```python
2828
from adcp import ADCPMultiAgentClient, AgentConfig, GetProductsRequest
2929

30-
# Configure agents and handlers
31-
client = ADCPMultiAgentClient(
30+
# Configure agents and handlers (context manager ensures proper cleanup)
31+
async with ADCPMultiAgentClient(
3232
agents=[
3333
AgentConfig(
3434
id="agent_x",
@@ -54,21 +54,21 @@ client = ADCPMultiAgentClient(
5454
if metadata.status == "completed" else None
5555
)
5656
}
57-
)
58-
59-
# Execute operation - library handles operation IDs, webhook URLs, context management
60-
agent = client.agent("agent_x")
61-
request = GetProductsRequest(brief="Coffee brands")
62-
result = await agent.get_products(request)
57+
) as client:
58+
# Execute operation - library handles operation IDs, webhook URLs, context management
59+
agent = client.agent("agent_x")
60+
request = GetProductsRequest(brief="Coffee brands")
61+
result = await agent.get_products(request)
6362

64-
# Check result
65-
if result.status == "completed":
66-
# Agent completed synchronously!
67-
print(f"✅ Sync completion: {len(result.data.products)} products")
63+
# Check result
64+
if result.status == "completed":
65+
# Agent completed synchronously!
66+
print(f"✅ Sync completion: {len(result.data.products)} products")
6867

69-
if result.status == "submitted":
70-
# Agent will send webhook when complete
71-
print(f"⏳ Async - webhook registered at: {result.submitted.webhook_url}")
68+
if result.status == "submitted":
69+
# Agent will send webhook when complete
70+
print(f"⏳ Async - webhook registered at: {result.submitted.webhook_url}")
71+
# Connections automatically cleaned up here
7272
```
7373

7474
## Features
@@ -173,6 +173,51 @@ Or use the CLI:
173173
uvx adcp --debug myagent get_products '{"brief":"TV ads"}'
174174
```
175175

176+
### Resource Management
177+
178+
**Why use async context managers?**
179+
- Ensures HTTP connections are properly closed, preventing resource leaks
180+
- Handles cleanup even when exceptions occur
181+
- Required for production applications with connection pooling
182+
- Prevents issues with async task group cleanup in MCP protocol
183+
184+
The recommended pattern uses async context managers:
185+
186+
```python
187+
from adcp import ADCPClient, AgentConfig, GetProductsRequest
188+
189+
# Recommended: Automatic cleanup with context manager
190+
config = AgentConfig(id="agent_x", agent_uri="https://...", protocol="a2a")
191+
async with ADCPClient(config) as client:
192+
request = GetProductsRequest(brief="Coffee brands")
193+
result = await client.get_products(request)
194+
# Connection automatically closed on exit
195+
196+
# Multi-agent client also supports context managers
197+
async with ADCPMultiAgentClient(agents) as client:
198+
# Execute across all agents in parallel
199+
results = await client.get_products(request)
200+
# All agent connections closed automatically (even if some failed)
201+
```
202+
203+
Manual cleanup is available for special cases (e.g., managing client lifecycle manually):
204+
205+
```python
206+
# Use manual cleanup when you need fine-grained control over lifecycle
207+
client = ADCPClient(config)
208+
try:
209+
result = await client.get_products(request)
210+
finally:
211+
await client.close() # Explicit cleanup
212+
```
213+
214+
**When to use manual cleanup:**
215+
- Managing client lifecycle across multiple functions
216+
- Testing scenarios requiring explicit control
217+
- Integration with frameworks that manage resources differently
218+
219+
In most cases, prefer the context manager pattern.
220+
176221
### Error Handling
177222

178223
The library provides a comprehensive exception hierarchy with helpful error messages:

examples/basic_usage.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,32 +23,33 @@ async def main():
2323
auth_token="your-token-here", # Optional
2424
)
2525

26-
# Create client
27-
client = ADCPClient(
26+
# Use context manager for automatic resource cleanup
27+
async with ADCPClient(
2828
config,
2929
webhook_url_template="https://myapp.com/webhook/{task_type}/{agent_id}/{operation_id}",
3030
on_activity=lambda activity: print(f"[{activity.type}] {activity.task_type}"),
31-
)
31+
) as client:
32+
# Call get_products
33+
print("Fetching products...")
34+
result = await client.get_products(brief="Coffee brands targeting millennials")
3235

33-
# Call get_products
34-
print("Fetching products...")
35-
result = await client.get_products(brief="Coffee brands targeting millennials")
36+
# Handle result
37+
if result.status == "completed":
38+
print(f"✅ Sync completion: Got {len(result.data.get('products', []))} products")
39+
for product in result.data.get("products", []):
40+
print(f" - {product.get('name')}: {product.get('description')}")
3641

37-
# Handle result
38-
if result.status == "completed":
39-
print(f"✅ Sync completion: Got {len(result.data.get('products', []))} products")
40-
for product in result.data.get("products", []):
41-
print(f" - {product.get('name')}: {product.get('description')}")
42+
elif result.status == "submitted":
43+
print(f"⏳ Async: Webhook will be sent to {result.submitted.webhook_url}")
44+
print(f" Operation ID: {result.submitted.operation_id}")
4245

43-
elif result.status == "submitted":
44-
print(f"⏳ Async: Webhook will be sent to {result.submitted.webhook_url}")
45-
print(f" Operation ID: {result.submitted.operation_id}")
46+
elif result.status == "needs_input":
47+
print(f"❓ Agent needs clarification: {result.needs_input.message}")
4648

47-
elif result.status == "needs_input":
48-
print(f"❓ Agent needs clarification: {result.needs_input.message}")
49+
elif result.status == "failed":
50+
print(f"❌ Error: {result.error}")
4951

50-
elif result.status == "failed":
51-
print(f"❌ Error: {result.error}")
52+
# Connection automatically closed here
5253

5354

5455
if __name__ == "__main__":

examples/multi_agent.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ async def main():
3434
),
3535
]
3636

37-
# Create multi-agent client
38-
client = ADCPMultiAgentClient(
37+
# Use context manager for automatic resource cleanup
38+
async with ADCPMultiAgentClient(
3939
agents=agents,
4040
webhook_url_template="https://myapp.com/webhook/{task_type}/{agent_id}/{operation_id}",
4141
on_activity=lambda activity: print(
@@ -44,29 +44,30 @@ async def main():
4444
handlers={
4545
"on_get_products_status_change": handle_products_result,
4646
},
47-
)
47+
) as client:
48+
# Execute across all agents in parallel
49+
print(f"Querying {len(agents)} agents in parallel...")
50+
results = await client.get_products(brief="Coffee brands")
4851

49-
# Execute across all agents in parallel
50-
print(f"Querying {len(agents)} agents in parallel...")
51-
results = await client.get_products(brief="Coffee brands")
52+
# Process results
53+
sync_count = sum(1 for r in results if r.status == "completed")
54+
async_count = sum(1 for r in results if r.status == "submitted")
5255

53-
# Process results
54-
sync_count = sum(1 for r in results if r.status == "completed")
55-
async_count = sum(1 for r in results if r.status == "submitted")
56+
print(f"\n📊 Results:")
57+
print(f" ✅ Sync completions: {sync_count}")
58+
print(f" ⏳ Async (webhooks pending): {async_count}")
5659

57-
print(f"\n📊 Results:")
58-
print(f" ✅ Sync completions: {sync_count}")
59-
print(f" ⏳ Async (webhooks pending): {async_count}")
60+
for i, result in enumerate(results):
61+
agent_id = client.agent_ids[i]
6062

61-
for i, result in enumerate(results):
62-
agent_id = client.agent_ids[i]
63+
if result.status == "completed":
64+
products = result.data.get("products", [])
65+
print(f"\n{agent_id}: {len(products)} products (sync)")
6366

64-
if result.status == "completed":
65-
products = result.data.get("products", [])
66-
print(f"\n{agent_id}: {len(products)} products (sync)")
67+
elif result.status == "submitted":
68+
print(f"\n{agent_id}: webhook to {result.submitted.webhook_url}")
6769

68-
elif result.status == "submitted":
69-
print(f"\n{agent_id}: webhook to {result.submitted.webhook_url}")
70+
# All agent connections automatically closed here
7071

7172

7273
def handle_products_result(response, metadata):

src/adcp/protocols/mcp.py

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,39 @@ def __init__(self, *args: Any, **kwargs: Any):
4040
self._session: Any = None
4141
self._exit_stack: Any = None
4242

43+
async def _cleanup_failed_connection(self, context: str) -> None:
44+
"""
45+
Clean up resources after a failed connection attempt.
46+
47+
This method handles cleanup without raising exceptions to avoid
48+
masking the original connection error.
49+
50+
Args:
51+
context: Description of the context for logging (e.g., "during connection attempt")
52+
"""
53+
if self._exit_stack is not None:
54+
old_stack = self._exit_stack
55+
self._exit_stack = None
56+
self._session = None
57+
try:
58+
await old_stack.aclose()
59+
except asyncio.CancelledError:
60+
logger.debug(f"MCP session cleanup cancelled {context}")
61+
except RuntimeError as cleanup_error:
62+
# Known anyio task group cleanup issue
63+
error_msg = str(cleanup_error).lower()
64+
if "cancel scope" in error_msg or "async context" in error_msg:
65+
logger.debug(f"Ignoring anyio cleanup error {context}: {cleanup_error}")
66+
else:
67+
logger.warning(
68+
f"Unexpected RuntimeError during cleanup {context}: {cleanup_error}"
69+
)
70+
except Exception as cleanup_error:
71+
# Log unexpected cleanup errors but don't raise to preserve original error
72+
logger.warning(
73+
f"Unexpected error during cleanup {context}: {cleanup_error}", exc_info=True
74+
)
75+
4376
async def _get_session(self) -> ClientSession:
4477
"""
4578
Get or create MCP client session with URL fallback handling.
@@ -115,35 +148,8 @@ async def _get_session(self) -> ClientSession:
115148
return self._session # type: ignore[no-any-return]
116149
except Exception as e:
117150
last_error = e
118-
# Clean up the exit stack on failure to avoid async scope issues
119-
if self._exit_stack is not None:
120-
old_stack = self._exit_stack
121-
self._exit_stack = None # Clear immediately to prevent reuse
122-
self._session = None
123-
try:
124-
await old_stack.aclose()
125-
except asyncio.CancelledError:
126-
# Expected during shutdown
127-
pass
128-
except RuntimeError as cleanup_error:
129-
# Known MCP SDK async cleanup issue
130-
if (
131-
"async context" in str(cleanup_error).lower()
132-
or "cancel scope" in str(cleanup_error).lower()
133-
):
134-
logger.debug(
135-
"Ignoring MCP SDK async context error during cleanup: "
136-
f"{cleanup_error}"
137-
)
138-
else:
139-
logger.warning(
140-
f"Unexpected RuntimeError during cleanup: {cleanup_error}"
141-
)
142-
except Exception as cleanup_error:
143-
# Unexpected cleanup errors should be logged
144-
logger.warning(
145-
f"Unexpected error during cleanup: {cleanup_error}", exc_info=True
146-
)
151+
# Clean up the exit stack on failure to avoid resource leaks
152+
await self._cleanup_failed_connection("during connection attempt")
147153

148154
# If this isn't the last URL to try, create a new exit stack and continue
149155
if url != urls_to_try[-1]:
@@ -352,15 +358,5 @@ async def list_tools(self) -> list[str]:
352358
return [tool.name for tool in result.tools]
353359

354360
async def close(self) -> None:
355-
"""Close the MCP session."""
356-
if self._exit_stack is not None:
357-
old_stack = self._exit_stack
358-
self._exit_stack = None
359-
self._session = None
360-
try:
361-
await old_stack.aclose()
362-
except (asyncio.CancelledError, RuntimeError):
363-
# Cleanup errors during shutdown are expected
364-
pass
365-
except Exception as e:
366-
logger.debug(f"Error during MCP session cleanup: {e}")
361+
"""Close the MCP session and clean up resources."""
362+
await self._cleanup_failed_connection("during close")

src/adcp/types/core.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from enum import Enum
66
from typing import Any, Generic, Literal, TypeVar
77

8-
from pydantic import BaseModel, Field, field_validator
8+
from pydantic import BaseModel, ConfigDict, Field, field_validator
99

1010

1111
class Protocol(str, Enum):
@@ -125,6 +125,8 @@ class DebugInfo(BaseModel):
125125
class TaskResult(BaseModel, Generic[T]):
126126
"""Result from task execution."""
127127

128+
model_config = ConfigDict(arbitrary_types_allowed=True)
129+
128130
status: TaskStatus
129131
data: T | None = None
130132
message: str | None = None # Human-readable message from agent (e.g., MCP content text)
@@ -135,9 +137,6 @@ class TaskResult(BaseModel, Generic[T]):
135137
metadata: dict[str, Any] | None = None
136138
debug_info: DebugInfo | None = None
137139

138-
class Config:
139-
arbitrary_types_allowed = True
140-
141140

142141
class ActivityType(str, Enum):
143142
"""Types of activity events."""

0 commit comments

Comments
 (0)