From 70208982f1c4d5dd38a3ec0f49f6631c0356e73f Mon Sep 17 00:00:00 2001 From: scalabrese Date: Wed, 15 Oct 2025 14:39:46 +0200 Subject: [PATCH 01/11] [Temporal - Documentation] Added an example of workflow with streaming capabilities --- docs/durable_execution/temporal.md | 329 +++++++++++++++++++++++++++++ 1 file changed, 329 insertions(+) diff --git a/docs/durable_execution/temporal.md b/docs/durable_execution/temporal.md index c29e178843..f27282a566 100644 --- a/docs/durable_execution/temporal.md +++ b/docs/durable_execution/temporal.md @@ -187,6 +187,335 @@ As the streaming model request activity, workflow, and workflow execution call a - To get data from the workflow call site or workflow to the event stream handler, you can use a [dependencies object](#agent-run-context-and-dependencies). - To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from, like a message queue. You can use the dependency object to make sure the same connection string or other unique ID is available in all the places that need it. +#### Example + +Run the following +`pip install pydantic-ai temporalio mcp-run-python` + +Assuming your project has the following structure: +``` + project/ + ├── src/ + │ ├── agents.py + │ ├── datamodels.py + │ ├── streaming_handler.py + │ ├── utils.py + │ └── workflow.py + └── pyproject.toml + +``` +`utils.py` +```python +import yaml +from copy import copy + +def recursively_modify_api_key(conf): + """ + Recursively replace API key placeholders with environment variable values. + + This function traverses a configuration dictionary and replaces any keys + containing 'api_key' with the corresponding environment variable value. + It handles nested dictionaries and lists recursively. + + Args: + conf: The configuration dictionary to process. + + Returns: + A copy of the configuration with API keys replaced by environment variable values. + """ + + def inner(_conf): + for key, value in _conf.items(): + if isinstance(value, dict): + inner(value) + elif isinstance(value, list): + if len(value) > 0 and isinstance(value[0], dict): + for item in value: + inner(item) + else: + _conf[key] = [os.environ.get(str(v), v) for v in value] + elif isinstance(value, str): + _conf[key] = os.environ.get(value, value) + else: + _conf[key] = value + + copy_conf = copy(conf) + inner(copy_conf) + return copy_conf + +def read_config_yml(path): + """ + Read and process a YAML configuration file. + + This function reads a YAML file, processes it to replace API key placeholders + with environment variable values, and returns the processed configuration. + + Args: + path: The path to the YAML configuration file. + + Returns: + dict: The parsed and processed YAML content as a Python dictionary. + """ + with open(path, "r") as f: + configs = yaml.safe_load(f) + recursively_modify_api_key(configs) + return configs +``` + +`datamodels.py` + +```python +from enum import Enum +from typing import Any, Dict, Deque, AsyncIterable, Optional +from pydantic import BaseModel + +class AgentDependencies(BaseModel): + workflow_id: str + run_id: str + +class EventKind(str, Enum): + CONTINUE_CHAT = 'continue_chat' + EVENT = 'event' + RESULT = 'result' + + +class EventStream(BaseModel): + kind: EventKind + content: str +``` +`agents.py` +```python +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.workflow import ActivityConfig +with workflow.unsafe.imports_passed_through(): + from pydantic_ai import Agent, FilteredToolset, RunContext, ModelSettings + from pydantic_ai.models.anthropic import AnthropicModel + from pydantic_ai.providers.anthropic import AnthropicProvider + from pydantic_ai.mcp import MCPServerStdio + from pydantic_ai.durable_exec.temporal import TemporalAgent + from datamodels import AgentDependencies + from mcp_run_python import code_sandbox + from typing import Dict + from datetime import timedelta + + +async def get_mcp_toolsets() -> Dict[str, FilteredToolset]: + yf_server = MCPServerStdio( + command="uvx", + args=["mcp-yahoo-finance"], + timeout=240, + read_timeout=240, + id='yahoo' + ) + return { + 'yahoo': yf_server.filtered(lambda ctx, tool_def: True) + } + +async def get_claude_model(parallel_tool_calls: bool = True, **env_vars): + model_name = 'claude-sonnet-4-5-20250929' + api_key = env_vars.get('anthropic_api_key') + model = AnthropicModel(model_name=model_name, + provider=AnthropicProvider(api_key=api_key), + settings=ModelSettings(**{ + "temperature": 0.5, + "n": 1, + "max_completion_tokens": 64000, + "max_tokens": 64000, + "parallel_tool_calls": parallel_tool_calls, + })) + + return model + +async def build_agent(stream_handler=None, **env_vars): + + system_prompt = """ + You are an expert travel agent that knows perfectly how to search for hotels on the web. + You also have a Data Analyst background, mastering well how to use pandas for tabular operations. + """ + agent_name = "YahooFinanceSearchAgent" + + toolsets = await get_mcp_toolsets() + agent = Agent(name=agent_name, + model=await get_claude_model(**env_vars), # Here you place your Model instance + toolsets=[*toolsets.values()], + system_prompt=system_prompt, + event_stream_handler=stream_handler, + deps_type=AgentDependencies, + ) + + @agent.tool(name='run_python_code') + async def run_python_code(ctx: RunContext[None], code: str) -> str: + async with code_sandbox(dependencies=['pandas', 'numpy']) as sandbox: + result = await sandbox.eval(code) + return result + + + temporal_agent = TemporalAgent(wrapped=agent, + model_activity_config=ActivityConfig( + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy(maximum_attempts=50) + ), + toolset_activity_config={ + toolset_id: ActivityConfig( + start_to_close_timeout=timedelta(minutes=3), + retry_policy=RetryPolicy(maximum_attempts=3, + non_retryable_error_types=['ToolRetryError'] + ) + ) for toolset_id in toolsets.keys()}) + return temporal_agent +``` + +`streaming_handler.py` +```python +from temporalio import activity +from typing import Any, Dict, Deque, AsyncIterable, Optional +from pydantic_ai import AgentStreamEvent, FunctionToolCallEvent, \ + PartStartEvent, FunctionToolResultEvent, TextPart, ToolCallPart, PartDeltaEvent, UsageLimits, TextPartDelta, \ + ThinkingPartDelta +from datamodels import EventStream, EventKind, AgentDependencies + +async def streaming_handler(ctx, + event_stream_events: AsyncIterable[AgentStreamEvent]): + """ + This function is used by the agent to stream-out the actions that are being performed (tool calls, llm call, streaming results, etc etc. + Feel free to change it as you like or need - skipping events or enriching the content + """ + + output_delta = "" + output = "" + output_tool_delta = dict( + tool_call_id='', + tool_name_delta='', + args_delta='', + ) + # If TextPart and output delta is empty + async for event in event_stream_events: + if isinstance(event, PartStartEvent): + if isinstance(event.part, TextPart): + output_delta += f"{event.part.content}" + elif isinstance(event.part, ToolCallPart): + output += f"\nTool Call Id: {event.part.tool_call_id}" + output += f"\nTool Name: {event.part.tool_name}" + output += f"\nTool Args: {event.part.args}" + else: + pass + elif isinstance(event, FunctionToolCallEvent): + output += f"\nTool Call Id: {event.part.tool_call_id}" + output += f"\nTool Name: {event.part.tool_name}" + output += f"\nTool Args: {event.part.args}" + elif isinstance(event, FunctionToolResultEvent): + output += f"\nTool Call Id: {event.result.tool_call_id}" + output += f"\nTool Name: {event.result.tool_name}" + output += f"\nContent: {event.result.content}" + elif isinstance(event, PartDeltaEvent): + if isinstance(event.delta, TextPartDelta) or isinstance(event.delta, ThinkingPartDelta): + output_delta += f"{event.delta.content_delta}" + else: + if len(output_tool_delta['tool_call_id']) == 0: + output_tool_delta['tool_call_id'] += event.delta.tool_call_id or '' + output_tool_delta['tool_name_delta'] += event.delta.tool_name_delta or '' + output_tool_delta['args_delta'] += event.delta.args_delta or '' + + if len(output_tool_delta['tool_call_id']): + output += f"\nTool Call Id: {output_tool_delta['tool_call_id']}" + output += f"\nTool Name: {output_tool_delta['tool_name_delta']}" + output += f"\nTool Args: {output_tool_delta['args_delta']}" + + events = [] + + if output: + event = EventStream(kind=EventKind.EVENT, content=output) + events.append(event) + if output_delta: + event = EventStream(kind=EventKind.RESULT, content=output_delta) + events.append(event) + + if activity.in_activity(): + deps: AgentDependencies = ctx.deps + + workflow_id = deps.workflow_id + run_id = deps.run_id + workflow_handle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id) + for event in events: + await workflow_handle.signal('append_event', arg=event) +``` + +`workflow.py` + +```python + +import asyncio +from collections import deque +from datetime import timedelta +from typing import Any, Dict, Deque, Optional + +from temporalio import workflow, activity + +with workflow.unsafe.imports_passed_through(): + from datamodels import EventStream, AgentDependencies + from agents import YahooFinanceSearchAgent + from pydanticai import UsageLimits + from agents import streaming_handler, build_agent + from utils import read_config_yml + + +@workflow.defn +class YahooFinanceSearchWorkflow: + def __init__(self): + self.events: Deque[EventStream] = deque() + + @workflow.run + async def run(self, user_prompt: str): + + wf_vars = await workflow.execute_activity( + activity='retrieve_env_vars', + start_to_close_timeout=timedelta(seconds=10), + result_type=Dict[str, Any], + ) + deps = AgentDependencies(workflow_id=workflow.info().workflow_id, run_id=workflow.info().run_id) + + agent = await build_agent(streaming_handler, **wf_vars) + result = await agent.run(user_prompt=user_prompt, + usage_limits=UsageLimits(request_limit=50), + deps=deps + ) + + try: + await workflow.wait_condition( + lambda: len(self.events) == 0, + timeout=timedelta(seconds=10), + timeout_summary='Waiting for events to be consumed' + ) + return result.output + except asyncio.TimeoutError: + return result.output + + @staticmethod + @activity.defn(name='retrieve_env_vars') + async def retrieve_env_vars(): + with workflow.unsafe.imports_passed_through(): + import os + config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml') + configs = read_config_yml(config_path) + return { + 'anthropic_api_key': configs['llm']['anthropic_api_key'] + } + + @workflow.query + def event_stream(self) -> Optional[EventStream]: + if self.events: + return self.events.popleft() + return None + + @workflow.signal + async def append_event(self, event_stream: EventStream): + # This signal is invoked by streaming_handler, pushing event for every async loop + self.events.append(event_stream) +``` + + + ## Activity Configuration Temporal activity configuration, like timeouts and retry policies, can be customized by passing [`temporalio.workflow.ActivityConfig`](https://python.temporal.io/temporalio.workflow.ActivityConfig.html) objects to the `TemporalAgent` constructor: From 159aee357a148a989ecf8f58abd256e1ea1b47fa Mon Sep 17 00:00:00 2001 From: scalabrese Date: Thu, 16 Oct 2025 08:57:55 +0200 Subject: [PATCH 02/11] [Temporal - Documentation] Added an example of workflow with streaming capabilities --- docs/durable_execution/temporal.md | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/docs/durable_execution/temporal.md b/docs/durable_execution/temporal.md index f27282a566..dc0e7b1981 100644 --- a/docs/durable_execution/temporal.md +++ b/docs/durable_execution/temporal.md @@ -382,7 +382,6 @@ async def streaming_handler(ctx, Feel free to change it as you like or need - skipping events or enriching the content """ - output_delta = "" output = "" output_tool_delta = dict( tool_call_id='', @@ -393,7 +392,7 @@ async def streaming_handler(ctx, async for event in event_stream_events: if isinstance(event, PartStartEvent): if isinstance(event.part, TextPart): - output_delta += f"{event.part.content}" + output += f"{event.part.content}" elif isinstance(event.part, ToolCallPart): output += f"\nTool Call Id: {event.part.tool_call_id}" output += f"\nTool Name: {event.part.tool_name}" @@ -410,7 +409,7 @@ async def streaming_handler(ctx, output += f"\nContent: {event.result.content}" elif isinstance(event, PartDeltaEvent): if isinstance(event.delta, TextPartDelta) or isinstance(event.delta, ThinkingPartDelta): - output_delta += f"{event.delta.content_delta}" + output += f"{event.delta.content_delta}" else: if len(output_tool_delta['tool_call_id']) == 0: output_tool_delta['tool_call_id'] += event.delta.tool_call_id or '' @@ -427,9 +426,6 @@ async def streaming_handler(ctx, if output: event = EventStream(kind=EventKind.EVENT, content=output) events.append(event) - if output_delta: - event = EventStream(kind=EventKind.RESULT, content=output_delta) - events.append(event) if activity.in_activity(): deps: AgentDependencies = ctx.deps @@ -453,7 +449,7 @@ from typing import Any, Dict, Deque, Optional from temporalio import workflow, activity with workflow.unsafe.imports_passed_through(): - from datamodels import EventStream, AgentDependencies + from datamodels import EventStream, EventKind, AgentDependencies from agents import YahooFinanceSearchAgent from pydanticai import UsageLimits from agents import streaming_handler, build_agent @@ -480,6 +476,12 @@ class YahooFinanceSearchWorkflow: usage_limits=UsageLimits(request_limit=50), deps=deps ) + + await self.append_event(event_stream=EventStream(kind=EventKind.RESULT, + content=result.output)) + + await self.append_event(event_stream=EventStream(kind=EventKind.CONTINUE_CHAT, + content="")) try: await workflow.wait_condition( From b5f3586b11c71810a1d1e213b1e882d99f458bb1 Mon Sep 17 00:00:00 2001 From: scalabrese Date: Thu, 16 Oct 2025 09:08:24 +0200 Subject: [PATCH 03/11] [Temporal - Documentation] Added an example of workflow with streaming capabilities --- docs/durable_execution/temporal.md | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/docs/durable_execution/temporal.md b/docs/durable_execution/temporal.md index dc0e7b1981..9c55767767 100644 --- a/docs/durable_execution/temporal.md +++ b/docs/durable_execution/temporal.md @@ -204,8 +204,8 @@ Assuming your project has the following structure: └── pyproject.toml ``` -`utils.py` -```python + +```py {title="utils.py" test="skip" noqa="F841"} import yaml from copy import copy @@ -262,9 +262,7 @@ def read_config_yml(path): return configs ``` -`datamodels.py` - -```python +```py {title="datamodels.py" test="skip" noqa="F841"} from enum import Enum from typing import Any, Dict, Deque, AsyncIterable, Optional from pydantic import BaseModel @@ -283,8 +281,8 @@ class EventStream(BaseModel): kind: EventKind content: str ``` -`agents.py` -```python + +```py {title="agents.py" test="skip" noqa="F841"} from temporalio import workflow from temporalio.common import RetryPolicy from temporalio.workflow import ActivityConfig @@ -366,8 +364,7 @@ async def build_agent(stream_handler=None, **env_vars): return temporal_agent ``` -`streaming_handler.py` -```python +```py {title="streaming_handler.py" test="skip" noqa="F841"} from temporalio import activity from typing import Any, Dict, Deque, AsyncIterable, Optional from pydantic_ai import AgentStreamEvent, FunctionToolCallEvent, \ @@ -437,9 +434,8 @@ async def streaming_handler(ctx, await workflow_handle.signal('append_event', arg=event) ``` -`workflow.py` -```python +```py {title="workflow.py" test="skip" noqa="F841"} import asyncio from collections import deque From 815f04a3983adb52a4e821787403724b0d207f06 Mon Sep 17 00:00:00 2001 From: scalabrese Date: Thu, 16 Oct 2025 10:09:59 +0200 Subject: [PATCH 04/11] [Temporal - Documentation] Fixed ruff issues --- docs/durable_execution/temporal.md | 194 +++++++++++++++-------------- 1 file changed, 103 insertions(+), 91 deletions(-) diff --git a/docs/durable_execution/temporal.md b/docs/durable_execution/temporal.md index 9c55767767..76bc612131 100644 --- a/docs/durable_execution/temporal.md +++ b/docs/durable_execution/temporal.md @@ -205,10 +205,13 @@ Assuming your project has the following structure: ``` -```py {title="utils.py" test="skip" noqa="F841"} -import yaml +```py {title="utils.py" test="skip"} +import os from copy import copy +import yaml + + def recursively_modify_api_key(conf): """ Recursively replace API key placeholders with environment variable values. @@ -243,6 +246,7 @@ def recursively_modify_api_key(conf): inner(copy_conf) return copy_conf + def read_config_yml(path): """ Read and process a YAML configuration file. @@ -256,21 +260,23 @@ def read_config_yml(path): Returns: dict: The parsed and processed YAML content as a Python dictionary. """ - with open(path, "r") as f: + with open(path) as f: configs = yaml.safe_load(f) recursively_modify_api_key(configs) return configs ``` -```py {title="datamodels.py" test="skip" noqa="F841"} +```py {title="datamodels.py" test="skip"} from enum import Enum -from typing import Any, Dict, Deque, AsyncIterable, Optional + from pydantic import BaseModel + class AgentDependencies(BaseModel): workflow_id: str run_id: str + class EventKind(str, Enum): CONTINUE_CHAT = 'continue_chat' EVENT = 'event' @@ -282,26 +288,25 @@ class EventStream(BaseModel): content: str ``` -```py {title="agents.py" test="skip" noqa="F841"} -from temporalio import workflow + +```py {title="agents.py" test="skip"} +from datetime import timedelta + +from mcp_run_python import code_sandbox +from pydantic_ai import Agent, FilteredToolset, ModelSettings, RunContext +from pydantic_ai.durable_exec.temporal import TemporalAgent +from pydantic_ai.mcp import MCPServerStdio +from pydantic_ai.models.anthropic import AnthropicModel +from pydantic_ai.providers.anthropic import AnthropicProvider from temporalio.common import RetryPolicy from temporalio.workflow import ActivityConfig -with workflow.unsafe.imports_passed_through(): - from pydantic_ai import Agent, FilteredToolset, RunContext, ModelSettings - from pydantic_ai.models.anthropic import AnthropicModel - from pydantic_ai.providers.anthropic import AnthropicProvider - from pydantic_ai.mcp import MCPServerStdio - from pydantic_ai.durable_exec.temporal import TemporalAgent - from datamodels import AgentDependencies - from mcp_run_python import code_sandbox - from typing import Dict - from datetime import timedelta - - -async def get_mcp_toolsets() -> Dict[str, FilteredToolset]: + +from .datamodels import AgentDependencies + +async def get_mcp_toolsets() -> dict[str, FilteredToolset]: yf_server = MCPServerStdio( - command="uvx", - args=["mcp-yahoo-finance"], + command='uvx', + args=['mcp-yahoo-finance'], timeout=240, read_timeout=240, id='yahoo' @@ -310,67 +315,78 @@ async def get_mcp_toolsets() -> Dict[str, FilteredToolset]: 'yahoo': yf_server.filtered(lambda ctx, tool_def: True) } + async def get_claude_model(parallel_tool_calls: bool = True, **env_vars): model_name = 'claude-sonnet-4-5-20250929' api_key = env_vars.get('anthropic_api_key') model = AnthropicModel(model_name=model_name, provider=AnthropicProvider(api_key=api_key), settings=ModelSettings(**{ - "temperature": 0.5, - "n": 1, - "max_completion_tokens": 64000, - "max_tokens": 64000, - "parallel_tool_calls": parallel_tool_calls, + 'temperature': 0.5, + 'n': 1, + 'max_completion_tokens': 64000, + 'max_tokens': 64000, + 'parallel_tool_calls': parallel_tool_calls, })) return model + async def build_agent(stream_handler=None, **env_vars): - system_prompt = """ You are an expert travel agent that knows perfectly how to search for hotels on the web. You also have a Data Analyst background, mastering well how to use pandas for tabular operations. """ - agent_name = "YahooFinanceSearchAgent" - + agent_name = 'YahooFinanceSearchAgent' + toolsets = await get_mcp_toolsets() agent = Agent(name=agent_name, - model=await get_claude_model(**env_vars), # Here you place your Model instance + model=await get_claude_model(**env_vars), # Here you place your Model instance toolsets=[*toolsets.values()], system_prompt=system_prompt, event_stream_handler=stream_handler, deps_type=AgentDependencies, ) - + @agent.tool(name='run_python_code') async def run_python_code(ctx: RunContext[None], code: str) -> str: async with code_sandbox(dependencies=['pandas', 'numpy']) as sandbox: result = await sandbox.eval(code) return result - - + temporal_agent = TemporalAgent(wrapped=agent, - model_activity_config=ActivityConfig( - start_to_close_timeout=timedelta(minutes=5), - retry_policy=RetryPolicy(maximum_attempts=50) - ), - toolset_activity_config={ - toolset_id: ActivityConfig( - start_to_close_timeout=timedelta(minutes=3), - retry_policy=RetryPolicy(maximum_attempts=3, - non_retryable_error_types=['ToolRetryError'] - ) - ) for toolset_id in toolsets.keys()}) + model_activity_config=ActivityConfig( + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy(maximum_attempts=50) + ), + toolset_activity_config={ + toolset_id: ActivityConfig( + start_to_close_timeout=timedelta(minutes=3), + retry_policy=RetryPolicy(maximum_attempts=3, + non_retryable_error_types=['ToolRetryError'] + ) + ) for toolset_id in toolsets.keys()}) return temporal_agent ``` -```py {title="streaming_handler.py" test="skip" noqa="F841"} +```py {title="streaming_handler.py" test="skip"} +from collections.abc import AsyncIterable + +from .datamodels import AgentDependencies, EventKind, EventStream from temporalio import activity -from typing import Any, Dict, Deque, AsyncIterable, Optional -from pydantic_ai import AgentStreamEvent, FunctionToolCallEvent, \ - PartStartEvent, FunctionToolResultEvent, TextPart, ToolCallPart, PartDeltaEvent, UsageLimits, TextPartDelta, \ - ThinkingPartDelta -from datamodels import EventStream, EventKind, AgentDependencies + +from pydantic_ai import ( + AgentStreamEvent, + FunctionToolCallEvent, + PartStartEvent, + FunctionToolResultEvent, + TextPart, + ToolCallPart, + PartDeltaEvent, + TextPartDelta, + ThinkingPartDelta, +) + async def streaming_handler(ctx, event_stream_events: AsyncIterable[AgentStreamEvent]): @@ -378,8 +394,8 @@ async def streaming_handler(ctx, This function is used by the agent to stream-out the actions that are being performed (tool calls, llm call, streaming results, etc etc. Feel free to change it as you like or need - skipping events or enriching the content """ - - output = "" + + output = '' output_tool_delta = dict( tool_call_id='', tool_name_delta='', @@ -389,24 +405,24 @@ async def streaming_handler(ctx, async for event in event_stream_events: if isinstance(event, PartStartEvent): if isinstance(event.part, TextPart): - output += f"{event.part.content}" + output += f'{event.part.content}' elif isinstance(event.part, ToolCallPart): - output += f"\nTool Call Id: {event.part.tool_call_id}" - output += f"\nTool Name: {event.part.tool_name}" - output += f"\nTool Args: {event.part.args}" + output += f'\nTool Call Id: {event.part.tool_call_id}' + output += f'\nTool Name: {event.part.tool_name}' + output += f'\nTool Args: {event.part.args}' else: pass elif isinstance(event, FunctionToolCallEvent): - output += f"\nTool Call Id: {event.part.tool_call_id}" - output += f"\nTool Name: {event.part.tool_name}" - output += f"\nTool Args: {event.part.args}" + output += f'\nTool Call Id: {event.part.tool_call_id}' + output += f'\nTool Name: {event.part.tool_name}' + output += f'\nTool Args: {event.part.args}' elif isinstance(event, FunctionToolResultEvent): - output += f"\nTool Call Id: {event.result.tool_call_id}" - output += f"\nTool Name: {event.result.tool_name}" - output += f"\nContent: {event.result.content}" + output += f'\nTool Call Id: {event.result.tool_call_id}' + output += f'\nTool Name: {event.result.tool_name}' + output += f'\nContent: {event.result.content}' elif isinstance(event, PartDeltaEvent): if isinstance(event.delta, TextPartDelta) or isinstance(event.delta, ThinkingPartDelta): - output += f"{event.delta.content_delta}" + output += f'{event.delta.content_delta}' else: if len(output_tool_delta['tool_call_id']) == 0: output_tool_delta['tool_call_id'] += event.delta.tool_call_id or '' @@ -414,9 +430,9 @@ async def streaming_handler(ctx, output_tool_delta['args_delta'] += event.delta.args_delta or '' if len(output_tool_delta['tool_call_id']): - output += f"\nTool Call Id: {output_tool_delta['tool_call_id']}" - output += f"\nTool Name: {output_tool_delta['tool_name_delta']}" - output += f"\nTool Args: {output_tool_delta['args_delta']}" + output += f'\nTool Call Id: {output_tool_delta["tool_call_id"]}' + output += f'\nTool Name: {output_tool_delta["tool_name_delta"]}' + output += f'\nTool Args: {output_tool_delta["args_delta"]}' events = [] @@ -434,28 +450,23 @@ async def streaming_handler(ctx, await workflow_handle.signal('append_event', arg=event) ``` - -```py {title="workflow.py" test="skip" noqa="F841"} +```py {title="workflow.py" test="skip"} import asyncio from collections import deque from datetime import timedelta -from typing import Any, Dict, Deque, Optional +from typing import Any -from temporalio import workflow, activity - -with workflow.unsafe.imports_passed_through(): - from datamodels import EventStream, EventKind, AgentDependencies - from agents import YahooFinanceSearchAgent - from pydanticai import UsageLimits - from agents import streaming_handler, build_agent - from utils import read_config_yml +from pydanticai import UsageLimits +from temporalio import activity, workflow +from .agents import build_agent, streaming_handler +from .datamodels import AgentDependencies, EventKind, EventStream @workflow.defn class YahooFinanceSearchWorkflow: def __init__(self): - self.events: Deque[EventStream] = deque() + self.events: deque[EventStream] = deque() @workflow.run async def run(self, user_prompt: str): @@ -463,7 +474,7 @@ class YahooFinanceSearchWorkflow: wf_vars = await workflow.execute_activity( activity='retrieve_env_vars', start_to_close_timeout=timedelta(seconds=10), - result_type=Dict[str, Any], + result_type=dict[str, Any], ) deps = AgentDependencies(workflow_id=workflow.info().workflow_id, run_id=workflow.info().run_id) @@ -472,12 +483,12 @@ class YahooFinanceSearchWorkflow: usage_limits=UsageLimits(request_limit=50), deps=deps ) - + await self.append_event(event_stream=EventStream(kind=EventKind.RESULT, - content=result.output)) + content=result.output)) await self.append_event(event_stream=EventStream(kind=EventKind.CONTINUE_CHAT, - content="")) + content='')) try: await workflow.wait_condition( @@ -492,16 +503,17 @@ class YahooFinanceSearchWorkflow: @staticmethod @activity.defn(name='retrieve_env_vars') async def retrieve_env_vars(): - with workflow.unsafe.imports_passed_through(): - import os - config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml') - configs = read_config_yml(config_path) - return { - 'anthropic_api_key': configs['llm']['anthropic_api_key'] - } + import os + from .utils import read_config_yml + + config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml') + configs = read_config_yml(config_path) + return { + 'anthropic_api_key': configs['llm']['anthropic_api_key'] + } @workflow.query - def event_stream(self) -> Optional[EventStream]: + def event_stream(self) -> EventStream | None: if self.events: return self.events.popleft() return None From 31a5dc87fe9efdb6f3f62a5457ddda360ee7f615 Mon Sep 17 00:00:00 2001 From: scalabrese Date: Thu, 16 Oct 2025 10:18:26 +0200 Subject: [PATCH 05/11] [Temporal - Documentation] Fixed ruff issues --- docs/durable_execution/temporal.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/durable_execution/temporal.md b/docs/durable_execution/temporal.md index 76bc612131..80415a9c4e 100644 --- a/docs/durable_execution/temporal.md +++ b/docs/durable_execution/temporal.md @@ -451,13 +451,12 @@ async def streaming_handler(ctx, ``` ```py {title="workflow.py" test="skip"} - import asyncio from collections import deque from datetime import timedelta from typing import Any -from pydanticai import UsageLimits +from pydantic_ai import UsageLimits from temporalio import activity, workflow from .agents import build_agent, streaming_handler From 944606cdf05757fdd3b0b83f92817ab2799d8c2e Mon Sep 17 00:00:00 2001 From: scalabrese Date: Thu, 16 Oct 2025 11:03:31 +0200 Subject: [PATCH 06/11] [Temporal - Documentation] Moved it to examples --- docs/durable_execution/temporal.md | 341 +----------------- .../temporal_streaming/README.md | 172 +++++++++ .../temporal_streaming/__init__.py | 19 + .../temporal_streaming/agents.py | 118 ++++++ .../temporal_streaming/app_conf.yml | 11 + .../temporal_streaming/datamodels.py | 27 ++ .../temporal_streaming/main.py | 102 ++++++ .../temporal_streaming/streaming_handler.py | 96 +++++ .../temporal_streaming/utils.py | 60 +++ .../temporal_streaming/workflow.py | 114 ++++++ 10 files changed, 728 insertions(+), 332 deletions(-) create mode 100644 examples/pydantic_ai_examples/temporal_streaming/README.md create mode 100644 examples/pydantic_ai_examples/temporal_streaming/__init__.py create mode 100644 examples/pydantic_ai_examples/temporal_streaming/agents.py create mode 100644 examples/pydantic_ai_examples/temporal_streaming/app_conf.yml create mode 100644 examples/pydantic_ai_examples/temporal_streaming/datamodels.py create mode 100644 examples/pydantic_ai_examples/temporal_streaming/main.py create mode 100644 examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py create mode 100644 examples/pydantic_ai_examples/temporal_streaming/utils.py create mode 100644 examples/pydantic_ai_examples/temporal_streaming/workflow.py diff --git a/docs/durable_execution/temporal.md b/docs/durable_execution/temporal.md index 80415a9c4e..2199835c15 100644 --- a/docs/durable_execution/temporal.md +++ b/docs/durable_execution/temporal.md @@ -185,343 +185,20 @@ The event stream handler function will receive the agent [run context][pydantic_ As the streaming model request activity, workflow, and workflow execution call all take place in separate processes, passing data between them requires some care: - To get data from the workflow call site or workflow to the event stream handler, you can use a [dependencies object](#agent-run-context-and-dependencies). -- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from, like a message queue. You can use the dependency object to make sure the same connection string or other unique ID is available in all the places that need it. +- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from. Alternatively, you can use Temporal's built-in signals and queries to pass events from activities to the workflow and from the workflow to the caller. #### Example -Run the following -`pip install pydantic-ai temporalio mcp-run-python` - -Assuming your project has the following structure: -``` - project/ - ├── src/ - │ ├── agents.py - │ ├── datamodels.py - │ ├── streaming_handler.py - │ ├── utils.py - │ └── workflow.py - └── pyproject.toml - -``` - -```py {title="utils.py" test="skip"} -import os -from copy import copy - -import yaml - - -def recursively_modify_api_key(conf): - """ - Recursively replace API key placeholders with environment variable values. - - This function traverses a configuration dictionary and replaces any keys - containing 'api_key' with the corresponding environment variable value. - It handles nested dictionaries and lists recursively. - - Args: - conf: The configuration dictionary to process. - - Returns: - A copy of the configuration with API keys replaced by environment variable values. - """ - - def inner(_conf): - for key, value in _conf.items(): - if isinstance(value, dict): - inner(value) - elif isinstance(value, list): - if len(value) > 0 and isinstance(value[0], dict): - for item in value: - inner(item) - else: - _conf[key] = [os.environ.get(str(v), v) for v in value] - elif isinstance(value, str): - _conf[key] = os.environ.get(value, value) - else: - _conf[key] = value - - copy_conf = copy(conf) - inner(copy_conf) - return copy_conf - - -def read_config_yml(path): - """ - Read and process a YAML configuration file. - - This function reads a YAML file, processes it to replace API key placeholders - with environment variable values, and returns the processed configuration. - - Args: - path: The path to the YAML configuration file. - - Returns: - dict: The parsed and processed YAML content as a Python dictionary. - """ - with open(path) as f: - configs = yaml.safe_load(f) - recursively_modify_api_key(configs) - return configs -``` - -```py {title="datamodels.py" test="skip"} -from enum import Enum - -from pydantic import BaseModel - - -class AgentDependencies(BaseModel): - workflow_id: str - run_id: str +For a complete working example of streaming with Temporal using signals and queries, see the [temporal_streaming example](https://github.com/pydantic/pydantic-ai/tree/main/examples/pydantic_ai_examples/temporal_streaming). This example demonstrates: +- How to use an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] to capture agent events in activities +- Using Temporal signals to send events from activities to the workflow +- Using Temporal queries to poll events from the workflow to the caller +- Setting up dependencies to pass workflow identification for signal routing +- Integrating MCP toolsets and custom tools with streaming +- Complete project structure with all necessary files -class EventKind(str, Enum): - CONTINUE_CHAT = 'continue_chat' - EVENT = 'event' - RESULT = 'result' - - -class EventStream(BaseModel): - kind: EventKind - content: str -``` - - -```py {title="agents.py" test="skip"} -from datetime import timedelta - -from mcp_run_python import code_sandbox -from pydantic_ai import Agent, FilteredToolset, ModelSettings, RunContext -from pydantic_ai.durable_exec.temporal import TemporalAgent -from pydantic_ai.mcp import MCPServerStdio -from pydantic_ai.models.anthropic import AnthropicModel -from pydantic_ai.providers.anthropic import AnthropicProvider -from temporalio.common import RetryPolicy -from temporalio.workflow import ActivityConfig - -from .datamodels import AgentDependencies - -async def get_mcp_toolsets() -> dict[str, FilteredToolset]: - yf_server = MCPServerStdio( - command='uvx', - args=['mcp-yahoo-finance'], - timeout=240, - read_timeout=240, - id='yahoo' - ) - return { - 'yahoo': yf_server.filtered(lambda ctx, tool_def: True) - } - - -async def get_claude_model(parallel_tool_calls: bool = True, **env_vars): - model_name = 'claude-sonnet-4-5-20250929' - api_key = env_vars.get('anthropic_api_key') - model = AnthropicModel(model_name=model_name, - provider=AnthropicProvider(api_key=api_key), - settings=ModelSettings(**{ - 'temperature': 0.5, - 'n': 1, - 'max_completion_tokens': 64000, - 'max_tokens': 64000, - 'parallel_tool_calls': parallel_tool_calls, - })) - - return model - - -async def build_agent(stream_handler=None, **env_vars): - system_prompt = """ - You are an expert travel agent that knows perfectly how to search for hotels on the web. - You also have a Data Analyst background, mastering well how to use pandas for tabular operations. - """ - agent_name = 'YahooFinanceSearchAgent' - - toolsets = await get_mcp_toolsets() - agent = Agent(name=agent_name, - model=await get_claude_model(**env_vars), # Here you place your Model instance - toolsets=[*toolsets.values()], - system_prompt=system_prompt, - event_stream_handler=stream_handler, - deps_type=AgentDependencies, - ) - - @agent.tool(name='run_python_code') - async def run_python_code(ctx: RunContext[None], code: str) -> str: - async with code_sandbox(dependencies=['pandas', 'numpy']) as sandbox: - result = await sandbox.eval(code) - return result - - temporal_agent = TemporalAgent(wrapped=agent, - model_activity_config=ActivityConfig( - start_to_close_timeout=timedelta(minutes=5), - retry_policy=RetryPolicy(maximum_attempts=50) - ), - toolset_activity_config={ - toolset_id: ActivityConfig( - start_to_close_timeout=timedelta(minutes=3), - retry_policy=RetryPolicy(maximum_attempts=3, - non_retryable_error_types=['ToolRetryError'] - ) - ) for toolset_id in toolsets.keys()}) - return temporal_agent -``` - -```py {title="streaming_handler.py" test="skip"} -from collections.abc import AsyncIterable - -from .datamodels import AgentDependencies, EventKind, EventStream -from temporalio import activity - -from pydantic_ai import ( - AgentStreamEvent, - FunctionToolCallEvent, - PartStartEvent, - FunctionToolResultEvent, - TextPart, - ToolCallPart, - PartDeltaEvent, - TextPartDelta, - ThinkingPartDelta, -) - - -async def streaming_handler(ctx, - event_stream_events: AsyncIterable[AgentStreamEvent]): - """ - This function is used by the agent to stream-out the actions that are being performed (tool calls, llm call, streaming results, etc etc. - Feel free to change it as you like or need - skipping events or enriching the content - """ - - output = '' - output_tool_delta = dict( - tool_call_id='', - tool_name_delta='', - args_delta='', - ) - # If TextPart and output delta is empty - async for event in event_stream_events: - if isinstance(event, PartStartEvent): - if isinstance(event.part, TextPart): - output += f'{event.part.content}' - elif isinstance(event.part, ToolCallPart): - output += f'\nTool Call Id: {event.part.tool_call_id}' - output += f'\nTool Name: {event.part.tool_name}' - output += f'\nTool Args: {event.part.args}' - else: - pass - elif isinstance(event, FunctionToolCallEvent): - output += f'\nTool Call Id: {event.part.tool_call_id}' - output += f'\nTool Name: {event.part.tool_name}' - output += f'\nTool Args: {event.part.args}' - elif isinstance(event, FunctionToolResultEvent): - output += f'\nTool Call Id: {event.result.tool_call_id}' - output += f'\nTool Name: {event.result.tool_name}' - output += f'\nContent: {event.result.content}' - elif isinstance(event, PartDeltaEvent): - if isinstance(event.delta, TextPartDelta) or isinstance(event.delta, ThinkingPartDelta): - output += f'{event.delta.content_delta}' - else: - if len(output_tool_delta['tool_call_id']) == 0: - output_tool_delta['tool_call_id'] += event.delta.tool_call_id or '' - output_tool_delta['tool_name_delta'] += event.delta.tool_name_delta or '' - output_tool_delta['args_delta'] += event.delta.args_delta or '' - - if len(output_tool_delta['tool_call_id']): - output += f'\nTool Call Id: {output_tool_delta["tool_call_id"]}' - output += f'\nTool Name: {output_tool_delta["tool_name_delta"]}' - output += f'\nTool Args: {output_tool_delta["args_delta"]}' - - events = [] - - if output: - event = EventStream(kind=EventKind.EVENT, content=output) - events.append(event) - - if activity.in_activity(): - deps: AgentDependencies = ctx.deps - - workflow_id = deps.workflow_id - run_id = deps.run_id - workflow_handle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id) - for event in events: - await workflow_handle.signal('append_event', arg=event) -``` - -```py {title="workflow.py" test="skip"} -import asyncio -from collections import deque -from datetime import timedelta -from typing import Any - -from pydantic_ai import UsageLimits -from temporalio import activity, workflow - -from .agents import build_agent, streaming_handler -from .datamodels import AgentDependencies, EventKind, EventStream - -@workflow.defn -class YahooFinanceSearchWorkflow: - def __init__(self): - self.events: deque[EventStream] = deque() - - @workflow.run - async def run(self, user_prompt: str): - - wf_vars = await workflow.execute_activity( - activity='retrieve_env_vars', - start_to_close_timeout=timedelta(seconds=10), - result_type=dict[str, Any], - ) - deps = AgentDependencies(workflow_id=workflow.info().workflow_id, run_id=workflow.info().run_id) - - agent = await build_agent(streaming_handler, **wf_vars) - result = await agent.run(user_prompt=user_prompt, - usage_limits=UsageLimits(request_limit=50), - deps=deps - ) - - await self.append_event(event_stream=EventStream(kind=EventKind.RESULT, - content=result.output)) - - await self.append_event(event_stream=EventStream(kind=EventKind.CONTINUE_CHAT, - content='')) - - try: - await workflow.wait_condition( - lambda: len(self.events) == 0, - timeout=timedelta(seconds=10), - timeout_summary='Waiting for events to be consumed' - ) - return result.output - except asyncio.TimeoutError: - return result.output - - @staticmethod - @activity.defn(name='retrieve_env_vars') - async def retrieve_env_vars(): - import os - from .utils import read_config_yml - - config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml') - configs = read_config_yml(config_path) - return { - 'anthropic_api_key': configs['llm']['anthropic_api_key'] - } - - @workflow.query - def event_stream(self) -> EventStream | None: - if self.events: - return self.events.popleft() - return None - - @workflow.signal - async def append_event(self, event_stream: EventStream): - # This signal is invoked by streaming_handler, pushing event for every async loop - self.events.append(event_stream) -``` +The example includes a Yahoo Finance search agent with Python code execution capabilities, showing how to stream tool calls, model responses, and results in real-time during workflow execution. diff --git a/examples/pydantic_ai_examples/temporal_streaming/README.md b/examples/pydantic_ai_examples/temporal_streaming/README.md new file mode 100644 index 0000000000..ad1aa0fff3 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/README.md @@ -0,0 +1,172 @@ +# Temporal Streaming Example + +This example demonstrates how to implement streaming with Pydantic AI agents in Temporal workflows. It showcases the streaming pattern described in the [Temporal documentation](../../../docs/durable_execution/temporal.md#streaming). + +## Overview + +The example implements a Yahoo Finance search agent that: +- Uses MCP (Model Context Protocol) toolsets for accessing financial data +- Executes Python code in a sandbox for data analysis +- Streams events during execution via Temporal signals and queries +- Provides durable execution with automatic retries + +## Architecture + +The streaming architecture works as follows: + +1. **Agent Configuration** (`agents.py`): Defines the agent with MCP toolsets and custom Python execution tools +2. **Workflow** (`workflow.py`): Temporal workflow that orchestrates agent execution and manages event streams +3. **Streaming Handler** (`streaming_handler.py`): Processes agent events and sends them to the workflow via signals +4. **Main** (`main.py`): Sets up the Temporal client/worker and polls for events via queries + +## Key Components + +### Event Flow + +``` +Agent Execution (in Activity) + ↓ +Streaming Handler + ↓ (via Signal) +Workflow Event Queue + ↓ (via Query) +Main Process (polling) + ↓ +Display to User +``` + +### Dependencies + +The [`AgentDependencies`][pydantic_ai_examples.temporal_streaming.datamodels.AgentDependencies] model passes workflow identification from the workflow to activities, enabling the streaming handler to send signals back to the correct workflow instance. + +## Prerequisites + +1. **Temporal Server**: Install and run Temporal locally + +```bash +brew install temporal +temporal server start-dev +``` + +2. **Python Dependencies**: Install required packages + +```bash +pip install pydantic-ai temporalio mcp-run-python pyyaml +``` + +3. **Configuration File**: Create an `app_conf.yml` file in your project root + +```yaml +llm: + anthropic_api_key: ANTHROPIC_API_KEY # Will be read from environment variable +``` + +4. **Environment Variables**: Set your Anthropic API key + +```bash +export ANTHROPIC_API_KEY='your-api-key-here' +``` + +## Running the Example + +1. Make sure Temporal server is running: + +```bash +temporal server start-dev +``` + +2. Set the configuration file path (optional, defaults to `./app_conf.yml`): + +```bash +export APP_CONFIG_PATH=./app_conf.yml +``` + +3. Run the example: + +```bash +python -m pydantic_ai_examples.temporal_streaming.main +``` + +## What to Expect + +The example will: +1. Connect to Temporal server +2. Start a worker to handle workflows and activities +3. Execute the workflow with a sample financial query +4. Stream events as the agent: + - Calls tools (Yahoo Finance API, Python sandbox) + - Receives responses + - Generates the final result +5. Display all events in real-time +6. Show the final result + +## Project Structure + +``` +temporal_streaming/ +├── agents.py # Agent configuration with MCP toolsets +├── datamodels.py # Pydantic models for dependencies and events +├── main.py # Main entry point +├── streaming_handler.py # Event stream handler +├── utils.py # Configuration utilities +├── workflow.py # Temporal workflow definition +└── README.md # This file +``` + +## Customization + +### Changing the Query + +Edit the query in `main.py`: + +```python +workflow_handle = await client.start_workflow( + YahooFinanceSearchWorkflow.run, + args=['Your custom financial query here'], + id=workflow_id, + task_queue=task_queue, +) +``` + +### Adding More Tools + +Add tools to the agent in `agents.py`: + +```python +@agent.tool(name='your_tool_name') +async def your_tool(ctx: RunContext[AgentDependencies], param: str) -> str: + # Your tool implementation + return result +``` + +### Modifying Event Handling + +Customize what events are captured and displayed in `streaming_handler.py`. + +## Key Concepts + +### Why Streaming is Different in Temporal + +Traditional streaming methods like [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] don't work in Temporal because: +- Activities cannot stream directly to the workflow +- The workflow and activity run in separate processes + +### The Solution + +This example uses: +- **Event Stream Handler**: Captures events during agent execution +- **Signals**: Push events from activities to the workflow +- **Queries**: Pull events from the workflow to the caller +- **Dependencies**: Pass workflow identification to enable signal routing + +## Limitations + +- Events are batched per model request/tool call rather than streamed token-by-token +- Query polling introduces a small delay in event delivery +- The workflow waits up to 10 seconds for events to be consumed before completing + +## Learn More + +- [Temporal Documentation](https://docs.temporal.io/) +- [Pydantic AI Temporal Integration](../../../docs/durable_execution/temporal.md) +- [Streaming with Pydantic AI](../../../docs/agents.md#streaming-all-events) diff --git a/examples/pydantic_ai_examples/temporal_streaming/__init__.py b/examples/pydantic_ai_examples/temporal_streaming/__init__.py new file mode 100644 index 0000000000..dfaff570e3 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/__init__.py @@ -0,0 +1,19 @@ +"""Temporal streaming example for Pydantic AI. + +This example demonstrates how to implement streaming with Pydantic AI agents +in Temporal workflows using signals and queries. +""" + +from .agents import build_agent +from .datamodels import AgentDependencies, EventKind, EventStream +from .streaming_handler import streaming_handler +from .workflow import YahooFinanceSearchWorkflow + +__all__ = [ + 'build_agent', + 'streaming_handler', + 'YahooFinanceSearchWorkflow', + 'AgentDependencies', + 'EventKind', + 'EventStream', +] diff --git a/examples/pydantic_ai_examples/temporal_streaming/agents.py b/examples/pydantic_ai_examples/temporal_streaming/agents.py new file mode 100644 index 0000000000..fc7b541d25 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/agents.py @@ -0,0 +1,118 @@ +"""Agent configuration for the Temporal streaming example. + +This module defines the agent setup with MCP toolsets, model configuration, +and custom tools for data analysis. +""" + +from datetime import timedelta + +from mcp_run_python import code_sandbox +from pydantic_ai import Agent, FilteredToolset, ModelSettings, RunContext +from pydantic_ai.durable_exec.temporal import TemporalAgent +from pydantic_ai.mcp import MCPServerStdio +from pydantic_ai.models.anthropic import AnthropicModel +from pydantic_ai.providers.anthropic import AnthropicProvider +from temporalio.common import RetryPolicy +from temporalio.workflow import ActivityConfig + +from datamodels import AgentDependencies + + +async def get_mcp_toolsets() -> dict[str, FilteredToolset]: + """ + Initialize MCP toolsets for the agent. + + Returns: + A dictionary mapping toolset names to filtered toolsets. + """ + yf_server = MCPServerStdio( + command='uvx', + args=['mcp-yahoo-finance'], + timeout=240, + read_timeout=240, + id='yahoo', + ) + return {'yahoo': yf_server.filtered(lambda ctx, tool_def: True)} + + +async def get_claude_model(parallel_tool_calls: bool = True, **env_vars): + """ + Create and configure the Claude model. + + Args: + parallel_tool_calls: Whether to enable parallel tool calls. + **env_vars: Environment variables including API keys. + + Returns: + Configured AnthropicModel instance. + """ + model_name = 'claude-sonnet-4-5-20250929' + api_key = env_vars.get('anthropic_api_key') + model = AnthropicModel( + model_name=model_name, + provider=AnthropicProvider(api_key=api_key), + settings=ModelSettings( + **{ + 'temperature': 0.5, + 'n': 1, + 'max_completion_tokens': 64000, + 'max_tokens': 64000, + 'parallel_tool_calls': parallel_tool_calls, + } + ), + ) + + return model + + +async def build_agent(stream_handler=None, **env_vars): + """ + Build and configure the agent with tools and temporal settings. + + Args: + stream_handler: Optional event stream handler for streaming responses. + **env_vars: Environment variables including API keys. + + Returns: + TemporalAgent instance ready for use in Temporal workflows. + """ + system_prompt = """ + You are an expert financial analyst that knows how to search for financial data on the web. + You also have a Data Analyst background, mastering well how to use pandas for tabular operations. + """ + agent_name = 'YahooFinanceSearchAgent' + + toolsets = await get_mcp_toolsets() + agent = Agent( + name=agent_name, + model=await get_claude_model(**env_vars), + toolsets=[*toolsets.values()], + system_prompt=system_prompt, + event_stream_handler=stream_handler, + deps_type=AgentDependencies, + ) + + @agent.tool(name='run_python_code') + async def run_python_code(ctx: RunContext[None], code: str) -> str: + """Execute Python code in a sandboxed environment with pandas and numpy available.""" + async with code_sandbox(dependencies=['pandas', 'numpy']) as sandbox: + result = await sandbox.eval(code) + return result + + temporal_agent = TemporalAgent( + wrapped=agent, + model_activity_config=ActivityConfig( + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy(maximum_attempts=50), + ), + toolset_activity_config={ + toolset_id: ActivityConfig( + start_to_close_timeout=timedelta(minutes=3), + retry_policy=RetryPolicy( + maximum_attempts=3, non_retryable_error_types=['ToolRetryError'] + ), + ) + for toolset_id in toolsets.keys() + }, + ) + return temporal_agent diff --git a/examples/pydantic_ai_examples/temporal_streaming/app_conf.yml b/examples/pydantic_ai_examples/temporal_streaming/app_conf.yml new file mode 100644 index 0000000000..4aa566303d --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/app_conf.yml @@ -0,0 +1,11 @@ +# Configuration file for the Temporal Streaming example +# +# This file demonstrates how to configure API keys using environment variables. +# The value specified here (e.g., ANTHROPIC_API_KEY) will be replaced with the +# actual environment variable value at runtime. + +llm: + # The anthropic_api_key will be read from the ANTHROPIC_API_KEY environment variable + # Make sure to set it before running the example: + # export ANTHROPIC_API_KEY='your-api-key-here' + anthropic_api_key: 'ANTHROPIC_API_KEY' diff --git a/examples/pydantic_ai_examples/temporal_streaming/datamodels.py b/examples/pydantic_ai_examples/temporal_streaming/datamodels.py new file mode 100644 index 0000000000..ce1557f0b7 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/datamodels.py @@ -0,0 +1,27 @@ +"""Data models for the temporal streaming example.""" + +from enum import Enum + +from pydantic import BaseModel + + +class AgentDependencies(BaseModel): + """Dependencies passed to the agent containing workflow identification.""" + + workflow_id: str + run_id: str + + +class EventKind(str, Enum): + """Types of events that can be streamed.""" + + CONTINUE_CHAT = 'continue_chat' + EVENT = 'event' + RESULT = 'result' + + +class EventStream(BaseModel): + """Event stream data model for streaming agent events.""" + + kind: EventKind + content: str diff --git a/examples/pydantic_ai_examples/temporal_streaming/main.py b/examples/pydantic_ai_examples/temporal_streaming/main.py new file mode 100644 index 0000000000..aeb2b29ec6 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/main.py @@ -0,0 +1,102 @@ +"""Main entry point for the Temporal streaming example. + +This module sets up the Temporal client and worker, executes the workflow, +and polls for streaming events to display to the user. +""" + +import asyncio +import os +import uuid + +from temporalio.client import Client +from temporalio.worker import Worker + +from agents import build_agent +from datamodels import EventKind, EventStream +from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin +from streaming_handler import streaming_handler +from utils import read_config_yml +from workflow import YahooFinanceSearchWorkflow + + +async def poll_events(workflow_handle): + """ + Poll for events from the workflow and print them. + + Args: + workflow_handle: Handle to the running workflow. + """ + while True: + event = await workflow_handle.query('event_stream', result_type=EventStream | None) + if event is None: + await asyncio.sleep(0.1) + continue + + if event.kind == EventKind.CONTINUE_CHAT: + print('\n--- Workflow completed ---') + break + elif event.kind == EventKind.RESULT: + print(f'\n=== Final Result ===\n{event.content}\n') + elif event.kind == EventKind.EVENT: + print(f'\n--- Event ---\n{event.content}\n') + + +async def main(): + """ + Main function to set up and run the Temporal workflow. + + This function: + 1. Connects to the Temporal server + 2. Builds the agent and registers activities + 3. Starts a worker + 4. Executes the workflow + 5. Polls for streaming events + """ + # Connect to Temporal server + client = await Client.connect( + target_host='localhost:7233', + plugins=[PydanticAIPlugin()], + ) + config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml') + confs = read_config_yml(config_path) + + # Build the agent with streaming handler + temporal_agent = await build_agent(streaming_handler, **confs['llm']) + + # Define the task queue + task_queue = 'yahoo-finance-search' + + # Start the worker + async with Worker( + client, + task_queue=task_queue, + workflows=[YahooFinanceSearchWorkflow], + activities=[YahooFinanceSearchWorkflow.retrieve_env_vars], + plugins=[AgentPlugin(temporal_agent)], + ): + # Execute the workflow + workflow_id = f'yahoo-finance-search-{uuid.uuid4()}' + workflow_handle = await client.start_workflow( + YahooFinanceSearchWorkflow.run, + args=['What are the latest financial metrics for Apple (AAPL)?'], + id=workflow_id, + task_queue=task_queue, + ) + + print(f'Started workflow with ID: {workflow_id}') + print('Polling for events...\n') + + # Poll for events in the background + event_task = asyncio.create_task(poll_events(workflow_handle)) + + # Wait for workflow to complete + result = await workflow_handle.result() + + # Ensure event polling is done + await event_task + + print(f'\nWorkflow completed with result: {result}') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py new file mode 100644 index 0000000000..127d97557c --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py @@ -0,0 +1,96 @@ +"""Event streaming handler for the Temporal workflow. + +This module handles streaming events from the agent during execution, +processing various event types and sending them to the workflow via signals. +""" + +from collections.abc import AsyncIterable + +from pydantic_ai import ( + AgentStreamEvent, + FunctionToolCallEvent, + FunctionToolResultEvent, + PartDeltaEvent, + PartStartEvent, + TextPart, + TextPartDelta, + ThinkingPartDelta, + ToolCallPart, +) +from temporalio import activity + +from datamodels import AgentDependencies, EventKind, EventStream + + +async def streaming_handler( + ctx, + event_stream_events: AsyncIterable[AgentStreamEvent], +): + """ + Handle streaming events from the agent. + + This function processes events from the agent's execution stream, including + tool calls, LLM responses, and streaming results. It aggregates the events + and sends them to the workflow via signals. + + Args: + ctx: The run context containing dependencies. + event_stream_events: Async iterable of agent stream events. + """ + output = '' + output_tool_delta = dict( + tool_call_id='', + tool_name_delta='', + args_delta='', + ) + + # Process all events from the stream + async for event in event_stream_events: + if isinstance(event, PartStartEvent): + if isinstance(event.part, TextPart): + output += f'{event.part.content}' + elif isinstance(event.part, ToolCallPart): + output += f'\nTool Call Id: {event.part.tool_call_id}' + output += f'\nTool Name: {event.part.tool_name}' + output += f'\nTool Args: {event.part.args}' + else: + pass + elif isinstance(event, FunctionToolCallEvent): + output += f'\nTool Call Id: {event.part.tool_call_id}' + output += f'\nTool Name: {event.part.tool_name}' + output += f'\nTool Args: {event.part.args}' + elif isinstance(event, FunctionToolResultEvent): + output += f'\nTool Call Id: {event.result.tool_call_id}' + output += f'\nTool Name: {event.result.tool_name}' + output += f'\nContent: {event.result.content}' + elif isinstance(event, PartDeltaEvent): + if isinstance(event.delta, TextPartDelta) or isinstance(event.delta, ThinkingPartDelta): + output += f'{event.delta.content_delta}' + else: + if len(output_tool_delta['tool_call_id']) == 0: + output_tool_delta['tool_call_id'] += event.delta.tool_call_id or '' + output_tool_delta['tool_name_delta'] += event.delta.tool_name_delta or '' + output_tool_delta['args_delta'] += event.delta.args_delta or '' + + # Add accumulated tool delta output if present + if len(output_tool_delta['tool_call_id']): + output += f'\nTool Call Id: {output_tool_delta["tool_call_id"]}' + output += f'\nTool Name: {output_tool_delta["tool_name_delta"]}' + output += f'\nTool Args: {output_tool_delta["args_delta"]}' + + events = [] + + # Create event stream if there's output + if output: + event = EventStream(kind=EventKind.EVENT, content=output) + events.append(event) + + # Send events to workflow if running in an activity + if activity.in_activity(): + deps: AgentDependencies = ctx.deps + + workflow_id = deps.workflow_id + run_id = deps.run_id + workflow_handle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id) + for event in events: + await workflow_handle.signal('append_event', arg=event) diff --git a/examples/pydantic_ai_examples/temporal_streaming/utils.py b/examples/pydantic_ai_examples/temporal_streaming/utils.py new file mode 100644 index 0000000000..45e902fe22 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/utils.py @@ -0,0 +1,60 @@ +"""Utility functions for configuration management.""" + +import os +from copy import copy + +import yaml + + +def recursively_modify_api_key(conf): + """ + Recursively replace API key placeholders with environment variable values. + + This function traverses a configuration dictionary and replaces any keys + containing 'api_key' with the corresponding environment variable value. + It handles nested dictionaries and lists recursively. + + Args: + conf: The configuration dictionary to process. + + Returns: + A copy of the configuration with API keys replaced by environment variable values. + """ + + def inner(_conf): + for key, value in _conf.items(): + if isinstance(value, dict): + inner(value) + elif isinstance(value, list): + if len(value) > 0 and isinstance(value[0], dict): + for item in value: + inner(item) + else: + _conf[key] = [os.environ.get(str(v), v) for v in value] + elif isinstance(value, str): + _conf[key] = os.environ.get(value, value) + else: + _conf[key] = value + + copy_conf = copy(conf) + inner(copy_conf) + return copy_conf + + +def read_config_yml(path): + """ + Read and process a YAML configuration file. + + This function reads a YAML file, processes it to replace API key placeholders + with environment variable values, and returns the processed configuration. + + Args: + path: The path to the YAML configuration file. + + Returns: + dict: The parsed and processed YAML content as a Python dictionary. + """ + with open(path) as f: + configs = yaml.safe_load(f) + recursively_modify_api_key(configs) + return configs diff --git a/examples/pydantic_ai_examples/temporal_streaming/workflow.py b/examples/pydantic_ai_examples/temporal_streaming/workflow.py new file mode 100644 index 0000000000..b3813fd7d0 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/workflow.py @@ -0,0 +1,114 @@ +"""Temporal workflow for streaming agent execution. + +This module defines the Temporal workflow that orchestrates the agent execution +with streaming capabilities, using signals and queries for event communication. +""" + +import asyncio +import os +from collections import deque +from datetime import timedelta +from typing import Any + +from pydantic_ai import UsageLimits +from temporalio import activity, workflow + +from agents import build_agent +from streaming_handler import streaming_handler +from datamodels import AgentDependencies, EventKind, EventStream +from utils import read_config_yml + + +@workflow.defn +class YahooFinanceSearchWorkflow: + """ + Temporal workflow for executing the Yahoo Finance search agent with streaming. + + This workflow manages the agent execution, collects streaming events via signals, + and exposes them through queries for consumption by external systems. + """ + + def __init__(self): + """Initialize the workflow with an empty event queue.""" + self.events: deque[EventStream] = deque() + + @workflow.run + async def run(self, user_prompt: str): + """ + Execute the agent with the given user prompt. + + Args: + user_prompt: The user's question or request. + + Returns: + The agent's final output. + """ + # Retrieve environment variables from configuration + wf_vars = await workflow.execute_activity( + activity='retrieve_env_vars', + start_to_close_timeout=timedelta(seconds=10), + result_type=dict[str, Any], + ) + + # Create dependencies with workflow identification for signal routing + deps = AgentDependencies(workflow_id=workflow.info().workflow_id, run_id=workflow.info().run_id) + + # Build and run the agent + agent = await build_agent(streaming_handler, **wf_vars) + result = await agent.run( + user_prompt=user_prompt, usage_limits=UsageLimits(request_limit=50), deps=deps + ) + + # Signal the final result + await self.append_event(event_stream=EventStream(kind=EventKind.RESULT, content=result.output)) + + # Signal completion + await self.append_event(event_stream=EventStream(kind=EventKind.CONTINUE_CHAT, content='')) + + # Wait for events to be consumed before completing + try: + await workflow.wait_condition( + lambda: len(self.events) == 0, + timeout=timedelta(seconds=10), + timeout_summary='Waiting for events to be consumed', + ) + return result.output + except asyncio.TimeoutError: + return result.output + + @staticmethod + @activity.defn(name='retrieve_env_vars') + async def retrieve_env_vars(): + """ + Retrieve environment variables from configuration file. + + Returns: + Dictionary containing API keys and other configuration. + """ + config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml') + configs = read_config_yml(config_path) + return {'anthropic_api_key': configs['llm']['anthropic_api_key']} + + @workflow.query + def event_stream(self) -> EventStream | None: + """ + Query to retrieve the next event from the stream. + + Returns: + The next event if available, None otherwise. + """ + if self.events: + return self.events.popleft() + return None + + @workflow.signal + async def append_event(self, event_stream: EventStream): + """ + Signal to append a new event to the stream. + + This is called by the streaming handler running in activities. + + Args: + event_stream: The event to append. + """ + self.events.append(event_stream) From edaecd4ba798a45d5bbec8842fc5e8a8c0f3fc5b Mon Sep 17 00:00:00 2001 From: scalabrese Date: Thu, 16 Oct 2025 11:10:41 +0200 Subject: [PATCH 07/11] [Temporal - Documentation] Removed code execution --- .../temporal_streaming/agents.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/examples/pydantic_ai_examples/temporal_streaming/agents.py b/examples/pydantic_ai_examples/temporal_streaming/agents.py index fc7b541d25..2880e4baf1 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/agents.py +++ b/examples/pydantic_ai_examples/temporal_streaming/agents.py @@ -6,16 +6,15 @@ from datetime import timedelta -from mcp_run_python import code_sandbox -from pydantic_ai import Agent, FilteredToolset, ModelSettings, RunContext -from pydantic_ai.durable_exec.temporal import TemporalAgent -from pydantic_ai.mcp import MCPServerStdio -from pydantic_ai.models.anthropic import AnthropicModel -from pydantic_ai.providers.anthropic import AnthropicProvider from temporalio.common import RetryPolicy from temporalio.workflow import ActivityConfig from datamodels import AgentDependencies +from pydantic_ai import Agent, FilteredToolset, ModelSettings +from pydantic_ai.durable_exec.temporal import TemporalAgent +from pydantic_ai.mcp import MCPServerStdio +from pydantic_ai.models.anthropic import AnthropicModel +from pydantic_ai.providers.anthropic import AnthropicProvider async def get_mcp_toolsets() -> dict[str, FilteredToolset]: @@ -78,7 +77,6 @@ async def build_agent(stream_handler=None, **env_vars): """ system_prompt = """ You are an expert financial analyst that knows how to search for financial data on the web. - You also have a Data Analyst background, mastering well how to use pandas for tabular operations. """ agent_name = 'YahooFinanceSearchAgent' @@ -92,13 +90,6 @@ async def build_agent(stream_handler=None, **env_vars): deps_type=AgentDependencies, ) - @agent.tool(name='run_python_code') - async def run_python_code(ctx: RunContext[None], code: str) -> str: - """Execute Python code in a sandboxed environment with pandas and numpy available.""" - async with code_sandbox(dependencies=['pandas', 'numpy']) as sandbox: - result = await sandbox.eval(code) - return result - temporal_agent = TemporalAgent( wrapped=agent, model_activity_config=ActivityConfig( From d15723c463e2ac29cc09df64d4dd00a0a35badba Mon Sep 17 00:00:00 2001 From: scalabrese Date: Thu, 16 Oct 2025 11:24:55 +0200 Subject: [PATCH 08/11] [Temporal - Documentation] Removed code execution --- .../temporal_streaming/agents.py | 3 ++- .../pydantic_ai_examples/temporal_streaming/main.py | 12 ++++++------ .../temporal_streaming/streaming_handler.py | 2 +- .../temporal_streaming/workflow.py | 8 ++++---- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/examples/pydantic_ai_examples/temporal_streaming/agents.py b/examples/pydantic_ai_examples/temporal_streaming/agents.py index 2880e4baf1..2f10999e8e 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/agents.py +++ b/examples/pydantic_ai_examples/temporal_streaming/agents.py @@ -9,13 +9,14 @@ from temporalio.common import RetryPolicy from temporalio.workflow import ActivityConfig -from datamodels import AgentDependencies from pydantic_ai import Agent, FilteredToolset, ModelSettings from pydantic_ai.durable_exec.temporal import TemporalAgent from pydantic_ai.mcp import MCPServerStdio from pydantic_ai.models.anthropic import AnthropicModel from pydantic_ai.providers.anthropic import AnthropicProvider +from .datamodels import AgentDependencies + async def get_mcp_toolsets() -> dict[str, FilteredToolset]: """ diff --git a/examples/pydantic_ai_examples/temporal_streaming/main.py b/examples/pydantic_ai_examples/temporal_streaming/main.py index aeb2b29ec6..ce92f3171b 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/main.py +++ b/examples/pydantic_ai_examples/temporal_streaming/main.py @@ -8,15 +8,15 @@ import os import uuid +from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin from temporalio.client import Client from temporalio.worker import Worker -from agents import build_agent -from datamodels import EventKind, EventStream -from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin -from streaming_handler import streaming_handler -from utils import read_config_yml -from workflow import YahooFinanceSearchWorkflow +from .agents import build_agent +from .datamodels import EventKind, EventStream +from .streaming_handler import streaming_handler +from .utils import read_config_yml +from .workflow import YahooFinanceSearchWorkflow async def poll_events(workflow_handle): diff --git a/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py index 127d97557c..67ecba6a15 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py +++ b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py @@ -19,7 +19,7 @@ ) from temporalio import activity -from datamodels import AgentDependencies, EventKind, EventStream +from .datamodels import AgentDependencies, EventKind, EventStream async def streaming_handler( diff --git a/examples/pydantic_ai_examples/temporal_streaming/workflow.py b/examples/pydantic_ai_examples/temporal_streaming/workflow.py index b3813fd7d0..0af1d5aae7 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/workflow.py +++ b/examples/pydantic_ai_examples/temporal_streaming/workflow.py @@ -13,10 +13,10 @@ from pydantic_ai import UsageLimits from temporalio import activity, workflow -from agents import build_agent -from streaming_handler import streaming_handler -from datamodels import AgentDependencies, EventKind, EventStream -from utils import read_config_yml +from .agents import build_agent +from .datamodels import AgentDependencies, EventKind, EventStream +from .streaming_handler import streaming_handler +from .utils import read_config_yml @workflow.defn From 86f5b55d9b644fc13705da070c84adbf12ecf846 Mon Sep 17 00:00:00 2001 From: scalabrese Date: Fri, 17 Oct 2025 09:28:28 +0200 Subject: [PATCH 09/11] [Temporal - Documentation] Fixing PR errors --- .../temporal_streaming/agents.py | 30 +++++++------- .../temporal_streaming/main.py | 18 +++++---- .../temporal_streaming/streaming_handler.py | 39 ++++++++----------- .../temporal_streaming/utils.py | 5 ++- .../temporal_streaming/workflow.py | 6 +-- 5 files changed, 46 insertions(+), 52 deletions(-) diff --git a/examples/pydantic_ai_examples/temporal_streaming/agents.py b/examples/pydantic_ai_examples/temporal_streaming/agents.py index 2f10999e8e..8da4355487 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/agents.py +++ b/examples/pydantic_ai_examples/temporal_streaming/agents.py @@ -3,18 +3,17 @@ This module defines the agent setup with MCP toolsets, model configuration, and custom tools for data analysis. """ - from datetime import timedelta from temporalio.common import RetryPolicy from temporalio.workflow import ActivityConfig from pydantic_ai import Agent, FilteredToolset, ModelSettings +from pydantic_ai.agent import EventStreamHandler from pydantic_ai.durable_exec.temporal import TemporalAgent from pydantic_ai.mcp import MCPServerStdio from pydantic_ai.models.anthropic import AnthropicModel from pydantic_ai.providers.anthropic import AnthropicProvider - from .datamodels import AgentDependencies @@ -35,43 +34,40 @@ async def get_mcp_toolsets() -> dict[str, FilteredToolset]: return {'yahoo': yf_server.filtered(lambda ctx, tool_def: True)} -async def get_claude_model(parallel_tool_calls: bool = True, **env_vars): +async def get_claude_model(parallel_tool_calls: bool = True, **kwargs): """ Create and configure the Claude model. Args: parallel_tool_calls: Whether to enable parallel tool calls. - **env_vars: Environment variables including API keys. + **kwargs: Environment variables including API keys. Returns: Configured AnthropicModel instance. """ - model_name = 'claude-sonnet-4-5-20250929' - api_key = env_vars.get('anthropic_api_key') - model = AnthropicModel( + model_name: str = 'claude-sonnet-4-5-20250929' + api_key: str | None = kwargs.get('anthropic_api_key', None) + model: AnthropicModel = AnthropicModel( model_name=model_name, provider=AnthropicProvider(api_key=api_key), settings=ModelSettings( - **{ - 'temperature': 0.5, - 'n': 1, - 'max_completion_tokens': 64000, - 'max_tokens': 64000, - 'parallel_tool_calls': parallel_tool_calls, - } + temperature=0.5, + max_tokens=64000, + parallel_tool_calls=parallel_tool_calls, ), ) return model -async def build_agent(stream_handler=None, **env_vars): +async def build_agent(stream_handler: EventStreamHandler, + **kwargs) -> TemporalAgent: """ Build and configure the agent with tools and temporal settings. Args: stream_handler: Optional event stream handler for streaming responses. - **env_vars: Environment variables including API keys. + **kwargs: Environment variables including API keys. Returns: TemporalAgent instance ready for use in Temporal workflows. @@ -84,7 +80,7 @@ async def build_agent(stream_handler=None, **env_vars): toolsets = await get_mcp_toolsets() agent = Agent( name=agent_name, - model=await get_claude_model(**env_vars), + model=await get_claude_model(**kwargs), toolsets=[*toolsets.values()], system_prompt=system_prompt, event_stream_handler=stream_handler, diff --git a/examples/pydantic_ai_examples/temporal_streaming/main.py b/examples/pydantic_ai_examples/temporal_streaming/main.py index ce92f3171b..46f24d308e 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/main.py +++ b/examples/pydantic_ai_examples/temporal_streaming/main.py @@ -7,11 +7,12 @@ import asyncio import os import uuid +from typing import Any, Union -from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin -from temporalio.client import Client +from temporalio.client import Client, WorkflowHandle from temporalio.worker import Worker +from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin from .agents import build_agent from .datamodels import EventKind, EventStream from .streaming_handler import streaming_handler @@ -19,7 +20,7 @@ from .workflow import YahooFinanceSearchWorkflow -async def poll_events(workflow_handle): +async def poll_events(workflow_handle: WorkflowHandle[Any, str]) -> None: """ Poll for events from the workflow and print them. @@ -27,7 +28,7 @@ async def poll_events(workflow_handle): workflow_handle: Handle to the running workflow. """ while True: - event = await workflow_handle.query('event_stream', result_type=EventStream | None) + event = await workflow_handle.query('event_stream', result_type=Union[EventStream | None]) if event is None: await asyncio.sleep(0.1) continue @@ -41,7 +42,7 @@ async def poll_events(workflow_handle): print(f'\n--- Event ---\n{event.content}\n') -async def main(): +async def main() -> None: """ Main function to set up and run the Temporal workflow. @@ -76,11 +77,12 @@ async def main(): ): # Execute the workflow workflow_id = f'yahoo-finance-search-{uuid.uuid4()}' - workflow_handle = await client.start_workflow( - YahooFinanceSearchWorkflow.run, - args=['What are the latest financial metrics for Apple (AAPL)?'], + workflow_handle: WorkflowHandle[Any, str] = await client.start_workflow( + 'YahooFinanceSearchWorkflow', + arg='What are the latest financial metrics for Apple (AAPL)?', id=workflow_id, task_queue=task_queue, + result_type=str ) print(f'Started workflow with ID: {workflow_id}') diff --git a/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py index 67ecba6a15..7fc28781ab 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py +++ b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py @@ -6,6 +6,9 @@ from collections.abc import AsyncIterable +from temporalio import activity +from temporalio.client import WorkflowHandle + from pydantic_ai import ( AgentStreamEvent, FunctionToolCallEvent, @@ -15,17 +18,15 @@ TextPart, TextPartDelta, ThinkingPartDelta, - ToolCallPart, + ToolCallPart, RunContext, ) -from temporalio import activity - from .datamodels import AgentDependencies, EventKind, EventStream async def streaming_handler( - ctx, - event_stream_events: AsyncIterable[AgentStreamEvent], -): + ctx: RunContext, + event_stream_events: AsyncIterable[AgentStreamEvent], +) -> None: """ Handle streaming events from the agent. @@ -37,8 +38,11 @@ async def streaming_handler( ctx: The run context containing dependencies. event_stream_events: Async iterable of agent stream events. """ - output = '' - output_tool_delta = dict( + if not activity.in_activity(): + return + + output: str = '' + output_tool_delta: dict[str, str] = dict( tool_call_id='', tool_name_delta='', args_delta='', @@ -78,19 +82,10 @@ async def streaming_handler( output += f'\nTool Name: {output_tool_delta["tool_name_delta"]}' output += f'\nTool Args: {output_tool_delta["args_delta"]}' - events = [] - - # Create event stream if there's output - if output: - event = EventStream(kind=EventKind.EVENT, content=output) - events.append(event) - # Send events to workflow if running in an activity - if activity.in_activity(): - deps: AgentDependencies = ctx.deps + deps: AgentDependencies = ctx.deps - workflow_id = deps.workflow_id - run_id = deps.run_id - workflow_handle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id) - for event in events: - await workflow_handle.signal('append_event', arg=event) + workflow_id: str = deps.workflow_id + run_id: str = deps.run_id + workflow_handle: WorkflowHandle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id) + await workflow_handle.signal('append_event', arg=EventStream(kind=EventKind.EVENT, content=output)) diff --git a/examples/pydantic_ai_examples/temporal_streaming/utils.py b/examples/pydantic_ai_examples/temporal_streaming/utils.py index 45e902fe22..920a6dbd73 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/utils.py +++ b/examples/pydantic_ai_examples/temporal_streaming/utils.py @@ -2,11 +2,12 @@ import os from copy import copy +from typing import Any import yaml -def recursively_modify_api_key(conf): +def recursively_modify_api_key(conf) -> dict[str, Any]: """ Recursively replace API key placeholders with environment variable values. @@ -41,7 +42,7 @@ def inner(_conf): return copy_conf -def read_config_yml(path): +def read_config_yml(path) -> dict[str, Any]: """ Read and process a YAML configuration file. diff --git a/examples/pydantic_ai_examples/temporal_streaming/workflow.py b/examples/pydantic_ai_examples/temporal_streaming/workflow.py index 0af1d5aae7..dfa1090a52 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/workflow.py +++ b/examples/pydantic_ai_examples/temporal_streaming/workflow.py @@ -10,9 +10,9 @@ from datetime import timedelta from typing import Any -from pydantic_ai import UsageLimits from temporalio import activity, workflow +from pydantic_ai import UsageLimits from .agents import build_agent from .datamodels import AgentDependencies, EventKind, EventStream from .streaming_handler import streaming_handler @@ -33,7 +33,7 @@ def __init__(self): self.events: deque[EventStream] = deque() @workflow.run - async def run(self, user_prompt: str): + async def run(self, user_prompt: str) -> str: """ Execute the agent with the given user prompt. @@ -78,7 +78,7 @@ async def run(self, user_prompt: str): @staticmethod @activity.defn(name='retrieve_env_vars') - async def retrieve_env_vars(): + async def retrieve_env_vars() -> dict[str, Any]: """ Retrieve environment variables from configuration file. From 38209c69b6e7fe4bb7ad0ca9361182dd81018841 Mon Sep 17 00:00:00 2001 From: scalabrese Date: Fri, 17 Oct 2025 09:54:25 +0200 Subject: [PATCH 10/11] [Temporal - Documentation] Fixing PR errors --- .../temporal_streaming/agents.py | 11 ++++++----- .../temporal_streaming/main.py | 7 +++++-- .../temporal_streaming/streaming_handler.py | 15 +++++++++++---- .../temporal_streaming/utils.py | 17 +++++++++-------- .../temporal_streaming/workflow.py | 2 +- 5 files changed, 32 insertions(+), 20 deletions(-) diff --git a/examples/pydantic_ai_examples/temporal_streaming/agents.py b/examples/pydantic_ai_examples/temporal_streaming/agents.py index 8da4355487..a22c7d6ba7 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/agents.py +++ b/examples/pydantic_ai_examples/temporal_streaming/agents.py @@ -4,6 +4,7 @@ and custom tools for data analysis. """ from datetime import timedelta +from typing import Any from temporalio.common import RetryPolicy from temporalio.workflow import ActivityConfig @@ -17,7 +18,7 @@ from .datamodels import AgentDependencies -async def get_mcp_toolsets() -> dict[str, FilteredToolset]: +async def get_mcp_toolsets() -> dict[str, FilteredToolset[AgentDependencies]]: """ Initialize MCP toolsets for the agent. @@ -34,7 +35,7 @@ async def get_mcp_toolsets() -> dict[str, FilteredToolset]: return {'yahoo': yf_server.filtered(lambda ctx, tool_def: True)} -async def get_claude_model(parallel_tool_calls: bool = True, **kwargs): +async def get_claude_model(parallel_tool_calls: bool = True, **kwargs: Any) -> AnthropicModel: """ Create and configure the Claude model. @@ -60,8 +61,8 @@ async def get_claude_model(parallel_tool_calls: bool = True, **kwargs): return model -async def build_agent(stream_handler: EventStreamHandler, - **kwargs) -> TemporalAgent: +async def build_agent(stream_handler: EventStreamHandler[AgentDependencies], + **kwargs: Any) -> TemporalAgent[AgentDependencies, str]: """ Build and configure the agent with tools and temporal settings. @@ -78,7 +79,7 @@ async def build_agent(stream_handler: EventStreamHandler, agent_name = 'YahooFinanceSearchAgent' toolsets = await get_mcp_toolsets() - agent = Agent( + agent: Agent[AgentDependencies, str] = Agent[AgentDependencies, str]( name=agent_name, model=await get_claude_model(**kwargs), toolsets=[*toolsets.values()], diff --git a/examples/pydantic_ai_examples/temporal_streaming/main.py b/examples/pydantic_ai_examples/temporal_streaming/main.py index 46f24d308e..e49969a297 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/main.py +++ b/examples/pydantic_ai_examples/temporal_streaming/main.py @@ -28,7 +28,9 @@ async def poll_events(workflow_handle: WorkflowHandle[Any, str]) -> None: workflow_handle: Handle to the running workflow. """ while True: - event = await workflow_handle.query('event_stream', result_type=Union[EventStream | None]) + event: Union[EventStream, None] = await workflow_handle.query('event_stream', + result_type=Union[ + EventStream, None]) # type: ignore[misc] if event is None: await asyncio.sleep(0.1) continue @@ -55,6 +57,7 @@ async def main() -> None: """ # Connect to Temporal server client = await Client.connect( + # target_host='localhost:7233', target_host='localhost:7233', plugins=[PydanticAIPlugin()], ) @@ -77,7 +80,7 @@ async def main() -> None: ): # Execute the workflow workflow_id = f'yahoo-finance-search-{uuid.uuid4()}' - workflow_handle: WorkflowHandle[Any, str] = await client.start_workflow( + workflow_handle: WorkflowHandle[Any, str] = await client.start_workflow( # type: ignore[misc] 'YahooFinanceSearchWorkflow', arg='What are the latest financial metrics for Apple (AAPL)?', id=workflow_id, diff --git a/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py index 7fc28781ab..18fd830a1e 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py +++ b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py @@ -24,7 +24,7 @@ async def streaming_handler( - ctx: RunContext, + ctx: RunContext[AgentDependencies], event_stream_events: AsyncIterable[AgentStreamEvent], ) -> None: """ @@ -74,18 +74,25 @@ async def streaming_handler( if len(output_tool_delta['tool_call_id']) == 0: output_tool_delta['tool_call_id'] += event.delta.tool_call_id or '' output_tool_delta['tool_name_delta'] += event.delta.tool_name_delta or '' - output_tool_delta['args_delta'] += event.delta.args_delta or '' + # Handle args_delta which can be str or dict + args_delta = event.delta.args_delta + if isinstance(args_delta, str): + output_tool_delta['args_delta'] += args_delta + elif isinstance(args_delta, dict): + output_tool_delta['args_delta'] += str(args_delta) # Add accumulated tool delta output if present if len(output_tool_delta['tool_call_id']): output += f'\nTool Call Id: {output_tool_delta["tool_call_id"]}' output += f'\nTool Name: {output_tool_delta["tool_name_delta"]}' - output += f'\nTool Args: {output_tool_delta["args_delta"]}' + args_delta_str = str(output_tool_delta["args_delta"]) + output += f'\nTool Args: {args_delta_str}' # Send events to workflow if running in an activity deps: AgentDependencies = ctx.deps workflow_id: str = deps.workflow_id run_id: str = deps.run_id - workflow_handle: WorkflowHandle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id) + from typing import Any + workflow_handle: WorkflowHandle[Any, Any] = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id) # type: ignore[misc] await workflow_handle.signal('append_event', arg=EventStream(kind=EventKind.EVENT, content=output)) diff --git a/examples/pydantic_ai_examples/temporal_streaming/utils.py b/examples/pydantic_ai_examples/temporal_streaming/utils.py index 920a6dbd73..95c2a3797d 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/utils.py +++ b/examples/pydantic_ai_examples/temporal_streaming/utils.py @@ -7,7 +7,7 @@ import yaml -def recursively_modify_api_key(conf) -> dict[str, Any]: +def recursively_modify_api_key(conf: dict[str, Any]) -> dict[str, Any]: """ Recursively replace API key placeholders with environment variable values. @@ -22,16 +22,17 @@ def recursively_modify_api_key(conf) -> dict[str, Any]: A copy of the configuration with API keys replaced by environment variable values. """ - def inner(_conf): + def inner(_conf: dict[str, Any]) -> None: for key, value in _conf.items(): if isinstance(value, dict): - inner(value) + inner(value) # type: ignore[arg-type] elif isinstance(value, list): - if len(value) > 0 and isinstance(value[0], dict): - for item in value: - inner(item) + if len(value) > 0 and isinstance(value[0], dict): # type: ignore[misc] + item: dict[str, Any] + for item in value: # type: ignore[assignment,misc] + inner(item) # type: ignore[arg-type] else: - _conf[key] = [os.environ.get(str(v), v) for v in value] + _conf[key] = [str(os.environ.get(str(v), v)) for v in value] # type: ignore[misc] elif isinstance(value, str): _conf[key] = os.environ.get(value, value) else: @@ -42,7 +43,7 @@ def inner(_conf): return copy_conf -def read_config_yml(path) -> dict[str, Any]: +def read_config_yml(path: str) -> dict[str, Any]: """ Read and process a YAML configuration file. diff --git a/examples/pydantic_ai_examples/temporal_streaming/workflow.py b/examples/pydantic_ai_examples/temporal_streaming/workflow.py index dfa1090a52..cfd19aaa80 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/workflow.py +++ b/examples/pydantic_ai_examples/temporal_streaming/workflow.py @@ -44,7 +44,7 @@ async def run(self, user_prompt: str) -> str: The agent's final output. """ # Retrieve environment variables from configuration - wf_vars = await workflow.execute_activity( + wf_vars = await workflow.execute_activity( # type: ignore[misc] activity='retrieve_env_vars', start_to_close_timeout=timedelta(seconds=10), result_type=dict[str, Any], From f894720710c64c58b4d30ce5d6a7a0ca0ad633c2 Mon Sep 17 00:00:00 2001 From: scalabrese Date: Fri, 17 Oct 2025 10:36:45 +0200 Subject: [PATCH 11/11] [Temporal - Documentation] Fixing PR errors --- examples/pydantic_ai_examples/temporal_streaming/main.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/pydantic_ai_examples/temporal_streaming/main.py b/examples/pydantic_ai_examples/temporal_streaming/main.py index e49969a297..e24a8ec98a 100644 --- a/examples/pydantic_ai_examples/temporal_streaming/main.py +++ b/examples/pydantic_ai_examples/temporal_streaming/main.py @@ -7,7 +7,7 @@ import asyncio import os import uuid -from typing import Any, Union +from typing import Any from temporalio.client import Client, WorkflowHandle from temporalio.worker import Worker @@ -28,9 +28,8 @@ async def poll_events(workflow_handle: WorkflowHandle[Any, str]) -> None: workflow_handle: Handle to the running workflow. """ while True: - event: Union[EventStream, None] = await workflow_handle.query('event_stream', - result_type=Union[ - EventStream, None]) # type: ignore[misc] + event: EventStream | None = await workflow_handle.query('event_stream', + result_type=EventStream | None) # type: ignore[misc] if event is None: await asyncio.sleep(0.1) continue