Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/durable_execution/temporal.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ from pydantic_ai.durable_exec.temporal import (
)

agent = Agent(
'gpt-5',
'openai:gpt-5',
instructions="You're an expert in geography.",
name='geography', # (10)!
)
Expand Down Expand Up @@ -158,6 +158,8 @@ To ensure that Temporal knows what code to run when an activity fails or is inte

When `TemporalAgent` dynamically creates activities for the wrapped agent's model requests and toolsets (specifically those that implement their own tool listing and calling, i.e. [`FunctionToolset`][pydantic_ai.toolsets.FunctionToolset] and [`MCPServer`][pydantic_ai.mcp.MCPServer]), their names are derived from the agent's [`name`][pydantic_ai.agent.AbstractAgent.name] and the toolsets' [`id`s][pydantic_ai.toolsets.AbstractToolset.id]. These fields are normally optional, but are required to be set when using Temporal. They should not be changed once the durable agent has been deployed to production as this would break active workflows.

For dynamic toolsets created with the [`@agent.toolset`][pydantic_ai.Agent.toolset] decorator, the `id` parameter must be set explicitly. Note that with Temporal, `per_run_step=False` is not respected, as the toolset always needs to be created on-the-fly in the activity.

Other than that, any agent and toolset will just work!

### Instructions Functions, Output Functions, and History Processors
Expand Down
8 changes: 6 additions & 2 deletions pydantic_ai_slim/pydantic_ai/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,7 @@ def toolset(
/,
*,
per_run_step: bool = True,
id: str | None = None,
) -> Callable[[ToolsetFunc[AgentDepsT]], ToolsetFunc[AgentDepsT]]: ...

def toolset(
Expand All @@ -1250,6 +1251,7 @@ def toolset(
/,
*,
per_run_step: bool = True,
id: str | None = None,
) -> Any:
"""Decorator to register a toolset function which takes [`RunContext`][pydantic_ai.tools.RunContext] as its only argument.

Expand All @@ -1271,10 +1273,12 @@ async def simple_toolset(ctx: RunContext[str]) -> AbstractToolset[str]:
Args:
func: The toolset function to register.
per_run_step: Whether to re-evaluate the toolset for each run step. Defaults to True.
id: An optional unique ID for the dynamic toolset. Required for use with durable execution
environments like Temporal, where the ID identifies the toolset's activities within the workflow.
"""

def toolset_decorator(func_: ToolsetFunc[AgentDepsT]) -> ToolsetFunc[AgentDepsT]:
self._dynamic_toolsets.append(DynamicToolset(func_, per_run_step=per_run_step))
self._dynamic_toolsets.append(DynamicToolset(func_, per_run_step=per_run_step, id=id))
return func_

return toolset_decorator if func is None else toolset_decorator(func)
Expand Down Expand Up @@ -1378,7 +1382,7 @@ def _get_toolset(
# Copy the dynamic toolsets to ensure each run has its own instances
def copy_dynamic_toolsets(toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT]:
if isinstance(toolset, DynamicToolset):
return dataclasses.replace(toolset)
return toolset.copy()
else:
return toolset

Expand Down
172 changes: 172 additions & 0 deletions pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_dynamic_toolset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Literal

from pydantic import ConfigDict, with_config
from temporalio import activity, workflow
from temporalio.workflow import ActivityConfig
from typing_extensions import Self

from pydantic_ai import ToolsetTool
from pydantic_ai.exceptions import UserError
from pydantic_ai.tools import AgentDepsT, RunContext, ToolDefinition
from pydantic_ai.toolsets._dynamic import DynamicToolset
from pydantic_ai.toolsets.external import TOOL_SCHEMA_VALIDATOR

from ._run_context import TemporalRunContext
from ._toolset import (
CallToolParams,
CallToolResult,
TemporalWrapperToolset,
)


@dataclass
@with_config(ConfigDict(arbitrary_types_allowed=True))
class _GetToolsParams:
serialized_run_context: Any


@dataclass
class _ToolInfo:
"""Serializable tool information returned from get_tools_activity."""

tool_def: ToolDefinition
max_retries: int


class TemporalDynamicToolset(TemporalWrapperToolset[AgentDepsT]):
"""Temporal wrapper for DynamicToolset.

This provides static activities (get_tools, call_tool) that are registered at worker start time,
while the actual toolset selection happens dynamically inside the activities where I/O is allowed.
"""

def __init__(
self,
toolset: DynamicToolset[AgentDepsT],
*,
activity_name_prefix: str,
activity_config: ActivityConfig,
tool_activity_config: dict[str, ActivityConfig | Literal[False]],
deps_type: type[AgentDepsT],
run_context_type: type[TemporalRunContext[AgentDepsT]] = TemporalRunContext[AgentDepsT],
):
super().__init__(toolset)
self.activity_config = activity_config
self.tool_activity_config = tool_activity_config
self.run_context_type = run_context_type

async def get_tools_activity(params: _GetToolsParams, deps: AgentDepsT) -> dict[str, _ToolInfo]:
"""Activity that calls the dynamic function and returns tool definitions."""
ctx = self.run_context_type.deserialize_run_context(params.serialized_run_context, deps=deps)

async with self.wrapped:
tools = await self.wrapped.get_tools(ctx)
return {
name: _ToolInfo(tool_def=tool.tool_def, max_retries=tool.max_retries)
for name, tool in tools.items()
}

get_tools_activity.__annotations__['deps'] = deps_type

self.get_tools_activity = activity.defn(name=f'{activity_name_prefix}__dynamic_toolset__{self.id}__get_tools')(
get_tools_activity
)

async def call_tool_activity(params: CallToolParams, deps: AgentDepsT) -> CallToolResult:
"""Activity that instantiates the dynamic toolset and calls the tool."""
ctx = self.run_context_type.deserialize_run_context(params.serialized_run_context, deps=deps)

async with self.wrapped:
tools = await self.wrapped.get_tools(ctx)
tool = tools.get(params.name)
if tool is None: # pragma: no cover
raise UserError(
f'Tool {params.name!r} not found in dynamic toolset {self.id!r}. '
'The dynamic toolset function may have returned a different toolset than expected.'
)

return await self._call_tool_in_activity(params.name, params.tool_args, ctx, tool)

call_tool_activity.__annotations__['deps'] = deps_type

self.call_tool_activity = activity.defn(name=f'{activity_name_prefix}__dynamic_toolset__{self.id}__call_tool')(
call_tool_activity
)

@property
def temporal_activities(self) -> list[Callable[..., Any]]:
return [self.get_tools_activity, self.call_tool_activity]

async def __aenter__(self) -> Self:
if not workflow.in_workflow():
await self.wrapped.__aenter__()
return self

async def __aexit__(self, *args: Any) -> bool | None:
if not workflow.in_workflow():
return await self.wrapped.__aexit__(*args)
return None

async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[AgentDepsT]]:
if not workflow.in_workflow():
return await super().get_tools(ctx)

serialized_run_context = self.run_context_type.serialize_run_context(ctx)
tool_infos = await workflow.execute_activity(
activity=self.get_tools_activity,
args=[
_GetToolsParams(serialized_run_context=serialized_run_context),
ctx.deps,
],
**self.activity_config,
)
return {name: self._tool_for_tool_info(tool_info) for name, tool_info in tool_infos.items()}

async def call_tool(
self,
name: str,
tool_args: dict[str, Any],
ctx: RunContext[AgentDepsT],
tool: ToolsetTool[AgentDepsT],
) -> Any:
if not workflow.in_workflow():
return await super().call_tool(name, tool_args, ctx, tool)

tool_activity_config = self.tool_activity_config.get(name)
if tool_activity_config is False: # pragma: no cover
return await super().call_tool(name, tool_args, ctx, tool)

merged_config = self.activity_config | (tool_activity_config or {})
serialized_run_context = self.run_context_type.serialize_run_context(ctx)
return self._unwrap_call_tool_result(
await workflow.execute_activity(
activity=self.call_tool_activity,
args=[
CallToolParams(
name=name,
tool_args=tool_args,
serialized_run_context=serialized_run_context,
tool_def=tool.tool_def,
),
ctx.deps,
],
**merged_config,
)
)

def _tool_for_tool_info(self, tool_info: _ToolInfo) -> ToolsetTool[AgentDepsT]:
"""Create a ToolsetTool from a _ToolInfo for use outside activities.

We use `TOOL_SCHEMA_VALIDATOR` here which just parses JSON without additional validation,
because the actual args validation happens inside `call_tool_activity`.
"""
return ToolsetTool(
toolset=self,
tool_def=tool_info.tool_def,
max_retries=tool_info.max_retries,
args_validator=TOOL_SCHEMA_VALIDATOR,
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ async def call_tool_activity(params: CallToolParams, deps: AgentDepsT) -> CallTo
'Removing or renaming tools during an agent run is not supported with Temporal.'
) from e

# The tool args will already have been validated into their proper types in the `ToolManager`,
# but `execute_activity` would have turned them into simple Python types again, so we need to re-validate them.
args_dict = tool.args_validator.validate_python(params.tool_args)
return await self._wrap_call_tool_result(self.wrapped.call_tool(name, args_dict, ctx, tool))
return await self._call_tool_in_activity(name, params.tool_args, ctx, tool)

# Set type hint explicitly so that Temporal can take care of serialization and deserialization
call_tool_activity.__annotations__['deps'] = deps_type
Expand Down
32 changes: 30 additions & 2 deletions pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_toolset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
from temporalio.workflow import ActivityConfig
from typing_extensions import assert_never

from pydantic_ai import AbstractToolset, FunctionToolset, WrapperToolset
from pydantic_ai import AbstractToolset, FunctionToolset, ToolsetTool, WrapperToolset
from pydantic_ai.exceptions import ApprovalRequired, CallDeferred, ModelRetry
from pydantic_ai.tools import AgentDepsT, ToolDefinition
from pydantic_ai.tools import AgentDepsT, RunContext, ToolDefinition
from pydantic_ai.toolsets._dynamic import DynamicToolset

from ._run_context import TemporalRunContext

Expand Down Expand Up @@ -96,6 +97,21 @@ def _unwrap_call_tool_result(self, result: CallToolResult) -> Any:
else:
assert_never(result)

async def _call_tool_in_activity(
self,
name: str,
tool_args: dict[str, Any],
ctx: RunContext[AgentDepsT],
tool: ToolsetTool[AgentDepsT],
) -> CallToolResult:
"""Call a tool inside an activity, re-validating args that were deserialized.

The tool args will already have been validated into their proper types in the `ToolManager`,
but `execute_activity` would have turned them into simple Python types again, so we need to re-validate them.
"""
args_dict = tool.args_validator.validate_python(tool_args)
return await self._wrap_call_tool_result(self.wrapped.call_tool(name, args_dict, ctx, tool))


def temporalize_toolset(
toolset: AbstractToolset[AgentDepsT],
Expand Down Expand Up @@ -127,6 +143,18 @@ def temporalize_toolset(
run_context_type=run_context_type,
)

if isinstance(toolset, DynamicToolset):
from ._dynamic_toolset import TemporalDynamicToolset

return TemporalDynamicToolset(
toolset,
activity_name_prefix=activity_name_prefix,
activity_config=activity_config,
tool_activity_config=tool_activity_config,
deps_type=deps_type,
run_context_type=run_context_type,
)

try:
from pydantic_ai.mcp import MCPServer

Expand Down
Loading
Loading