-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Let Agent be run in a Temporal workflow by moving model requests, tool calls, and MCP to Temporal activities #2225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Docs Preview
|
I'm so excited for this to be natively supported |
Thank you for taking the time to look into native temporal support - this is a topic I’m really interested in seeing progress on. One key feature I’d love to see is support for HITL (Human-in-the-Loop) during tool calls when using temporal and agents. My suggestion on how to achieve that (and not being to specific for HITL) is to pause the workflow before a tool call waiting on a condition for a signal. This will allow, among other things, to wait for human input. Currently, tool calls are handled like this: async def call_tool(
self, name: str, tool_args: dict[str, Any], ctx: RunContext[AgentDepsT], tool: ToolsetTool[AgentDepsT]
) -> Any:
serialized_run_context = self.serialize_run_context(ctx)
return await workflow.execute_activity(
activity=self.call_tool_activity,
arg=FunctionCallToolParams(name=name, tool_args=tool_args, serialized_run_context=serialized_run_context),
**self.temporal_settings.__dict__,
) To add the wait support, we can do something like:
async def call_tool(
self, name: str, tool_args: dict[str, Any], ctx: RunContext[AgentDepsT], tool: ToolsetTool[AgentDepsT]
) -> Any:
# --- ADDED: tool call pause using signal and wait_condition ---
await workflow.wait_condition(lambda: self.allow_tool_call)
# --- END ADDED CODE ---
serialized_run_context = self.serialize_run_context(ctx)
activity_result = await workflow.execute_activity(
activity=self.call_tool_activity,
arg=FunctionCallToolParams(name=name, tool_args=tool_args, serialized_run_context=serialized_run_context),
**self.temporal_settings.__dict__,
)
# --- ADDED: tool call reset state ---
self.allow_tool_call = False
return activity_result
# --- END ADDED CODE --- This will allow users to define their workflow something like: @workflow.signal
def approve_tool_call(self):
... # some code to determine the tool is OK to run
self.allow_tool_call = True This way, the workflow will pause at the tool call and only continue once the signal is approved. of course this would be optional and the default ,that can be overwritten by the user, will be to approve the request Appreciate the ongoing work! |
2abccad
to
a0b934f
Compare
PR Change SummaryIntroduced the Temporal Agent feature by enhancing toolset definitions and adding optional IDs for better identification in error messages.
Modified Files
How can I customize these reviews?Check out the Hyperlint AI Reviewer docs for more information on how to customize the review. If you just want to ignore it on this PR, you can add the Note specifically for link checks, we only check the first 30 links in a file and we cache the results for several hours (for instance, if you just added a page, you might experience this). Our recommendation is to add |
# Conflicts: # pydantic_ai_slim/pydantic_ai/models/openai.py
|
||
@activity.defn(name='model_request') | ||
async def request_activity(params: _RequestParams) -> ModelResponse: | ||
return await original_request(params.messages, params.model_settings, params.model_request_parameters) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider disabling retries within an activity, and having it all be handled by Temporal, which will have to know whether an error is retryable and how long to back off for: https://github.com/temporalio/sdk-python/blob/3244f8bffebee05e0e7efefb1240a75039903dda/temporalio/contrib/openai_agents/_invoke_model_activity.py#L231
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either way we should document the 3 levels of retries: Temporal > https://ai.pydantic.dev/retries/ > provider client
raise ValueError( | ||
'Toolsets cannot be set at agent run time when using Temporal, it must be set at agent creation time.' | ||
) | ||
if kwargs.get('event_stream_handler') is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To do:
- override
run_stream
anditer
to point the user torun
/run_sync
+event_stream_handler
…nd direct_call_tool
Closes #1975
Run it yourself:
brew install temporal
temporal server start-dev
uv run temporal.py
To do:
google-genai
+temporalio
type hinting conflictTemporalSettings
be set for a specific toolsetdeps
temporalio
package instead of Git repo once Plugin Support temporalio/sdk-python#952 is releasedAgent(event_stream_handler=...)
, merge that first Add Agentevent_stream_handler
function #2306Toolset.id
, merge that first