Asynchronous Tool Execution #1398
Latest commit implements working stream binding to support elicitation and sampling. There's definitely room for refactoring that one 😔
Added support for configuring the immediate result and made some changes to avoid smuggling parameters through
Pull Request Overview
This PR implements asynchronous tool execution for the MCP (Model Context Protocol) Python SDK, enabling long-running operations that execute in the background while clients poll for status and results. The implementation introduces operation tokens for tracking execution state and supports configurable keep-alive durations for result availability.
Key changes include:
- Added async operation management system with token-based tracking
- Extended MCP types to support async operation parameters and results
- Implemented immediate feedback capability for async tools
- Added client-side polling mechanisms for operation status and results
Reviewed Changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
src/mcp/types.py | Extended protocol types with async operation support and operation tokens |
src/mcp/shared/async_operations.py | Core async operation management classes for client and server |
src/mcp/client/session.py | Client-side async operation tracking and polling methods |
src/mcp/server/fastmcp/server.py | FastMCP integration with async tools and invocation mode filtering |
src/mcp/server/fastmcp/tools/base.py | Tool base class extensions for async modes and immediate results |
src/mcp/server/lowlevel/server.py | Low-level server async operation handlers and execution logic |
tests/server/fastmcp/test_server.py | Comprehensive test coverage for async tool functionality |
examples/snippets/servers/async_tool_*.py | Example implementations demonstrating async tool patterns |
Comments suppressed due to low confidence (2)
src/mcp/server/fastmcp/tools/base.py:162
- Function `_is_async_callable` is referenced on line 120 but not defined in this file. Either add the function definition or import it properly.
def _is_async_callable(obj: Any) -> bool:
tests/shared/test_progress_notifications.py:1
- [nitpick] The `operation_token=None` parameter addition maintains backward compatibility, but consider adding a comment explaining when this would be non-None for clarity in test scenarios.
from typing import Any, cast
src/mcp/server/lowlevel/server.py
Outdated
logger.exception(f"Async execution failed for {tool_name}")
self.async_operations.fail_operation(operation.token, str(e))

asyncio.create_task(execute_async())
Let's stop using `asyncio` in this codebase. Use `anyio`, please.
Fixed here, still need to fix `AsyncOperationManager`. Still trying to figure out how to do that properly with cancellation scopes.
I can help if you need help.
Why would a tool have 2 flavors (async & sync)? Is there a hint to the client when it should try to retrieve the result, e.g. what
As in for how often it should poll? There is not, but that's a good callout - I'll amend the proposal to include that.
In the case where a tool is being migrated from sync to async, and may support clients that haven't yet supported the latest (or next, in this case) protocol version, it's useful to support both on the same tool - so clients that don't support the latest version will have a single long request, while clients that do support it will use async tool call semantics. There will be some use cases where that's desirable, and some where it's not, so it's optional behavior.
If the choice of using async/sync is made by the client, why should we add flavors when defining a tool on the server side?
Answering my own question... You need to signal in the tool definition which flavor it supports.
Okay. I don't think we should be adding multiple flavors to a tool, I think it makes things more complicated. Also, the spec anyway supports the I think a better API is to have a new method e.g.

token: str
"""Server-generated token to use for checking status and retrieving results."""
keepAlive: int
keepAlive: int
keep_alive: int = Field(alias="keepAlive")

name: str
arguments: dict[str, Any] | None = None
operation_params: AsyncRequestProperties | None = Field(serialization_alias="operation", default=None)
What's the problem with the real name "operation"?
The base `RequestParams` type also has a `_operation` used for association metadata, which needs to be aliased to `operation` to not be treated as a private/protected field by pyright wherever it gets used.
error: "_operation" is protected and used outside of the class in which it is declared (reportPrivateUsage)
I didn't want to ignore pyright just because of a naming conflict, but now that I'm looking at this again there are only 3 places where we'd need to do so.
src/mcp/server/lowlevel/server.py
Outdated
self.async_operations.fail_operation(operation.token, str(e))

async with anyio.create_task_group() as tg:
tg.start_soon(execute_async)
I think we need to make this implementation a bit more abstract.
What if I want to execute my tasks on a different machine, node, or process than the MCP server? The world also seems to like durable execution products such as Temporal a lot.
I think we need something like what this design proposes: https://github.com/pydantic/fasta2a/tree/main?tab=readme-ov-file#design
I made `AsyncOperationManager` a `Server` parameter with the intention that consumers would implement something like that for that use case, but that's not enough because there is no broker to enable that pattern, yeah. Will adjust this accordingly.
On the SEP, we're going to be moving to referring to these uniformly as long-running operations, so an Do we want to force tool implementors to make a clean break from existing tools, though? Or are you suggesting that in the case where both modes are desired on a tool in an interim state, they would add both If so, that then requires the tool manager to support adding the same tool more than once, and updating the parameters if
As for what I understood, since the
I wouldn't like to add two decorators. 🤔 If a tool can be short, and long-running, why the
A tool is exactly one or the other from the perspective of a single client. ListTools must show exactly one execution mode. In addition, version negotiation is used to hide long-running tools from clients that don't support them, yet. However, we can still support hybrid tools for backwards-compatibility purposes if the server operator allows it. Essentially, because the core tool implementation is the same There's a few sections of the SEP which discuss this, but pulling one I think is useful:
Note that some tools are present in the "old clients" list despite presenting as LRO-only in the "new clients" list. When an old client invokes a tool represented like this, they won't get the
This was the original idea, actually, but for futureproofing it's been changed to an enum.
This PR implements the required changes for modelcontextprotocol/modelcontextprotocol#1391, which adds asynchronous tool execution.
This is a large PR, and I expect that if the associated SEP is accepted, we might want to break this down into several smaller PRs for SDK reviewers. I tried to generally have separate commits for each step of the implementation, to try and make this easier to review in its current form.
Motivation and Context
Today, most applications integrate with tools in a straightforward but naive manner, choosing to have agents invoke tools synchronously with the conversation instead of allowing agents to multitask where possible. There are a few reasons why we believe this is the case, including the lack of clarity around tool interfaces (single tool or multiple for job tracking), model failures when manually polling on operations, and not having a way to retrieve results with a well-defined TTL, among other problems (described in more detail in the linked issue). Here, we introduce an alternative API that establishes a clear integration path for async job-style use cases that are typically on the order of minutes to hours.
The ultra high-level overview is as follows:
- `tools/call` begins an async tool call
- `CallToolResult` containing an operation token, which is used to interact with the async tool call across multiple RPC calls
- `tools/async/status`, which returns the current operation status
- `tools/async/result`, which has the final tool output

Whether a tool is sync, async, or both (on old/new protocol versions) is defined by tool implementors. This enables remote server operators to control this based on how long each tool is expected to take to execute, rather than potentially serving HTTP requests with widely varying execution times on the same endpoint. This also makes it much more clear to client applications what the "time contract" of a tool is, so that fast tools can still be executed synchronously while allowing long-running tools to be immediately backgrounded.
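To make the call/status/result flow above concrete, here is a minimal in-memory sketch. It does not use the SDK: `Operation`, `OperationTable`, and the status strings `"working"`, `"completed"`, and `"failed"` are hypothetical names chosen for illustration, not the types or values introduced by this PR.

```python
from __future__ import annotations

import secrets
from dataclasses import dataclass
from typing import Any


@dataclass
class Operation:
    """Hypothetical record for one async tool call."""

    token: str
    status: str = "working"  # assumed status names, not taken from the SEP
    result: Any = None
    error: str | None = None


class OperationTable:
    """Hypothetical stand-in for server-side operation tracking."""

    def __init__(self) -> None:
        self._ops: dict[str, Operation] = {}

    def call_tool(self) -> str:
        """Mimics tools/call: register the operation and return its token at once."""
        op = Operation(token=secrets.token_urlsafe(16))
        self._ops[op.token] = op
        return op.token

    def complete(self, token: str, result: Any) -> None:
        """Called by the background work when it finishes successfully."""
        op = self._ops[token]
        op.status, op.result = "completed", result

    def fail(self, token: str, error: str) -> None:
        """Called by the background work when it raises."""
        op = self._ops[token]
        op.status, op.error = "failed", error

    def get_status(self, token: str) -> str:
        """Mimics tools/async/status."""
        return self._ops[token].status

    def get_result(self, token: str) -> Any:
        """Mimics tools/async/result; only valid once the operation completed."""
        op = self._ops[token]
        if op.status != "completed":
            raise RuntimeError(f"operation is {op.status}; no result available")
        return op.result
```

A caller would hold on to the token returned by `call_tool()`, check `get_status(token)` until it stops reporting `"working"`, and then fetch the output with `get_result(token)`.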
Usage
Defining an async-compatible tool is just a matter of adjusting the `@mcp.tool()` decorator to include an `invocation_modes` parameter, which is a list of `"sync"` and `"async"`.

If `invocation_modes` contains `"async"`, the tool is async-compatible and will only be called in async mode by clients on new versions, while if it contains `"sync"`, the tool is sync-compatible and will be called in sync mode if async mode is not supported (a client will never have the option to choose one or the other itself).

Behind the scenes, the SDK handles branching the behavior to either run synchronously (like today) or asynchronously (immediate return with job tracking) depending on if the client version supports async tools yet or not.
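The branching rule described above can be sketched as a small pure function. This is an illustration only, not the SDK's code: `select_mode` and the `client_supports_async` flag are hypothetical, and returning `None` stands for a tool being hidden from a client that cannot call it.

```python
from typing import Literal, Optional, Sequence

InvocationMode = Literal["sync", "async"]


def select_mode(
    invocation_modes: Sequence[InvocationMode],
    client_supports_async: bool,
) -> Optional[InvocationMode]:
    """Pick the single execution mode one client sees for a tool.

    Hypothetical helper: the client never chooses; the server decides from
    the tool's declared modes and the negotiated protocol version.
    """
    if client_supports_async and "async" in invocation_modes:
        return "async"  # new clients always get async when the tool offers it
    if "sync" in invocation_modes:
        return "sync"  # fallback for old clients, or for sync-only tools
    return None  # async-only tool, old client: the tool is not listed
```

Under these assumptions a hybrid tool declared with `["async", "sync"]` resolves to `"async"` for a new client and `"sync"` for an old one, while an async-only tool resolves to `None` for an old client.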
To control how long the results are kept for retrieval with `tools/async/result`, we can use the `keep_alive` parameter:
@mcp.tool(invocation_modes=["async", "sync"], keep_alive=30)  # retain result for 30s following completion
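As a rough illustration of what keep-alive retention means, the sketch below stores a result with an expiry computed at completion time and refuses to return it afterwards. `ResultStore` and its injectable `clock` are hypothetical and exist only to make the expiry testable; the PR's actual storage lives in `AsyncOperationManager` and may behave differently.

```python
import time
from typing import Any, Callable, Dict, Tuple


class ResultStore:
    """Hypothetical result cache that honors a per-result keep-alive in seconds."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # token -> (expiry timestamp, result)
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def put(self, token: str, result: Any, keep_alive: float) -> None:
        """Store a completed result; the keep-alive window starts now."""
        self._entries[token] = (self._clock() + keep_alive, result)

    def get(self, token: str) -> Any:
        """Return the result, or raise KeyError if unknown or expired."""
        expires_at, result = self._entries[token]
        if self._clock() >= expires_at:
            del self._entries[token]  # drop the expired entry lazily on access
            raise KeyError(token)
        return result
```

With `keep_alive=30`, a result stored at time 0 is retrievable until just before time 30 and is treated as gone from then on.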
We can also customize the `content` returned in the immediate `CallToolResult` with the `immediate_result` parameter.

On the client side, we just add the polling and result retrieval.
How Has This Been Tested?
Unit tests, integration tests, and new example snippets.
Breaking Changes
Existing users will not need to update their applications to continue using synchronous tool calls. Asynchronous tool calls will require minor code changes that will be documented.
Types of changes
Checklist
Additional context
There were a bunch of decisions in the implementation we may want to discuss further, some of which were due to ambiguity in the proposal (which will be revised again) and some of which were due to working things into the SDK implementation.
- `next` to deal with the requirement that sessions on the current-latest version always advertise as sync-only. The tests and examples explicitly set the advertised client protocol version to `next` when calling async-only tools.
- `tools/call` method creates some ambiguities in how `outputSchema` should be handled, as the immediate tool call result (communicating an accepted state) would no longer have meaningful `structuredContent`. The output that should be validated is actually the result of `GetOperationPayloadResult`, so for now I'm skipping validation of the immediate `CallToolResult` (only in async execution) and only validating `GetOperationPayloadResult` (sync tool executions are always validated, just like before).
- `keepAlive` should have a sentinel value representing "no expiration," and I'm leaning towards `None`. However, in SDK implementations, that becomes somewhat ambiguous with sync tool calls, which also implicitly have a `keepAlive` of `None` already. For now, I default it to 1 hour if not specified/`None`, but this should probably be changed before this is merged.
- `CallToolRequest`, by attaching a `related_request_id` to the stream for fast lookups and session resumption. To support sampling and elicitation, we keep a map of operation tokens to their original request IDs to reuse the same event store entry between related calls.
- `structuredContent` in async tool calls, as it otherwise has no way to look up the cached `outputSchema`. We could consider including a `toolName` in `GetOperationPayloadResult` to avoid the inconvenience, but in this draft I'm using a cache expiry based on `keepAlive` to avoid holding that mapping forever.
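The interim `keepAlive` behavior described above (fall back to 1 hour when the value is unspecified or `None`) amounts to something like the following sketch. `resolve_keep_alive` and `DEFAULT_KEEP_ALIVE_SECONDS` are hypothetical names, and the unit is assumed to be seconds, matching the `keep_alive=30` example.

```python
from typing import Optional

# Assumed interim default from the notes above: 1 hour, expressed in seconds.
DEFAULT_KEEP_ALIVE_SECONDS = 60 * 60


def resolve_keep_alive(keep_alive: Optional[int]) -> int:
    """Return the effective retention period for an async tool result.

    Hypothetical helper: None currently means "not specified", so it cannot
    also serve as a "no expiration" sentinel without further changes.
    """
    if keep_alive is None:
        return DEFAULT_KEEP_ALIVE_SECONDS
    return keep_alive
```

This also shows the ambiguity noted above: with this rule, passing `None` always yields the default, so a distinct sentinel would be needed to express "never expire".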