From ccaf87f2afdacaeb80e21b44ee17fa6e9c24fd12 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Sun, 12 Oct 2025 09:53:08 -0600 Subject: [PATCH 01/18] Add run_stream_sync --- .../pydantic_ai/agent/abstract.py | 80 ++++++++ pydantic_ai_slim/pydantic_ai/result.py | 83 +++++++- tests/test_streaming.py | 194 ++++++++++++++++++ 3 files changed, 356 insertions(+), 1 deletion(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 72be9a4f7a..ccfa10f313 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -569,6 +569,86 @@ async def on_complete() -> None: if not yielded: raise exceptions.AgentRunError('Agent run finished without producing a final result') # pragma: no cover + @contextmanager + def run_stream_sync( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT] | None = None, + message_history: Sequence[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, + ) -> Iterator[result.CollectedRunResult[AgentDepsT, Any]]: + """Run the agent with a user prompt in collected streaming mode. + + This method builds an internal agent graph (using system prompts, tools and output schemas) and then + runs the graph until the model produces output matching the `output_type`, for example text or structured data. + At this point, a streaming run result object is collected and -- once this output has completed streaming -- you can iterate over the complete output, message history, and usage. + + As this method will consider the first output matching the `output_type` to be the final output, + it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output. + If you want to always run the agent graph to completion and stream events and output at the same time, + use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead. + + Example: + ```python + from pydantic_ai import Agent + + agent = Agent('openai:gpt-4o') + + def main(): + with agent.run_stream_sync('What is the capital of the UK?') as response: + print(response.get_output()) + #> The capital of the UK is London. + ``` + + Args: + user_prompt: User input to start/continue the conversation. + output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no + output validators since output validators would expect an argument that matches the agent's output type. + message_history: History of the conversation so far. + deferred_tool_results: Optional results for deferred tool calls in the message history. + model: Optional model to use for this run, required if `model` was not set when creating the agent. + deps: Optional dependencies to use for this run. + model_settings: Optional settings to use for this model's request. + usage_limits: Optional limits on model request count or token usage. + usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. + infer_name: Whether to try to infer the agent name from the call frame if it's not set. + toolsets: Optional additional toolsets for this run. + builtin_tools: Optional additional builtin tools for this run. + event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. + It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager. + Note that it does _not_ receive any events after the final result is found. + + Returns: + The result of the run. + """ + async_cm = self.run_stream( + user_prompt, + output_type=output_type, + message_history=message_history, + deferred_tool_results=deferred_tool_results, + model=model, + deps=deps, + model_settings=model_settings, + usage_limits=usage_limits, + usage=usage, + infer_name=infer_name, + toolsets=toolsets, + builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, + ) + async_result = get_event_loop().run_until_complete(async_cm.__aenter__()) + yield result.CollectedRunResult.from_streamed_result(async_result) # type: ignore[reportReturnType] + @overload def run_stream_events( self, diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index f5b542953e..1e0d6e3025 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -1,6 +1,6 @@ from __future__ import annotations as _annotations -from collections.abc import AsyncIterator, Awaitable, Callable, Iterable +from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Iterator from copy import deepcopy from dataclasses import dataclass, field from datetime import datetime @@ -9,6 +9,8 @@ from pydantic import ValidationError from typing_extensions import TypeVar, deprecated +from pydantic_graph._utils import get_event_loop + from . import _utils, exceptions, messages as _messages, models from ._output import ( OutputDataT_inv, @@ -543,6 +545,85 @@ async def _marked_completed(self, message: _messages.ModelResponse | None = None await self._on_complete() +@dataclass(init=False) +class CollectedRunResult(StreamedRunResult[AgentDepsT, OutputDataT]): + """Provides a synchronous API over 'StreamedRunResult' by eagerly loading the stream.""" + + @classmethod + def from_streamed_result( + cls, streamed_run_result: StreamedRunResult[AgentDepsT, OutputDataT] + ) -> CollectedRunResult[AgentDepsT, OutputDataT]: + """Create a CollectedRunResult from an existing StreamedRunResult.""" + instance = cls.__new__(cls) + + instance._all_messages = streamed_run_result._all_messages + instance._new_message_index = streamed_run_result._new_message_index + instance._stream_response = streamed_run_result._stream_response + instance._on_complete = streamed_run_result._on_complete + instance._run_result = streamed_run_result._run_result + instance.is_complete = streamed_run_result.is_complete + + return instance + + def _collect_async_iterator(self, async_iter: AsyncIterator[T]) -> list[T]: + async def collect(): + return [item async for item in async_iter] + + return get_event_loop().run_until_complete(collect()) + + def stream_output(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]: # type: ignore[reportIncompatibleMethodOverride] + """Collect and stream the output as an iterable. + + The pydantic validator for structured data will be called in + [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation) + on each iteration. + + Args: + debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured outputs to reduce the overhead of + performing validation as each token is received. + + Returns: + An iterable of the response data. + """ + async_stream = super().stream_output(debounce_by=debounce_by) + yield from self._collect_async_iterator(async_stream) + + def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]: # type: ignore[reportIncompatibleMethodOverride] + """Collect and stream the text result as an iterable. + + !!! note + Result validators will NOT be called on the text result if `delta=True`. + + Args: + delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text + up to the current point. + debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured responses to reduce the overhead of + performing validation as each token is received. + """ + async_stream = super().stream_text(delta=delta, debounce_by=debounce_by) + yield from self._collect_async_iterator(async_stream) + + def stream_responses(self, *, debounce_by: float | None = 0.1) -> Iterator[tuple[_messages.ModelResponse, bool]]: # type: ignore[reportIncompatibleMethodOverride] + """Collect and stream the response as an iterable of Structured LLM Messages. + + Args: + debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured responses to reduce the overhead of + performing validation as each token is received. + + Returns: + An iterable of the structured response message and whether that is the last message. + """ + async_stream = super().stream_responses(debounce_by=debounce_by) + yield from self._collect_async_iterator(async_stream) + + def get_output(self) -> OutputDataT: # type: ignore[reportIncompatibleMethodOverride] + """Stream the whole response, validate and return it.""" + return get_event_loop().run_until_complete(super().get_output()) + + @dataclass(repr=False) class FinalResult(Generic[OutputDataT]): """Marker class storing the final output of an agent run and associated metadata.""" diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 537b76e03d..c453993026 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -134,6 +134,86 @@ async def ret_a(x: str) -> str: ) +def test_streamed_text_sync_response(): + m = TestModel() + + test_agent = Agent(m) + assert test_agent.name is None + + @test_agent.tool_plain + async def ret_a(x: str) -> str: + return f'{x}-apple' + + with test_agent.run_stream_sync('Hello') as result: + # assert test_agent.name == 'test_agent' + assert not result.is_complete + assert result.all_messages() == snapshot( + [ + ModelRequest(parts=[UserPromptPart(content='Hello', timestamp=IsNow(tz=timezone.utc))]), + ModelResponse( + parts=[ToolCallPart(tool_name='ret_a', args={'x': 'a'}, tool_call_id=IsStr())], + usage=RequestUsage(input_tokens=51), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelRequest( + parts=[ + ToolReturnPart( + tool_name='ret_a', content='a-apple', timestamp=IsNow(tz=timezone.utc), tool_call_id=IsStr() + ) + ] + ), + ] + ) + assert result.usage() == snapshot( + RunUsage( + requests=2, + input_tokens=103, + output_tokens=5, + tool_calls=1, + ) + ) + response = result.get_output() + assert response == snapshot('{"ret_a":"a-apple"}') + assert result.is_complete + assert result.timestamp() == IsNow(tz=timezone.utc) + assert result.all_messages() == snapshot( + [ + ModelRequest(parts=[UserPromptPart(content='Hello', timestamp=IsNow(tz=timezone.utc))]), + ModelResponse( + parts=[ToolCallPart(tool_name='ret_a', args={'x': 'a'}, tool_call_id=IsStr())], + usage=RequestUsage(input_tokens=51), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelRequest( + parts=[ + ToolReturnPart( + tool_name='ret_a', content='a-apple', timestamp=IsNow(tz=timezone.utc), tool_call_id=IsStr() + ) + ] + ), + ModelResponse( + parts=[TextPart(content='{"ret_a":"a-apple"}')], + usage=RequestUsage(input_tokens=52, output_tokens=11), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ] + ) + assert result.usage() == snapshot( + RunUsage( + requests=2, + input_tokens=103, + output_tokens=11, + tool_calls=1, + ) + ) + + async def test_streamed_structured_response(): m = TestModel() @@ -302,6 +382,120 @@ def upcase(text: str) -> str: ) +def test_streamed_text_stream_sync(): + m = TestModel(custom_output_text='The cat sat on the mat.') + + agent = Agent(m) + + with agent.run_stream_sync('Hello') as result: + # typehint to test (via static typing) that the stream type is correctly inferred + chunks: list[str] = [c for c in result.stream_text()] + # one chunk with `stream_text()` due to group_by_temporal + assert chunks == snapshot(['The cat sat on the mat.']) + assert result.is_complete + + with agent.run_stream_sync('Hello') as result: + # typehint to test (via static typing) that the stream type is correctly inferred + chunks: list[str] = [c for c in result.stream_output()] + # two chunks with `stream()` due to not-final vs. final + assert chunks == snapshot(['The cat sat on the mat.', 'The cat sat on the mat.']) + assert result.is_complete + + with agent.run_stream_sync('Hello') as result: + assert [c for c in result.stream_text(debounce_by=None)] == snapshot( + [ + 'The ', + 'The cat ', + 'The cat sat ', + 'The cat sat on ', + 'The cat sat on the ', + 'The cat sat on the mat.', + ] + ) + + with agent.run_stream_sync('Hello') as result: + # with stream_text, there is no need to do partial validation, so we only get the final message once: + assert [c for c in result.stream_text(delta=False, debounce_by=None)] == snapshot( + ['The ', 'The cat ', 'The cat sat ', 'The cat sat on ', 'The cat sat on the ', 'The cat sat on the mat.'] + ) + + with agent.run_stream_sync('Hello') as result: + assert [c for c in result.stream_text(delta=True, debounce_by=None)] == snapshot( + ['The ', 'cat ', 'sat ', 'on ', 'the ', 'mat.'] + ) + + def upcase(text: str) -> str: + return text.upper() + + with agent.run_stream_sync('Hello', output_type=TextOutput(upcase)) as result: + assert [c for c in result.stream_output(debounce_by=None)] == snapshot( + [ + 'THE ', + 'THE CAT ', + 'THE CAT SAT ', + 'THE CAT SAT ON ', + 'THE CAT SAT ON THE ', + 'THE CAT SAT ON THE MAT.', + 'THE CAT SAT ON THE MAT.', + ] + ) + + with agent.run_stream_sync('Hello') as result: + assert [c for c, _is_last in result.stream_responses(debounce_by=None)] == snapshot( + [ + ModelResponse( + parts=[TextPart(content='The ')], + usage=RequestUsage(input_tokens=51, output_tokens=1), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat ')], + usage=RequestUsage(input_tokens=51, output_tokens=2), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat ')], + usage=RequestUsage(input_tokens=51, output_tokens=3), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on ')], + usage=RequestUsage(input_tokens=51, output_tokens=4), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on the ')], + usage=RequestUsage(input_tokens=51, output_tokens=5), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on the mat.')], + usage=RequestUsage(input_tokens=51, output_tokens=7), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on the mat.')], + usage=RequestUsage(input_tokens=51, output_tokens=7), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ] + ) + + async def test_plain_response(): call_index = 0 From 8623cb9064f7df5289836fabe9a866b4b8f2fa39 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Mon, 13 Oct 2025 07:34:08 -0600 Subject: [PATCH 02/18] add lazy implementation --- .../pydantic_ai/agent/abstract.py | 9 ++--- pydantic_ai_slim/pydantic_ai/result.py | 33 +++++++++++-------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index ccfa10f313..e1e3de508b 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -586,12 +586,13 @@ def run_stream_sync( toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, - ) -> Iterator[result.CollectedRunResult[AgentDepsT, Any]]: - """Run the agent with a user prompt in collected streaming mode. + ) -> Iterator[result.SyncStreamedRunResult[AgentDepsT, Any]]: + """Run the agent with a user prompt in sync streaming mode. This method builds an internal agent graph (using system prompts, tools and output schemas) and then runs the graph until the model produces output matching the `output_type`, for example text or structured data. - At this point, a streaming run result object is collected and -- once this output has completed streaming -- you can iterate over the complete output, message history, and usage. + At this point, a streaming run result object is yielded from which you can stream the output as it comes in, + and -- once this output has completed streaming -- get the complete output, message history, and usage. As this method will consider the first output matching the `output_type` to be the final output, it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output. @@ -647,7 +648,7 @@ def main(): event_stream_handler=event_stream_handler, ) async_result = get_event_loop().run_until_complete(async_cm.__aenter__()) - yield result.CollectedRunResult.from_streamed_result(async_result) # type: ignore[reportReturnType] + yield result.SyncStreamedRunResult.from_streamed_result(async_result) # type: ignore[reportReturnType] @overload def run_stream_events( diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index 1e0d6e3025..3919076856 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -546,14 +546,14 @@ async def _marked_completed(self, message: _messages.ModelResponse | None = None @dataclass(init=False) -class CollectedRunResult(StreamedRunResult[AgentDepsT, OutputDataT]): - """Provides a synchronous API over 'StreamedRunResult' by eagerly loading the stream.""" +class SyncStreamedRunResult(StreamedRunResult[AgentDepsT, OutputDataT]): + """Provides a synchronous API over 'StreamedRunResult'.""" @classmethod def from_streamed_result( cls, streamed_run_result: StreamedRunResult[AgentDepsT, OutputDataT] - ) -> CollectedRunResult[AgentDepsT, OutputDataT]: - """Create a CollectedRunResult from an existing StreamedRunResult.""" + ) -> SyncStreamedRunResult[AgentDepsT, OutputDataT]: + """Create a 'SyncStreamedRunResult' from an existing 'StreamedRunResult'.""" instance = cls.__new__(cls) instance._all_messages = streamed_run_result._all_messages @@ -565,14 +565,19 @@ def from_streamed_result( return instance - def _collect_async_iterator(self, async_iter: AsyncIterator[T]) -> list[T]: - async def collect(): - return [item async for item in async_iter] + def _lazy_async_iterator(self, async_iter: AsyncIterator[T]) -> Iterator[T]: + """Lazily yield items from async iterator as they're requested.""" + loop = get_event_loop() - return get_event_loop().run_until_complete(collect()) + while True: + try: + item = loop.run_until_complete(async_iter.__anext__()) + yield item + except StopAsyncIteration: + break def stream_output(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]: # type: ignore[reportIncompatibleMethodOverride] - """Collect and stream the output as an iterable. + """Stream the output as an iterable. The pydantic validator for structured data will be called in [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation) @@ -587,10 +592,10 @@ def stream_output(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDa An iterable of the response data. """ async_stream = super().stream_output(debounce_by=debounce_by) - yield from self._collect_async_iterator(async_stream) + yield from self._lazy_async_iterator(async_stream) def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]: # type: ignore[reportIncompatibleMethodOverride] - """Collect and stream the text result as an iterable. + """Stream the text result as an iterable. !!! note Result validators will NOT be called on the text result if `delta=True`. @@ -603,10 +608,10 @@ def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) - performing validation as each token is received. """ async_stream = super().stream_text(delta=delta, debounce_by=debounce_by) - yield from self._collect_async_iterator(async_stream) + yield from self._lazy_async_iterator(async_stream) def stream_responses(self, *, debounce_by: float | None = 0.1) -> Iterator[tuple[_messages.ModelResponse, bool]]: # type: ignore[reportIncompatibleMethodOverride] - """Collect and stream the response as an iterable of Structured LLM Messages. + """Stream the response as an iterable of Structured LLM Messages. Args: debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. @@ -617,7 +622,7 @@ def stream_responses(self, *, debounce_by: float | None = 0.1) -> Iterator[tuple An iterable of the structured response message and whether that is the last message. """ async_stream = super().stream_responses(debounce_by=debounce_by) - yield from self._collect_async_iterator(async_stream) + yield from self._lazy_async_iterator(async_stream) def get_output(self) -> OutputDataT: # type: ignore[reportIncompatibleMethodOverride] """Stream the whole response, validate and return it.""" From 6e74a2a8c69e88d52b7766a4810b84759035c583 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Thu, 16 Oct 2025 16:25:02 -0600 Subject: [PATCH 03/18] add _sync methods to StreamedRunResult --- .../pydantic_ai/agent/abstract.py | 5 +- pydantic_ai_slim/pydantic_ai/result.py | 149 ++++++++---------- tests/test_streaming.py | 16 +- 3 files changed, 75 insertions(+), 95 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 91cb78a9c2..89a3bb9d53 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -598,7 +598,7 @@ def run_stream_sync( toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, - ) -> Iterator[result.SyncStreamedRunResult[AgentDepsT, Any]]: + ) -> Iterator[result.StreamedRunResult[AgentDepsT, Any]]: """Run the agent with a user prompt in sync streaming mode. This method builds an internal agent graph (using system prompts, tools and output schemas) and then @@ -659,8 +659,7 @@ def main(): builtin_tools=builtin_tools, event_stream_handler=event_stream_handler, ) - async_result = get_event_loop().run_until_complete(async_cm.__aenter__()) - yield result.SyncStreamedRunResult.from_streamed_result(async_result) # type: ignore[reportReturnType] + yield get_event_loop().run_until_complete(async_cm.__aenter__()) @overload def run_stream_events( diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index 3919076856..c35f89d74e 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -410,6 +410,24 @@ async def stream_output(self, *, debounce_by: float | None = 0.1) -> AsyncIterat else: raise ValueError('No stream response or run result provided') # pragma: no cover + def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]: + """Stream the output as an iterable. + + The pydantic validator for structured data will be called in + [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation) + on each iteration. + + Args: + debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured outputs to reduce the overhead of + performing validation as each token is received. + + Returns: + An iterable of the response data. + """ + async_stream = self.stream_output(debounce_by=debounce_by) + yield from _lazy_async_iterator(async_stream) + async def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> AsyncIterator[str]: """Stream the text result as an async iterable. @@ -438,6 +456,22 @@ async def stream_text(self, *, delta: bool = False, debounce_by: float | None = else: raise ValueError('No stream response or run result provided') # pragma: no cover + def stream_text_sync(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]: + """Stream the text result as an async iterable. + + !!! note + Result validators will NOT be called on the text result if `delta=True`. + + Args: + delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text + up to the current point. + debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured responses to reduce the overhead of + performing validation as each token is received. + """ + async_stream = self.stream_text(delta=delta, debounce_by=debounce_by) + yield from _lazy_async_iterator(async_stream) + @deprecated('`StreamedRunResult.stream_structured` is deprecated, use `stream_responses` instead.') async def stream_structured( self, *, debounce_by: float | None = 0.1 @@ -473,6 +507,22 @@ async def stream_responses( else: raise ValueError('No stream response or run result provided') # pragma: no cover + def stream_responses_sync( + self, *, debounce_by: float | None = 0.1 + ) -> Iterator[tuple[_messages.ModelResponse, bool]]: + """Stream the response as an iterable of Structured LLM Messages. + + Args: + debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured responses to reduce the overhead of + performing validation as each token is received. + + Returns: + An iterable of the structured response message and whether that is the last message. + """ + async_stream = self.stream_responses(debounce_by=debounce_by) + yield from _lazy_async_iterator(async_stream) + async def get_output(self) -> OutputDataT: """Stream the whole response, validate and return it.""" if self._run_result is not None: @@ -486,6 +536,10 @@ async def get_output(self) -> OutputDataT: else: raise ValueError('No stream response or run result provided') # pragma: no cover + def get_output_sync(self) -> OutputDataT: + """Stream the whole response, validate and return it.""" + return get_event_loop().run_until_complete(self.get_output()) + @property def response(self) -> _messages.ModelResponse: """Return the current state of the response.""" @@ -545,90 +599,6 @@ async def _marked_completed(self, message: _messages.ModelResponse | None = None await self._on_complete() -@dataclass(init=False) -class SyncStreamedRunResult(StreamedRunResult[AgentDepsT, OutputDataT]): - """Provides a synchronous API over 'StreamedRunResult'.""" - - @classmethod - def from_streamed_result( - cls, streamed_run_result: StreamedRunResult[AgentDepsT, OutputDataT] - ) -> SyncStreamedRunResult[AgentDepsT, OutputDataT]: - """Create a 'SyncStreamedRunResult' from an existing 'StreamedRunResult'.""" - instance = cls.__new__(cls) - - instance._all_messages = streamed_run_result._all_messages - instance._new_message_index = streamed_run_result._new_message_index - instance._stream_response = streamed_run_result._stream_response - instance._on_complete = streamed_run_result._on_complete - instance._run_result = streamed_run_result._run_result - instance.is_complete = streamed_run_result.is_complete - - return instance - - def _lazy_async_iterator(self, async_iter: AsyncIterator[T]) -> Iterator[T]: - """Lazily yield items from async iterator as they're requested.""" - loop = get_event_loop() - - while True: - try: - item = loop.run_until_complete(async_iter.__anext__()) - yield item - except StopAsyncIteration: - break - - def stream_output(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]: # type: ignore[reportIncompatibleMethodOverride] - """Stream the output as an iterable. - - The pydantic validator for structured data will be called in - [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation) - on each iteration. - - Args: - debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing. - Debouncing is particularly important for long structured outputs to reduce the overhead of - performing validation as each token is received. - - Returns: - An iterable of the response data. - """ - async_stream = super().stream_output(debounce_by=debounce_by) - yield from self._lazy_async_iterator(async_stream) - - def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]: # type: ignore[reportIncompatibleMethodOverride] - """Stream the text result as an iterable. - - !!! note - Result validators will NOT be called on the text result if `delta=True`. - - Args: - delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text - up to the current point. - debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. - Debouncing is particularly important for long structured responses to reduce the overhead of - performing validation as each token is received. - """ - async_stream = super().stream_text(delta=delta, debounce_by=debounce_by) - yield from self._lazy_async_iterator(async_stream) - - def stream_responses(self, *, debounce_by: float | None = 0.1) -> Iterator[tuple[_messages.ModelResponse, bool]]: # type: ignore[reportIncompatibleMethodOverride] - """Stream the response as an iterable of Structured LLM Messages. - - Args: - debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. - Debouncing is particularly important for long structured responses to reduce the overhead of - performing validation as each token is received. - - Returns: - An iterable of the structured response message and whether that is the last message. - """ - async_stream = super().stream_responses(debounce_by=debounce_by) - yield from self._lazy_async_iterator(async_stream) - - def get_output(self) -> OutputDataT: # type: ignore[reportIncompatibleMethodOverride] - """Stream the whole response, validate and return it.""" - return get_event_loop().run_until_complete(super().get_output()) - - @dataclass(repr=False) class FinalResult(Generic[OutputDataT]): """Marker class storing the final output of an agent run and associated metadata.""" @@ -645,6 +615,17 @@ class FinalResult(Generic[OutputDataT]): __repr__ = _utils.dataclasses_no_defaults_repr +def _lazy_async_iterator(async_iter: AsyncIterator[T]) -> Iterator[T]: + loop = get_event_loop() + + while True: + try: + item = loop.run_until_complete(async_iter.__anext__()) + yield item + except StopAsyncIteration: + break + + def _get_usage_checking_stream_response( stream_response: models.StreamedResponse, limits: UsageLimits | None, diff --git a/tests/test_streaming.py b/tests/test_streaming.py index c453993026..0e240b2d8d 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -174,7 +174,7 @@ async def ret_a(x: str) -> str: tool_calls=1, ) ) - response = result.get_output() + response = result.get_output_sync() assert response == snapshot('{"ret_a":"a-apple"}') assert result.is_complete assert result.timestamp() == IsNow(tz=timezone.utc) @@ -389,20 +389,20 @@ def test_streamed_text_stream_sync(): with agent.run_stream_sync('Hello') as result: # typehint to test (via static typing) that the stream type is correctly inferred - chunks: list[str] = [c for c in result.stream_text()] + chunks: list[str] = [c for c in result.stream_text_sync()] # one chunk with `stream_text()` due to group_by_temporal assert chunks == snapshot(['The cat sat on the mat.']) assert result.is_complete with agent.run_stream_sync('Hello') as result: # typehint to test (via static typing) that the stream type is correctly inferred - chunks: list[str] = [c for c in result.stream_output()] + chunks: list[str] = [c for c in result.stream_output_sync()] # two chunks with `stream()` due to not-final vs. final assert chunks == snapshot(['The cat sat on the mat.', 'The cat sat on the mat.']) assert result.is_complete with agent.run_stream_sync('Hello') as result: - assert [c for c in result.stream_text(debounce_by=None)] == snapshot( + assert [c for c in result.stream_text_sync(debounce_by=None)] == snapshot( [ 'The ', 'The cat ', @@ -415,12 +415,12 @@ def test_streamed_text_stream_sync(): with agent.run_stream_sync('Hello') as result: # with stream_text, there is no need to do partial validation, so we only get the final message once: - assert [c for c in result.stream_text(delta=False, debounce_by=None)] == snapshot( + assert [c for c in result.stream_text_sync(delta=False, debounce_by=None)] == snapshot( ['The ', 'The cat ', 'The cat sat ', 'The cat sat on ', 'The cat sat on the ', 'The cat sat on the mat.'] ) with agent.run_stream_sync('Hello') as result: - assert [c for c in result.stream_text(delta=True, debounce_by=None)] == snapshot( + assert [c for c in result.stream_text_sync(delta=True, debounce_by=None)] == snapshot( ['The ', 'cat ', 'sat ', 'on ', 'the ', 'mat.'] ) @@ -428,7 +428,7 @@ def upcase(text: str) -> str: return text.upper() with agent.run_stream_sync('Hello', output_type=TextOutput(upcase)) as result: - assert [c for c in result.stream_output(debounce_by=None)] == snapshot( + assert [c for c in result.stream_output_sync(debounce_by=None)] == snapshot( [ 'THE ', 'THE CAT ', @@ -441,7 +441,7 @@ def upcase(text: str) -> str: ) with agent.run_stream_sync('Hello') as result: - assert [c for c, _is_last in result.stream_responses(debounce_by=None)] == snapshot( + assert [c for c, _is_last in result.stream_responses_sync(debounce_by=None)] == snapshot( [ ModelResponse( parts=[TextPart(content='The ')], From db860ed529084a833ac0a20e254e896327bb73d6 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Thu, 16 Oct 2025 21:38:08 -0600 Subject: [PATCH 04/18] fix doctest --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 89a3bb9d53..a68584ad45 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -619,7 +619,7 @@ def run_stream_sync( def main(): with agent.run_stream_sync('What is the capital of the UK?') as response: - print(response.get_output()) + print(response.get_output_sync()) #> The capital of the UK is London. ``` From 5ec9b49d3e6dbfacde0a287ae45795736cbd6208 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Sat, 25 Oct 2025 09:07:08 -0600 Subject: [PATCH 05/18] add disclaimers --- .../pydantic_ai/agent/abstract.py | 3 +++ pydantic_ai_slim/pydantic_ai/result.py | 25 ++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index a68584ad45..137aef73a6 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -601,6 +601,9 @@ def run_stream_sync( ) -> Iterator[result.StreamedRunResult[AgentDepsT, Any]]: """Run the agent with a user prompt in sync streaming mode. + This is a convenience method that wraps [`self.run_stream`][pydantic_ai.agent.AbstractAgent.run_stream] with `loop.run_until_complete(...)`. + You therefore can't use this method inside async code or if there's an active event loop. + This method builds an internal agent graph (using system prompts, tools and output schemas) and then runs the graph until the model produces output matching the `output_type`, for example text or structured data. At this point, a streaming run result object is yielded from which you can stream the output as it comes in, diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index c35f89d74e..d3e47d3788 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -413,6 +413,9 @@ async def stream_output(self, *, debounce_by: float | None = 0.1) -> AsyncIterat def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]: """Stream the output as an iterable. + This is a convenience method that wraps [`self.stream_output`][pydantic_ai.result.StreamedRunResult.stream_output] with `loop.run_until_complete(...)`. + You therefore can't use this method inside async code or if there's an active event loop. + The pydantic validator for structured data will be called in [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation) on each iteration. @@ -426,7 +429,7 @@ def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[Out An iterable of the response data. """ async_stream = self.stream_output(debounce_by=debounce_by) - yield from _lazy_async_iterator(async_stream) + yield from _blocking_async_iterator(async_stream) async def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> AsyncIterator[str]: """Stream the text result as an async iterable. @@ -457,7 +460,10 @@ async def stream_text(self, *, delta: bool = False, debounce_by: float | None = raise ValueError('No stream response or run result provided') # pragma: no cover def stream_text_sync(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]: - """Stream the text result as an async iterable. + """Stream the text result as a sync iterable. + + This is a convenience method that wraps [`self.stream_text`][pydantic_ai.result.StreamedRunResult.stream_text] with `loop.run_until_complete(...)`. + You therefore can't use this method inside async code or if there's an active event loop. !!! note Result validators will NOT be called on the text result if `delta=True`. @@ -470,7 +476,7 @@ def stream_text_sync(self, *, delta: bool = False, debounce_by: float | None = 0 performing validation as each token is received. """ async_stream = self.stream_text(delta=delta, debounce_by=debounce_by) - yield from _lazy_async_iterator(async_stream) + yield from _blocking_async_iterator(async_stream) @deprecated('`StreamedRunResult.stream_structured` is deprecated, use `stream_responses` instead.') async def stream_structured( @@ -512,6 +518,9 @@ def stream_responses_sync( ) -> Iterator[tuple[_messages.ModelResponse, bool]]: """Stream the response as an iterable of Structured LLM Messages. + This is a convenience method that wraps [`self.stream_responses`][pydantic_ai.result.StreamedRunResult.stream_responses] with `loop.run_until_complete(...)`. + You therefore can't use this method inside async code or if there's an active event loop. + Args: debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. Debouncing is particularly important for long structured responses to reduce the overhead of @@ -521,7 +530,7 @@ def stream_responses_sync( An iterable of the structured response message and whether that is the last message. """ async_stream = self.stream_responses(debounce_by=debounce_by) - yield from _lazy_async_iterator(async_stream) + yield from _blocking_async_iterator(async_stream) async def get_output(self) -> OutputDataT: """Stream the whole response, validate and return it.""" @@ -537,7 +546,11 @@ async def get_output(self) -> OutputDataT: raise ValueError('No stream response or run result provided') # pragma: no cover def get_output_sync(self) -> OutputDataT: - """Stream the whole response, validate and return it.""" + """Stream the whole response, validate and return it. + + This is a convenience method that wraps [`self.get_output`][pydantic_ai.result.StreamedRunResult.get_output] with `loop.run_until_complete(...)`. + You therefore can't use this method inside async code or if there's an active event loop. + """ return get_event_loop().run_until_complete(self.get_output()) @property @@ -615,7 +628,7 @@ class FinalResult(Generic[OutputDataT]): __repr__ = _utils.dataclasses_no_defaults_repr -def _lazy_async_iterator(async_iter: AsyncIterator[T]) -> Iterator[T]: +def _blocking_async_iterator(async_iter: AsyncIterator[T]) -> Iterator[T]: loop = get_event_loop() while True: From e1765d351bbbd17baf6133abdf364f3dadef4742 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Sun, 26 Oct 2025 19:46:08 -0600 Subject: [PATCH 06/18] add fixture override --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 12 ------------ tests/test_streaming.py | 8 +++++++- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 137aef73a6..e3b6e64f8d 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -614,18 +614,6 @@ def run_stream_sync( If you want to always run the agent graph to completion and stream events and output at the same time, use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead. - Example: - ```python - from pydantic_ai import Agent - - agent = Agent('openai:gpt-4o') - - def main(): - with agent.run_stream_sync('What is the capital of the UK?') as response: - print(response.get_output_sync()) - #> The capital of the UK is London. - ``` - Args: user_prompt: User input to start/continue the conversation. output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 0e240b2d8d..08fefed42e 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -134,7 +134,13 @@ async def ret_a(x: str) -> str: ) -def test_streamed_text_sync_response(): +@pytest.fixture +def close_cached_httpx_client(): + """Override the global fixture to avoid async context issues in sync tests.""" + yield + + +def test_streamed_text_sync_response(close_cached_httpx_client): # type: ignore[reportUnknownParameterType] m = TestModel() test_agent = Agent(m) From 4f0d2adc64f983c06f6b7e048375dee7c142e5bf Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Sun, 26 Oct 2025 21:41:15 -0600 Subject: [PATCH 07/18] update docs --- docs/agents.md | 2 +- .../pydantic_ai/agent/abstract.py | 40 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/docs/agents.md b/docs/agents.md index ab3f658b7a..b5640c2a1d 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -65,7 +65,7 @@ There are five ways to run an agent: 1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response. 2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`). -3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. +3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. [`agent.run_stream_sync()`][pydantic_ai.agent.AbstractAgent.run_stream_sync] is a synchronous context manager variation with the same return type. 4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result. 5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph]. diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index e3b6e64f8d..f1e10b49d1 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -4,7 +4,7 @@ import inspect from abc import ABC, abstractmethod from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterator, Mapping, Sequence -from contextlib import AbstractAsyncContextManager, asynccontextmanager, contextmanager +from contextlib import AbstractAsyncContextManager, AbstractContextManager, asynccontextmanager, contextmanager from types import FrameType from typing import TYPE_CHECKING, Any, Generic, TypeAlias, cast, overload @@ -581,6 +581,44 @@ async def on_complete() -> None: if not yielded: raise exceptions.AgentRunError('Agent run finished without producing a final result') # pragma: no cover + @overload + def run_stream_sync( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: None = None, + message_history: Sequence[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, + ) -> AbstractContextManager[result.StreamedRunResult[AgentDepsT, OutputDataT]]: ... + + @overload + def run_stream_sync( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT], + message_history: Sequence[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, + ) -> AbstractContextManager[result.StreamedRunResult[AgentDepsT, RunOutputDataT]]: ... + @contextmanager def run_stream_sync( self, From 842d0e903daf85e580e8bb797c289dcec99b5763 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Thu, 30 Oct 2025 22:37:45 -0600 Subject: [PATCH 08/18] Update tests/test_streaming.py Co-authored-by: Douwe Maan --- tests/test_streaming.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 08fefed42e..a27ef1d12b 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -393,12 +393,12 @@ def test_streamed_text_stream_sync(): agent = Agent(m) - with agent.run_stream_sync('Hello') as result: - # typehint to test (via static typing) that the stream type is correctly inferred - chunks: list[str] = [c for c in result.stream_text_sync()] - # one chunk with `stream_text()` due to group_by_temporal - assert chunks == snapshot(['The cat sat on the mat.']) - assert result.is_complete + result = agent.run_stream_sync('Hello') + # typehint to test (via static typing) that the stream type is correctly inferred + chunks: list[str] = [c for c in result.stream_text_sync()] + # one chunk with `stream_text()` due to group_by_temporal + assert chunks == snapshot(['The cat sat on the mat.']) + assert result.is_complete with agent.run_stream_sync('Hello') as result: # typehint to test (via static typing) that the stream type is correctly inferred From 12e96a6ba6b61baf3d7bea2336147a1677295f6f Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Thu, 30 Oct 2025 22:37:55 -0600 Subject: [PATCH 09/18] Update pydantic_ai_slim/pydantic_ai/result.py Co-authored-by: Douwe Maan --- pydantic_ai_slim/pydantic_ai/result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index d3e47d3788..88f694bb17 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -462,7 +462,7 @@ async def stream_text(self, *, delta: bool = False, debounce_by: float | None = def stream_text_sync(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]: """Stream the text result as a sync iterable. - This is a convenience method that wraps [`self.stream_text`][pydantic_ai.result.StreamedRunResult.stream_text] with `loop.run_until_complete(...)`. + This is a convenience method that wraps [`stream_text()`][pydantic_ai.result.StreamedRunResult.stream_text] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. !!! note From 98969013e8f6bfed60b525ed3e79b6e1529b5296 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Fri, 31 Oct 2025 08:04:08 -0600 Subject: [PATCH 10/18] turn run_stream_sync into normal method --- .../pydantic_ai/agent/abstract.py | 57 ++-- pydantic_ai_slim/pydantic_ai/result.py | 10 +- tests/test_streaming.py | 315 +++++++++--------- 3 files changed, 197 insertions(+), 185 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 6521bf3c2e..a348ccb996 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -4,7 +4,7 @@ import inspect from abc import ABC, abstractmethod from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterator, Mapping, Sequence -from contextlib import AbstractAsyncContextManager, AbstractContextManager, asynccontextmanager, contextmanager +from contextlib import AbstractAsyncContextManager, asynccontextmanager, contextmanager from types import FrameType from typing import TYPE_CHECKING, Any, Generic, TypeAlias, cast, overload @@ -598,7 +598,7 @@ def run_stream_sync( toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, - ) -> AbstractContextManager[result.StreamedRunResult[AgentDepsT, OutputDataT]]: ... + ) -> result.StreamedRunResult[AgentDepsT, OutputDataT]: ... @overload def run_stream_sync( @@ -617,9 +617,8 @@ def run_stream_sync( toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, - ) -> AbstractContextManager[result.StreamedRunResult[AgentDepsT, RunOutputDataT]]: ... + ) -> result.StreamedRunResult[AgentDepsT, RunOutputDataT]: ... - @contextmanager def run_stream_sync( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, @@ -636,7 +635,7 @@ def run_stream_sync( toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, - ) -> Iterator[result.StreamedRunResult[AgentDepsT, Any]]: + ) -> result.StreamedRunResult[AgentDepsT, Any]: """Run the agent with a user prompt in sync streaming mode. This is a convenience method that wraps [`self.run_stream`][pydantic_ai.agent.AbstractAgent.run_stream] with `loop.run_until_complete(...)`. @@ -652,6 +651,18 @@ def run_stream_sync( If you want to always run the agent graph to completion and stream events and output at the same time, use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead. + Example: + ```python + from pydantic_ai import Agent + + agent = Agent('openai:gpt-4o') + + def main(): + response = agent.run_stream_sync('What is the capital of the UK?') + print(response.get_output_sync()) + #> The capital of the UK is London. + ``` + Args: user_prompt: User input to start/continue the conversation. output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no @@ -673,22 +684,26 @@ def run_stream_sync( Returns: The result of the run. """ - async_cm = self.run_stream( - user_prompt, - output_type=output_type, - message_history=message_history, - deferred_tool_results=deferred_tool_results, - model=model, - deps=deps, - model_settings=model_settings, - usage_limits=usage_limits, - usage=usage, - infer_name=infer_name, - toolsets=toolsets, - builtin_tools=builtin_tools, - event_stream_handler=event_stream_handler, - ) - yield get_event_loop().run_until_complete(async_cm.__aenter__()) + + async def _consume_stream(): + async with self.run_stream( + user_prompt, + output_type=output_type, + message_history=message_history, + deferred_tool_results=deferred_tool_results, + model=model, + deps=deps, + model_settings=model_settings, + usage_limits=usage_limits, + usage=usage, + infer_name=infer_name, + toolsets=toolsets, + builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, + ) as stream_result: + yield stream_result + + return get_event_loop().run_until_complete(_consume_stream().__anext__()) @overload def run_stream_events( diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index b347ec2868..f772364a06 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -425,7 +425,7 @@ async def stream_output(self, *, debounce_by: float | None = 0.1) -> AsyncIterat def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]: """Stream the output as an iterable. - This is a convenience method that wraps [`self.stream_output`][pydantic_ai.result.StreamedRunResult.stream_output] with `loop.run_until_complete(...)`. + This is a convenience method that wraps [`stream_output()`][pydantic_ai.result.StreamedRunResult.stream_output] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. The pydantic validator for structured data will be called in @@ -530,7 +530,7 @@ def stream_responses_sync( ) -> Iterator[tuple[_messages.ModelResponse, bool]]: """Stream the response as an iterable of Structured LLM Messages. - This is a convenience method that wraps [`self.stream_responses`][pydantic_ai.result.StreamedRunResult.stream_responses] with `loop.run_until_complete(...)`. + This is a convenience method that wraps [`stream_responses()`][pydantic_ai.result.StreamedRunResult.stream_responses] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. Args: @@ -560,7 +560,7 @@ async def get_output(self) -> OutputDataT: def get_output_sync(self) -> OutputDataT: """Stream the whole response, validate and return it. - This is a convenience method that wraps [`self.get_output`][pydantic_ai.result.StreamedRunResult.get_output] with `loop.run_until_complete(...)`. + This is a convenience method that wraps [`get_output()`][pydantic_ai.result.StreamedRunResult.get_output] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. """ return get_event_loop().run_until_complete(self.get_output()) @@ -642,11 +642,9 @@ class FinalResult(Generic[OutputDataT]): def _blocking_async_iterator(async_iter: AsyncIterator[T]) -> Iterator[T]: loop = get_event_loop() - while True: try: - item = loop.run_until_complete(async_iter.__anext__()) - yield item + yield loop.run_until_complete(async_iter.__anext__()) except StopAsyncIteration: break diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 3716e09e14..84f0ef3fa4 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -151,74 +151,74 @@ def test_streamed_text_sync_response(close_cached_httpx_client): # type: ignore async def ret_a(x: str) -> str: return f'{x}-apple' - with test_agent.run_stream_sync('Hello') as result: - # assert test_agent.name == 'test_agent' - assert not result.is_complete - assert result.all_messages() == snapshot( - [ - ModelRequest(parts=[UserPromptPart(content='Hello', timestamp=IsNow(tz=timezone.utc))]), - ModelResponse( - parts=[ToolCallPart(tool_name='ret_a', args={'x': 'a'}, tool_call_id=IsStr())], - usage=RequestUsage(input_tokens=51), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ModelRequest( - parts=[ - ToolReturnPart( - tool_name='ret_a', content='a-apple', timestamp=IsNow(tz=timezone.utc), tool_call_id=IsStr() - ) - ] - ), - ] - ) - assert result.usage() == snapshot( - RunUsage( - requests=2, - input_tokens=103, - output_tokens=5, - tool_calls=1, - ) - ) - response = result.get_output_sync() - assert response == snapshot('{"ret_a":"a-apple"}') - assert result.is_complete - assert result.timestamp() == IsNow(tz=timezone.utc) - assert result.all_messages() == snapshot( - [ - ModelRequest(parts=[UserPromptPart(content='Hello', timestamp=IsNow(tz=timezone.utc))]), - ModelResponse( - parts=[ToolCallPart(tool_name='ret_a', args={'x': 'a'}, tool_call_id=IsStr())], - usage=RequestUsage(input_tokens=51), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ModelRequest( - parts=[ - ToolReturnPart( - tool_name='ret_a', content='a-apple', timestamp=IsNow(tz=timezone.utc), tool_call_id=IsStr() - ) - ] - ), - ModelResponse( - parts=[TextPart(content='{"ret_a":"a-apple"}')], - usage=RequestUsage(input_tokens=52, output_tokens=11), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ] + result = test_agent.run_stream_sync('Hello') + # assert test_agent.name == 'test_agent' + assert not result.is_complete + assert result.all_messages() == snapshot( + [ + ModelRequest(parts=[UserPromptPart(content='Hello', timestamp=IsNow(tz=timezone.utc))]), + ModelResponse( + parts=[ToolCallPart(tool_name='ret_a', args={'x': 'a'}, tool_call_id=IsStr())], + usage=RequestUsage(input_tokens=51), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelRequest( + parts=[ + ToolReturnPart( + tool_name='ret_a', content='a-apple', timestamp=IsNow(tz=timezone.utc), tool_call_id=IsStr() + ) + ] + ), + ] + ) + assert result.usage() == snapshot( + RunUsage( + requests=2, + input_tokens=103, + output_tokens=5, + tool_calls=1, ) - assert result.usage() == snapshot( - RunUsage( - requests=2, - input_tokens=103, - output_tokens=11, - tool_calls=1, - ) + ) + response = result.get_output_sync() + assert response == snapshot('{"ret_a":"a-apple"}') + assert result.is_complete + assert result.timestamp() == IsNow(tz=timezone.utc) + assert result.all_messages() == snapshot( + [ + ModelRequest(parts=[UserPromptPart(content='Hello', timestamp=IsNow(tz=timezone.utc))]), + ModelResponse( + parts=[ToolCallPart(tool_name='ret_a', args={'x': 'a'}, tool_call_id=IsStr())], + usage=RequestUsage(input_tokens=51), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelRequest( + parts=[ + ToolReturnPart( + tool_name='ret_a', content='a-apple', timestamp=IsNow(tz=timezone.utc), tool_call_id=IsStr() + ) + ] + ), + ModelResponse( + parts=[TextPart(content='{"ret_a":"a-apple"}')], + usage=RequestUsage(input_tokens=52, output_tokens=11), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ] + ) + assert result.usage() == snapshot( + RunUsage( + requests=2, + input_tokens=103, + output_tokens=11, + tool_calls=1, ) + ) async def test_streamed_structured_response(): @@ -400,106 +400,105 @@ def test_streamed_text_stream_sync(): assert chunks == snapshot(['The cat sat on the mat.']) assert result.is_complete - with agent.run_stream_sync('Hello') as result: - # typehint to test (via static typing) that the stream type is correctly inferred - chunks: list[str] = [c for c in result.stream_output_sync()] - # two chunks with `stream()` due to not-final vs. final - assert chunks == snapshot(['The cat sat on the mat.', 'The cat sat on the mat.']) - assert result.is_complete + result = agent.run_stream_sync('Hello') + # typehint to test (via static typing) that the stream type is correctly inferred + chunks: list[str] = [c for c in result.stream_output_sync()] + # two chunks with `stream()` due to not-final vs. final + assert chunks == snapshot(['The cat sat on the mat.']) + assert result.is_complete - with agent.run_stream_sync('Hello') as result: - assert [c for c in result.stream_text_sync(debounce_by=None)] == snapshot( - [ - 'The ', - 'The cat ', - 'The cat sat ', - 'The cat sat on ', - 'The cat sat on the ', - 'The cat sat on the mat.', - ] - ) + result = agent.run_stream_sync('Hello') + assert [c for c in result.stream_text_sync(debounce_by=None)] == snapshot( + [ + 'The ', + 'The cat ', + 'The cat sat ', + 'The cat sat on ', + 'The cat sat on the ', + 'The cat sat on the mat.', + ] + ) - with agent.run_stream_sync('Hello') as result: - # with stream_text, there is no need to do partial validation, so we only get the final message once: - assert [c for c in result.stream_text_sync(delta=False, debounce_by=None)] == snapshot( - ['The ', 'The cat ', 'The cat sat ', 'The cat sat on ', 'The cat sat on the ', 'The cat sat on the mat.'] - ) + result = agent.run_stream_sync('Hello') + # with stream_text, there is no need to do partial validation, so we only get the final message once: + assert [c for c in result.stream_text_sync(delta=False, debounce_by=None)] == snapshot( + ['The ', 'The cat ', 'The cat sat ', 'The cat sat on ', 'The cat sat on the ', 'The cat sat on the mat.'] + ) - with agent.run_stream_sync('Hello') as result: - assert [c for c in result.stream_text_sync(delta=True, debounce_by=None)] == snapshot( - ['The ', 'cat ', 'sat ', 'on ', 'the ', 'mat.'] - ) + result = agent.run_stream_sync('Hello') + assert [c for c in result.stream_text_sync(delta=True, debounce_by=None)] == snapshot( + ['The ', 'cat ', 'sat ', 'on ', 'the ', 'mat.'] + ) def upcase(text: str) -> str: return text.upper() - with agent.run_stream_sync('Hello', output_type=TextOutput(upcase)) as result: - assert [c for c in result.stream_output_sync(debounce_by=None)] == snapshot( - [ - 'THE ', - 'THE CAT ', - 'THE CAT SAT ', - 'THE CAT SAT ON ', - 'THE CAT SAT ON THE ', - 'THE CAT SAT ON THE MAT.', - 'THE CAT SAT ON THE MAT.', - ] - ) + result = agent.run_stream_sync('Hello', output_type=TextOutput(upcase)) + assert [c for c in result.stream_output_sync(debounce_by=None)] == snapshot( + ['THE ', 'THE CAT ', 'THE CAT SAT ', 'THE CAT SAT ON ', 'THE CAT SAT ON THE ', 'THE CAT SAT ON THE MAT.'] + ) - with agent.run_stream_sync('Hello') as result: - assert [c for c, _is_last in result.stream_responses_sync(debounce_by=None)] == snapshot( - [ - ModelResponse( - parts=[TextPart(content='The ')], - usage=RequestUsage(input_tokens=51, output_tokens=1), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ModelResponse( - parts=[TextPart(content='The cat ')], - usage=RequestUsage(input_tokens=51, output_tokens=2), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ModelResponse( - parts=[TextPart(content='The cat sat ')], - usage=RequestUsage(input_tokens=51, output_tokens=3), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ModelResponse( - parts=[TextPart(content='The cat sat on ')], - usage=RequestUsage(input_tokens=51, output_tokens=4), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ModelResponse( - parts=[TextPart(content='The cat sat on the ')], - usage=RequestUsage(input_tokens=51, output_tokens=5), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ModelResponse( - parts=[TextPart(content='The cat sat on the mat.')], - usage=RequestUsage(input_tokens=51, output_tokens=7), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ModelResponse( - parts=[TextPart(content='The cat sat on the mat.')], - usage=RequestUsage(input_tokens=51, output_tokens=7), - model_name='test', - timestamp=IsNow(tz=timezone.utc), - provider_name='test', - ), - ] - ) + result = agent.run_stream_sync('Hello') + assert [c for c, _is_last in result.stream_responses_sync(debounce_by=None)] == snapshot( + [ + ModelResponse( + parts=[TextPart(content='The ')], + usage=RequestUsage(input_tokens=51, output_tokens=1), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat ')], + usage=RequestUsage(input_tokens=51, output_tokens=2), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat ')], + usage=RequestUsage(input_tokens=51, output_tokens=3), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on ')], + usage=RequestUsage(input_tokens=51, output_tokens=4), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on the ')], + usage=RequestUsage(input_tokens=51, output_tokens=5), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on the mat.')], + usage=RequestUsage(input_tokens=51, output_tokens=7), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on the mat.')], + usage=RequestUsage(input_tokens=51, output_tokens=7), + model_name='test', + timestamp=IsNow(tz=timezone.utc), + provider_name='test', + ), + ModelResponse( + parts=[TextPart(content='The cat sat on the mat.')], + usage=RequestUsage(input_tokens=51, output_tokens=7), + model_name='test', + timestamp=IsDatetime(), + provider_name='test', + ), + ] + ) async def test_plain_response(): From 1d0b46f208a0aece3b7a274d3552abab241dad14 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Fri, 31 Oct 2025 08:11:36 -0600 Subject: [PATCH 11/18] add infer_name --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 3 +++ tests/test_streaming.py | 10 ++-------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index a348ccb996..7a95e07c47 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -684,6 +684,9 @@ def main(): Returns: The result of the run. """ + if infer_name and self.name is None: + if frame := inspect.currentframe(): # pragma: no branch + self._infer_name(frame) async def _consume_stream(): async with self.run_stream( diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 84f0ef3fa4..a36f38417c 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -135,13 +135,7 @@ async def ret_a(x: str) -> str: ) -@pytest.fixture -def close_cached_httpx_client(): - """Override the global fixture to avoid async context issues in sync tests.""" - yield - - -def test_streamed_text_sync_response(close_cached_httpx_client): # type: ignore[reportUnknownParameterType] +def test_streamed_text_sync_response(): m = TestModel() test_agent = Agent(m) @@ -152,7 +146,7 @@ async def ret_a(x: str) -> str: return f'{x}-apple' result = test_agent.run_stream_sync('Hello') - # assert test_agent.name == 'test_agent' + assert test_agent.name == 'test_agent' assert not result.is_complete assert result.all_messages() == snapshot( [ From 344c510ffe1d6989b07e48a6fc8713bd1b031cda Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Fri, 31 Oct 2025 10:29:45 -0600 Subject: [PATCH 12/18] update docstrings --- docs/agents.md | 2 +- pydantic_ai_slim/pydantic_ai/_utils.py | 9 +++++++++ pydantic_ai_slim/pydantic_ai/agent/abstract.py | 2 +- pydantic_ai_slim/pydantic_ai/result.py | 6 ++---- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/docs/agents.md b/docs/agents.md index f272a4cb27..af7cc66a6b 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -65,7 +65,7 @@ There are five ways to run an agent: 1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response. 2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`). -3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. [`agent.run_stream_sync()`][pydantic_ai.agent.AbstractAgent.run_stream_sync] is a synchronous context manager variation with the same return type. +3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. [`agent.run_stream_sync()`][pydantic_ai.agent.AbstractAgent.run_stream_sync] is a synchronous variation that returns the result directly. 4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result. 5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph]. diff --git a/pydantic_ai_slim/pydantic_ai/_utils.py b/pydantic_ai_slim/pydantic_ai/_utils.py index c9d75914f7..cd9f3f1126 100644 --- a/pydantic_ai_slim/pydantic_ai/_utils.py +++ b/pydantic_ai_slim/pydantic_ai/_utils.py @@ -489,3 +489,12 @@ def get_union_args(tp: Any) -> tuple[Any, ...]: return tuple(_unwrap_annotated(arg) for arg in get_args(tp)) else: return () + + +def get_event_loop(): + try: + event_loop = asyncio.get_event_loop() + except RuntimeError: + event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(event_loop) + return event_loop diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 7a95e07c47..57d2c8578f 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -638,7 +638,7 @@ def run_stream_sync( ) -> result.StreamedRunResult[AgentDepsT, Any]: """Run the agent with a user prompt in sync streaming mode. - This is a convenience method that wraps [`self.run_stream`][pydantic_ai.agent.AbstractAgent.run_stream] with `loop.run_until_complete(...)`. + This is a convenience method that wraps [`run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. This method builds an internal agent graph (using system prompts, tools and output schemas) and then diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index f772364a06..27be2b5023 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -9,8 +9,6 @@ from pydantic import ValidationError from typing_extensions import TypeVar, deprecated -from pydantic_graph._utils import get_event_loop - from . import _utils, exceptions, messages as _messages, models from ._output import ( OutputDataT_inv, @@ -563,7 +561,7 @@ def get_output_sync(self) -> OutputDataT: This is a convenience method that wraps [`get_output()`][pydantic_ai.result.StreamedRunResult.get_output] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. """ - return get_event_loop().run_until_complete(self.get_output()) + return _utils.get_event_loop().run_until_complete(self.get_output()) @property def response(self) -> _messages.ModelResponse: @@ -641,7 +639,7 @@ class FinalResult(Generic[OutputDataT]): def _blocking_async_iterator(async_iter: AsyncIterator[T]) -> Iterator[T]: - loop = get_event_loop() + loop = _utils.get_event_loop() while True: try: yield loop.run_until_complete(async_iter.__anext__()) From 584ae6a2a87da696699126d4a26e9604f2d555d2 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 31 Oct 2025 11:27:01 -0600 Subject: [PATCH 13/18] Update pydantic_ai_slim/pydantic_ai/_utils.py --- pydantic_ai_slim/pydantic_ai/_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydantic_ai_slim/pydantic_ai/_utils.py b/pydantic_ai_slim/pydantic_ai/_utils.py index cd9f3f1126..e4ac2a66ae 100644 --- a/pydantic_ai_slim/pydantic_ai/_utils.py +++ b/pydantic_ai_slim/pydantic_ai/_utils.py @@ -494,7 +494,7 @@ def get_union_args(tp: Any) -> tuple[Any, ...]: def get_event_loop(): try: event_loop = asyncio.get_event_loop() - except RuntimeError: + except RuntimeError: # pragma: lax no cover event_loop = asyncio.new_event_loop() asyncio.set_event_loop(event_loop) return event_loop From cf2b9b68312f0f162e6d92d148829a1031ae0a6d Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 31 Oct 2025 17:42:55 +0000 Subject: [PATCH 14/18] Tweaks, add validate_response_output_sync --- pydantic_ai_slim/pydantic_ai/_utils.py | 9 ++++++ .../pydantic_ai/agent/abstract.py | 10 +++---- pydantic_ai_slim/pydantic_ai/result.py | 30 +++++++++---------- tests/test_streaming.py | 21 +++++++++++++ 4 files changed, 49 insertions(+), 21 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/_utils.py b/pydantic_ai_slim/pydantic_ai/_utils.py index e4ac2a66ae..4f6deb2f2e 100644 --- a/pydantic_ai_slim/pydantic_ai/_utils.py +++ b/pydantic_ai_slim/pydantic_ai/_utils.py @@ -234,6 +234,15 @@ def sync_anext(iterator: Iterator[T]) -> T: raise StopAsyncIteration() from e +def sync_async_iterator(async_iter: AsyncIterator[T]) -> Iterator[T]: + loop = get_event_loop() + while True: + try: + yield loop.run_until_complete(anext(async_iter)) + except StopAsyncIteration: + break + + def now_utc() -> datetime: return datetime.now(tz=timezone.utc) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 57d2c8578f..bdb9428a14 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -12,7 +12,6 @@ from typing_extensions import Self, TypeIs, TypeVar from pydantic_graph import End -from pydantic_graph._utils import get_event_loop from .. import ( _agent_graph, @@ -335,7 +334,7 @@ def run_sync( if infer_name and self.name is None: self._infer_name(inspect.currentframe()) - return get_event_loop().run_until_complete( + return _utils.get_event_loop().run_until_complete( self.run( user_prompt, output_type=output_type, @@ -685,8 +684,7 @@ def main(): The result of the run. """ if infer_name and self.name is None: - if frame := inspect.currentframe(): # pragma: no branch - self._infer_name(frame) + self._infer_name(inspect.currentframe()) async def _consume_stream(): async with self.run_stream( @@ -706,7 +704,7 @@ async def _consume_stream(): ) as stream_result: yield stream_result - return get_event_loop().run_until_complete(_consume_stream().__anext__()) + return _utils.get_event_loop().run_until_complete(anext(_consume_stream())) @overload def run_stream_events( @@ -1344,6 +1342,6 @@ def to_cli_sync( agent.to_cli_sync(prog_name='assistant') ``` """ - return get_event_loop().run_until_complete( + return _utils.get_event_loop().run_until_complete( self.to_cli(deps=deps, prog_name=prog_name, message_history=message_history) ) diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index 27be2b5023..f83e570d2e 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -438,8 +438,7 @@ def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[Out Returns: An iterable of the response data. """ - async_stream = self.stream_output(debounce_by=debounce_by) - yield from _blocking_async_iterator(async_stream) + return _utils.sync_async_iterator(self.stream_output(debounce_by=debounce_by)) async def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> AsyncIterator[str]: """Stream the text result as an async iterable. @@ -485,8 +484,7 @@ def stream_text_sync(self, *, delta: bool = False, debounce_by: float | None = 0 Debouncing is particularly important for long structured responses to reduce the overhead of performing validation as each token is received. """ - async_stream = self.stream_text(delta=delta, debounce_by=debounce_by) - yield from _blocking_async_iterator(async_stream) + return _utils.sync_async_iterator(self.stream_text(delta=delta, debounce_by=debounce_by)) @deprecated('`StreamedRunResult.stream_structured` is deprecated, use `stream_responses` instead.') async def stream_structured( @@ -539,8 +537,7 @@ def stream_responses_sync( Returns: An iterable of the structured response message and whether that is the last message. """ - async_stream = self.stream_responses(debounce_by=debounce_by) - yield from _blocking_async_iterator(async_stream) + return _utils.sync_async_iterator(self.stream_responses(debounce_by=debounce_by)) async def get_output(self) -> OutputDataT: """Stream the whole response, validate and return it.""" @@ -614,6 +611,18 @@ async def validate_response_output( else: raise ValueError('No stream response or run result provided') # pragma: no cover + def validate_response_output_sync( + self, message: _messages.ModelResponse, *, allow_partial: bool = False + ) -> OutputDataT: + """Validate a structured result message. + + This is a convenience method that wraps [`validate_response_output()`][pydantic_ai.result.StreamedRunResult.validate_response_output] with `loop.run_until_complete(...)`. + You therefore can't use this method inside async code or if there's an active event loop. + """ + return _utils.get_event_loop().run_until_complete( + self.validate_response_output(message, allow_partial=allow_partial) + ) + async def _marked_completed(self, message: _messages.ModelResponse | None = None) -> None: self.is_complete = True if message is not None: @@ -638,15 +647,6 @@ class FinalResult(Generic[OutputDataT]): __repr__ = _utils.dataclasses_no_defaults_repr -def _blocking_async_iterator(async_iter: AsyncIterator[T]) -> Iterator[T]: - loop = _utils.get_event_loop() - while True: - try: - yield loop.run_until_complete(async_iter.__anext__()) - except StopAsyncIteration: - break - - def _get_usage_checking_stream_response( stream_response: models.StreamedResponse, limits: UsageLimits | None, diff --git a/tests/test_streaming.py b/tests/test_streaming.py index a36f38417c..318b1ab1db 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -2085,3 +2085,24 @@ async def ret_a(x: str) -> str: AgentRunResultEvent(result=AgentRunResult(output='{"ret_a":"a-apple"}')), ] ) + + +def test_structured_response_sync_validation(): + async def text_stream(_messages: list[ModelMessage], agent_info: AgentInfo) -> AsyncIterator[DeltaToolCalls]: + assert agent_info.output_tools is not None + assert len(agent_info.output_tools) == 1 + name = agent_info.output_tools[0].name + json_data = json.dumps({'response': [1, 2, 3, 4]}) + yield {0: DeltaToolCall(name=name)} + yield {0: DeltaToolCall(json_args=json_data[:15])} + yield {0: DeltaToolCall(json_args=json_data[15:])} + + agent = Agent(FunctionModel(stream_function=text_stream), output_type=list[int]) + + chunks: list[list[int]] = [] + result = agent.run_stream_sync('') + for structured_response, last in result.stream_responses_sync(debounce_by=None): + response_data = result.validate_response_output_sync(structured_response, allow_partial=not last) + chunks.append(response_data) + + assert chunks == snapshot([[1], [1, 2, 3, 4], [1, 2, 3, 4], [1, 2, 3, 4]]) From db271eb30d1350bed2056f7c0813b83998e5d9c5 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 31 Oct 2025 18:28:33 +0000 Subject: [PATCH 15/18] StreamedRunResultSync --- .../pydantic_ai/agent/abstract.py | 11 +- pydantic_ai_slim/pydantic_ai/result.py | 229 ++++++++++++------ tests/test_streaming.py | 95 +++++++- 3 files changed, 243 insertions(+), 92 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index bdb9428a14..302fdc83dd 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -597,7 +597,7 @@ def run_stream_sync( toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, - ) -> result.StreamedRunResult[AgentDepsT, OutputDataT]: ... + ) -> result.StreamedRunResultSync[AgentDepsT, OutputDataT]: ... @overload def run_stream_sync( @@ -616,7 +616,7 @@ def run_stream_sync( toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, - ) -> result.StreamedRunResult[AgentDepsT, RunOutputDataT]: ... + ) -> result.StreamedRunResultSync[AgentDepsT, RunOutputDataT]: ... def run_stream_sync( self, @@ -634,7 +634,7 @@ def run_stream_sync( toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, - ) -> result.StreamedRunResult[AgentDepsT, Any]: + ) -> result.StreamedRunResultSync[AgentDepsT, Any]: """Run the agent with a user prompt in sync streaming mode. This is a convenience method that wraps [`run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] with `loop.run_until_complete(...)`. @@ -658,7 +658,7 @@ def run_stream_sync( def main(): response = agent.run_stream_sync('What is the capital of the UK?') - print(response.get_output_sync()) + print(response.get_output()) #> The capital of the UK is London. ``` @@ -704,7 +704,8 @@ async def _consume_stream(): ) as stream_result: yield stream_result - return _utils.get_event_loop().run_until_complete(anext(_consume_stream())) + async_result = _utils.get_event_loop().run_until_complete(anext(_consume_stream())) + return result.StreamedRunResultSync(async_result) @overload def run_stream_events( diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index f83e570d2e..20967c299f 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -35,6 +35,7 @@ 'OutputDataT_inv', 'ToolOutput', 'OutputValidatorFunc', + 'StreamedRunResultSync', ) @@ -420,26 +421,6 @@ async def stream_output(self, *, debounce_by: float | None = 0.1) -> AsyncIterat else: raise ValueError('No stream response or run result provided') # pragma: no cover - def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]: - """Stream the output as an iterable. - - This is a convenience method that wraps [`stream_output()`][pydantic_ai.result.StreamedRunResult.stream_output] with `loop.run_until_complete(...)`. - You therefore can't use this method inside async code or if there's an active event loop. - - The pydantic validator for structured data will be called in - [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation) - on each iteration. - - Args: - debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing. - Debouncing is particularly important for long structured outputs to reduce the overhead of - performing validation as each token is received. - - Returns: - An iterable of the response data. - """ - return _utils.sync_async_iterator(self.stream_output(debounce_by=debounce_by)) - async def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> AsyncIterator[str]: """Stream the text result as an async iterable. @@ -468,24 +449,6 @@ async def stream_text(self, *, delta: bool = False, debounce_by: float | None = else: raise ValueError('No stream response or run result provided') # pragma: no cover - def stream_text_sync(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]: - """Stream the text result as a sync iterable. - - This is a convenience method that wraps [`stream_text()`][pydantic_ai.result.StreamedRunResult.stream_text] with `loop.run_until_complete(...)`. - You therefore can't use this method inside async code or if there's an active event loop. - - !!! note - Result validators will NOT be called on the text result if `delta=True`. - - Args: - delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text - up to the current point. - debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. - Debouncing is particularly important for long structured responses to reduce the overhead of - performing validation as each token is received. - """ - return _utils.sync_async_iterator(self.stream_text(delta=delta, debounce_by=debounce_by)) - @deprecated('`StreamedRunResult.stream_structured` is deprecated, use `stream_responses` instead.') async def stream_structured( self, *, debounce_by: float | None = 0.1 @@ -521,24 +484,6 @@ async def stream_responses( else: raise ValueError('No stream response or run result provided') # pragma: no cover - def stream_responses_sync( - self, *, debounce_by: float | None = 0.1 - ) -> Iterator[tuple[_messages.ModelResponse, bool]]: - """Stream the response as an iterable of Structured LLM Messages. - - This is a convenience method that wraps [`stream_responses()`][pydantic_ai.result.StreamedRunResult.stream_responses] with `loop.run_until_complete(...)`. - You therefore can't use this method inside async code or if there's an active event loop. - - Args: - debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. - Debouncing is particularly important for long structured responses to reduce the overhead of - performing validation as each token is received. - - Returns: - An iterable of the structured response message and whether that is the last message. - """ - return _utils.sync_async_iterator(self.stream_responses(debounce_by=debounce_by)) - async def get_output(self) -> OutputDataT: """Stream the whole response, validate and return it.""" if self._run_result is not None: @@ -552,14 +497,6 @@ async def get_output(self) -> OutputDataT: else: raise ValueError('No stream response or run result provided') # pragma: no cover - def get_output_sync(self) -> OutputDataT: - """Stream the whole response, validate and return it. - - This is a convenience method that wraps [`get_output()`][pydantic_ai.result.StreamedRunResult.get_output] with `loop.run_until_complete(...)`. - You therefore can't use this method inside async code or if there's an active event loop. - """ - return _utils.get_event_loop().run_until_complete(self.get_output()) - @property def response(self) -> _messages.ModelResponse: """Return the current state of the response.""" @@ -611,18 +548,6 @@ async def validate_response_output( else: raise ValueError('No stream response or run result provided') # pragma: no cover - def validate_response_output_sync( - self, message: _messages.ModelResponse, *, allow_partial: bool = False - ) -> OutputDataT: - """Validate a structured result message. - - This is a convenience method that wraps [`validate_response_output()`][pydantic_ai.result.StreamedRunResult.validate_response_output] with `loop.run_until_complete(...)`. - You therefore can't use this method inside async code or if there's an active event loop. - """ - return _utils.get_event_loop().run_until_complete( - self.validate_response_output(message, allow_partial=allow_partial) - ) - async def _marked_completed(self, message: _messages.ModelResponse | None = None) -> None: self.is_complete = True if message is not None: @@ -631,6 +556,158 @@ async def _marked_completed(self, message: _messages.ModelResponse | None = None await self._on_complete() +@dataclass(init=False) +class StreamedRunResultSync(Generic[AgentDepsT, OutputDataT]): + """Synchronous wrapper for [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult] that only exposes sync methods.""" + + _streamed_run_result: StreamedRunResult[AgentDepsT, OutputDataT] + + def __init__(self, streamed_run_result: StreamedRunResult[AgentDepsT, OutputDataT]) -> None: + self._streamed_run_result = streamed_run_result + + def all_messages(self, *, output_tool_return_content: str | None = None) -> list[_messages.ModelMessage]: + """Return the history of messages. + + Args: + output_tool_return_content: The return content of the tool call to set in the last message. + This provides a convenient way to modify the content of the output tool call if you want to continue + the conversation and want to set the response to the output tool call. If `None`, the last message will + not be modified. + + Returns: + List of messages. + """ + return self._streamed_run_result.all_messages(output_tool_return_content=output_tool_return_content) + + def all_messages_json(self, *, output_tool_return_content: str | None = None) -> bytes: # pragma: no cover + """Return all messages from [`all_messages`][pydantic_ai.result.StreamedRunResultSync.all_messages] as JSON bytes. + + Args: + output_tool_return_content: The return content of the tool call to set in the last message. + This provides a convenient way to modify the content of the output tool call if you want to continue + the conversation and want to set the response to the output tool call. If `None`, the last message will + not be modified. + + Returns: + JSON bytes representing the messages. + """ + return self._streamed_run_result.all_messages_json(output_tool_return_content=output_tool_return_content) + + def new_messages(self, *, output_tool_return_content: str | None = None) -> list[_messages.ModelMessage]: + """Return new messages associated with this run. + + Messages from older runs are excluded. + + Args: + output_tool_return_content: The return content of the tool call to set in the last message. + This provides a convenient way to modify the content of the output tool call if you want to continue + the conversation and want to set the response to the output tool call. If `None`, the last message will + not be modified. + + Returns: + List of new messages. + """ + return self._streamed_run_result.new_messages(output_tool_return_content=output_tool_return_content) + + def new_messages_json(self, *, output_tool_return_content: str | None = None) -> bytes: # pragma: no cover + """Return new messages from [`new_messages`][pydantic_ai.result.StreamedRunResultSync.new_messages] as JSON bytes. + + Args: + output_tool_return_content: The return content of the tool call to set in the last message. + This provides a convenient way to modify the content of the output tool call if you want to continue + the conversation and want to set the response to the output tool call. If `None`, the last message will + not be modified. + + Returns: + JSON bytes representing the new messages. + """ + return self._streamed_run_result.new_messages_json(output_tool_return_content=output_tool_return_content) + + def stream_output(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]: + """Stream the output as an iterable. + + The pydantic validator for structured data will be called in + [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation) + on each iteration. + + Args: + debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured outputs to reduce the overhead of + performing validation as each token is received. + + Returns: + An iterable of the response data. + """ + return _utils.sync_async_iterator(self._streamed_run_result.stream_output(debounce_by=debounce_by)) + + def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]: + """Stream the text result as an iterable. + + !!! note + Result validators will NOT be called on the text result if `delta=True`. + + Args: + delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text + up to the current point. + debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured responses to reduce the overhead of + performing validation as each token is received. + """ + return _utils.sync_async_iterator(self._streamed_run_result.stream_text(delta=delta, debounce_by=debounce_by)) + + def stream_responses(self, *, debounce_by: float | None = 0.1) -> Iterator[tuple[_messages.ModelResponse, bool]]: + """Stream the response as an iterable of Structured LLM Messages. + + Args: + debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing. + Debouncing is particularly important for long structured responses to reduce the overhead of + performing validation as each token is received. + + Returns: + An iterable of the structured response message and whether that is the last message. + """ + return _utils.sync_async_iterator(self._streamed_run_result.stream_responses(debounce_by=debounce_by)) + + def get_output(self) -> OutputDataT: + """Stream the whole response, validate and return it.""" + return _utils.get_event_loop().run_until_complete(self._streamed_run_result.get_output()) + + @property + def response(self) -> _messages.ModelResponse: + """Return the current state of the response.""" + return self._streamed_run_result.response + + def usage(self) -> RunUsage: + """Return the usage of the whole run. + + !!! note + This won't return the full usage until the stream is finished. + """ + return self._streamed_run_result.usage() + + def timestamp(self) -> datetime: + """Get the timestamp of the response.""" + return self._streamed_run_result.timestamp() + + def validate_response_output(self, message: _messages.ModelResponse, *, allow_partial: bool = False) -> OutputDataT: + """Validate a structured result message.""" + return _utils.get_event_loop().run_until_complete( + self._streamed_run_result.validate_response_output(message, allow_partial=allow_partial) + ) + + @property + def is_complete(self) -> bool: + """Whether the stream has all been received. + + This is set to `True` when one of + [`stream_output`][pydantic_ai.result.StreamedRunResultSync.stream_output], + [`stream_text`][pydantic_ai.result.StreamedRunResultSync.stream_text], + [`stream_responses`][pydantic_ai.result.StreamedRunResultSync.stream_responses] or + [`get_output`][pydantic_ai.result.StreamedRunResultSync.get_output] completes. + """ + return self._streamed_run_result.is_complete + + @dataclass(repr=False) class FinalResult(Generic[OutputDataT]): """Marker class storing the final output of an agent run and associated metadata.""" diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 318b1ab1db..b3e805577f 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -45,7 +45,7 @@ from pydantic_ai.models.function import AgentInfo, DeltaToolCall, DeltaToolCalls, FunctionModel from pydantic_ai.models.test import TestModel from pydantic_ai.output import PromptedOutput, TextOutput -from pydantic_ai.result import AgentStream, FinalResult, RunUsage +from pydantic_ai.result import AgentStream, FinalResult, RunUsage, StreamedRunResultSync from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults, ToolApproved, ToolDefinition from pydantic_ai.usage import RequestUsage from pydantic_graph import End @@ -175,7 +175,7 @@ async def ret_a(x: str) -> str: tool_calls=1, ) ) - response = result.get_output_sync() + response = result.get_output() assert response == snapshot('{"ret_a":"a-apple"}') assert result.is_complete assert result.timestamp() == IsNow(tz=timezone.utc) @@ -389,20 +389,20 @@ def test_streamed_text_stream_sync(): result = agent.run_stream_sync('Hello') # typehint to test (via static typing) that the stream type is correctly inferred - chunks: list[str] = [c for c in result.stream_text_sync()] + chunks: list[str] = [c for c in result.stream_text()] # one chunk with `stream_text()` due to group_by_temporal assert chunks == snapshot(['The cat sat on the mat.']) assert result.is_complete result = agent.run_stream_sync('Hello') # typehint to test (via static typing) that the stream type is correctly inferred - chunks: list[str] = [c for c in result.stream_output_sync()] + chunks: list[str] = [c for c in result.stream_output()] # two chunks with `stream()` due to not-final vs. final assert chunks == snapshot(['The cat sat on the mat.']) assert result.is_complete result = agent.run_stream_sync('Hello') - assert [c for c in result.stream_text_sync(debounce_by=None)] == snapshot( + assert [c for c in result.stream_text(debounce_by=None)] == snapshot( [ 'The ', 'The cat ', @@ -415,12 +415,12 @@ def test_streamed_text_stream_sync(): result = agent.run_stream_sync('Hello') # with stream_text, there is no need to do partial validation, so we only get the final message once: - assert [c for c in result.stream_text_sync(delta=False, debounce_by=None)] == snapshot( + assert [c for c in result.stream_text(delta=False, debounce_by=None)] == snapshot( ['The ', 'The cat ', 'The cat sat ', 'The cat sat on ', 'The cat sat on the ', 'The cat sat on the mat.'] ) result = agent.run_stream_sync('Hello') - assert [c for c in result.stream_text_sync(delta=True, debounce_by=None)] == snapshot( + assert [c for c in result.stream_text(delta=True, debounce_by=None)] == snapshot( ['The ', 'cat ', 'sat ', 'on ', 'the ', 'mat.'] ) @@ -428,12 +428,12 @@ def upcase(text: str) -> str: return text.upper() result = agent.run_stream_sync('Hello', output_type=TextOutput(upcase)) - assert [c for c in result.stream_output_sync(debounce_by=None)] == snapshot( + assert [c for c in result.stream_output(debounce_by=None)] == snapshot( ['THE ', 'THE CAT ', 'THE CAT SAT ', 'THE CAT SAT ON ', 'THE CAT SAT ON THE ', 'THE CAT SAT ON THE MAT.'] ) result = agent.run_stream_sync('Hello') - assert [c for c, _is_last in result.stream_responses_sync(debounce_by=None)] == snapshot( + assert [c for c, _is_last in result.stream_responses(debounce_by=None)] == snapshot( [ ModelResponse( parts=[TextPart(content='The ')], @@ -2101,8 +2101,81 @@ async def text_stream(_messages: list[ModelMessage], agent_info: AgentInfo) -> A chunks: list[list[int]] = [] result = agent.run_stream_sync('') - for structured_response, last in result.stream_responses_sync(debounce_by=None): - response_data = result.validate_response_output_sync(structured_response, allow_partial=not last) + for structured_response, last in result.stream_responses(debounce_by=None): + response_data = result.validate_response_output(structured_response, allow_partial=not last) chunks.append(response_data) assert chunks == snapshot([[1], [1, 2, 3, 4], [1, 2, 3, 4], [1, 2, 3, 4]]) + + +def test_streamed_run_result_sync_wrapper(): + """Test that run_stream_sync returns StreamedRunResultSync with all sync methods.""" + m = TestModel(custom_output_text='The cat sat on the mat.') + agent = Agent(m) + + # Test that run_stream_sync returns StreamedRunResultSync directly + result = agent.run_stream_sync('Hello') + assert isinstance(result, StreamedRunResultSync) + + # Test all_messages + messages = result.all_messages() + assert len(messages) > 0 + + # Test all_messages_json + messages_json = result.all_messages_json() + assert isinstance(messages_json, bytes) + + # Test new_messages + new_messages = result.new_messages() + assert len(new_messages) > 0 + + # Test new_messages_json + new_messages_json = result.new_messages_json() + assert isinstance(new_messages_json, bytes) + + # Test stream_output + result = agent.run_stream_sync('Hello') + chunks: list[str] = [c for c in result.stream_output(debounce_by=None)] + assert chunks == snapshot( + ['The ', 'The cat ', 'The cat sat ', 'The cat sat on ', 'The cat sat on the ', 'The cat sat on the mat.'] + ) + + # Test stream_text + result = agent.run_stream_sync('Hello') + text_chunks: list[str] = [c for c in result.stream_text(debounce_by=None)] + assert text_chunks == snapshot( + [ + 'The ', + 'The cat ', + 'The cat sat ', + 'The cat sat on ', + 'The cat sat on the ', + 'The cat sat on the mat.', + ] + ) + + # Test stream_responses + result = agent.run_stream_sync('Hello') + responses = [c for c, _is_last in result.stream_responses(debounce_by=None)] + assert len(responses) > 0 + assert all(isinstance(r, ModelResponse) for r in responses) + + # Test get_output + result = agent.run_stream_sync('Hello') + output = result.get_output() + assert output == snapshot('The cat sat on the mat.') + + # Test properties + result = agent.run_stream_sync('Hello') + _ = result.get_output() + assert result.is_complete + assert isinstance(result.response, ModelResponse) + assert isinstance(result.usage(), RunUsage) + assert isinstance(result.timestamp(), datetime.datetime) + + # Test validate_response_output + result = agent.run_stream_sync('Hello') + _ = result.get_output() # Need to complete the stream first + response = result.response + validated = result.validate_response_output(response) + assert validated == snapshot('The cat sat on the mat.') From 888ee182149f42a367b47fd260085d04c299a368 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Sat, 1 Nov 2025 06:57:06 -0600 Subject: [PATCH 16/18] test wrapper methods --- tests/test_streaming.py | 87 ++++++----------------------------------- 1 file changed, 13 insertions(+), 74 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index b3e805577f..c0d3d94fc1 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -45,7 +45,7 @@ from pydantic_ai.models.function import AgentInfo, DeltaToolCall, DeltaToolCalls, FunctionModel from pydantic_ai.models.test import TestModel from pydantic_ai.output import PromptedOutput, TextOutput -from pydantic_ai.result import AgentStream, FinalResult, RunUsage, StreamedRunResultSync +from pydantic_ai.result import AgentStream, FinalResult, RunUsage from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults, ToolApproved, ToolDefinition from pydantic_ai.usage import RequestUsage from pydantic_graph import End @@ -167,6 +167,9 @@ async def ret_a(x: str) -> str: ), ] ) + assert result.new_messages() == result.all_messages() + assert result.all_messages_json().startswith(b'[{"parts":[{"content":"Hello",') + assert result.new_messages_json().startswith(b'[{"parts":[{"content":"Hello",') assert result.usage() == snapshot( RunUsage( requests=2, @@ -179,6 +182,15 @@ async def ret_a(x: str) -> str: assert response == snapshot('{"ret_a":"a-apple"}') assert result.is_complete assert result.timestamp() == IsNow(tz=timezone.utc) + assert result.response == snapshot( + ModelResponse( + parts=[TextPart(content='{"ret_a":"a-apple"}')], + usage=RequestUsage(input_tokens=52, output_tokens=11), + model_name='test', + timestamp=IsDatetime(), + provider_name='test', + ) + ) assert result.all_messages() == snapshot( [ ModelRequest(parts=[UserPromptPart(content='Hello', timestamp=IsNow(tz=timezone.utc))]), @@ -2106,76 +2118,3 @@ async def text_stream(_messages: list[ModelMessage], agent_info: AgentInfo) -> A chunks.append(response_data) assert chunks == snapshot([[1], [1, 2, 3, 4], [1, 2, 3, 4], [1, 2, 3, 4]]) - - -def test_streamed_run_result_sync_wrapper(): - """Test that run_stream_sync returns StreamedRunResultSync with all sync methods.""" - m = TestModel(custom_output_text='The cat sat on the mat.') - agent = Agent(m) - - # Test that run_stream_sync returns StreamedRunResultSync directly - result = agent.run_stream_sync('Hello') - assert isinstance(result, StreamedRunResultSync) - - # Test all_messages - messages = result.all_messages() - assert len(messages) > 0 - - # Test all_messages_json - messages_json = result.all_messages_json() - assert isinstance(messages_json, bytes) - - # Test new_messages - new_messages = result.new_messages() - assert len(new_messages) > 0 - - # Test new_messages_json - new_messages_json = result.new_messages_json() - assert isinstance(new_messages_json, bytes) - - # Test stream_output - result = agent.run_stream_sync('Hello') - chunks: list[str] = [c for c in result.stream_output(debounce_by=None)] - assert chunks == snapshot( - ['The ', 'The cat ', 'The cat sat ', 'The cat sat on ', 'The cat sat on the ', 'The cat sat on the mat.'] - ) - - # Test stream_text - result = agent.run_stream_sync('Hello') - text_chunks: list[str] = [c for c in result.stream_text(debounce_by=None)] - assert text_chunks == snapshot( - [ - 'The ', - 'The cat ', - 'The cat sat ', - 'The cat sat on ', - 'The cat sat on the ', - 'The cat sat on the mat.', - ] - ) - - # Test stream_responses - result = agent.run_stream_sync('Hello') - responses = [c for c, _is_last in result.stream_responses(debounce_by=None)] - assert len(responses) > 0 - assert all(isinstance(r, ModelResponse) for r in responses) - - # Test get_output - result = agent.run_stream_sync('Hello') - output = result.get_output() - assert output == snapshot('The cat sat on the mat.') - - # Test properties - result = agent.run_stream_sync('Hello') - _ = result.get_output() - assert result.is_complete - assert isinstance(result.response, ModelResponse) - assert isinstance(result.usage(), RunUsage) - assert isinstance(result.timestamp(), datetime.datetime) - - # Test validate_response_output - result = agent.run_stream_sync('Hello') - _ = result.get_output() # Need to complete the stream first - response = result.response - validated = result.validate_response_output(response) - assert validated == snapshot('The cat sat on the mat.') From 83b46fdc0aeaaf80bb34fb0e800be18169fb5432 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Sat, 1 Nov 2025 07:28:45 -0600 Subject: [PATCH 17/18] fix strict-no-cover --- tests/test_streaming.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index c0d3d94fc1..1a126f26dc 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -168,8 +168,6 @@ async def ret_a(x: str) -> str: ] ) assert result.new_messages() == result.all_messages() - assert result.all_messages_json().startswith(b'[{"parts":[{"content":"Hello",') - assert result.new_messages_json().startswith(b'[{"parts":[{"content":"Hello",') assert result.usage() == snapshot( RunUsage( requests=2, From cec5df6675482342df6811db2efbc7a4c158f18f Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Mon, 3 Nov 2025 14:51:57 -0600 Subject: [PATCH 18/18] Update docs/agents.md --- docs/agents.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/agents.md b/docs/agents.md index af7cc66a6b..3c6b1a6ef4 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -65,7 +65,7 @@ There are five ways to run an agent: 1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response. 2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`). -3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. [`agent.run_stream_sync()`][pydantic_ai.agent.AbstractAgent.run_stream_sync] is a synchronous variation that returns the result directly. +3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. [`agent.run_stream_sync()`][pydantic_ai.agent.AbstractAgent.run_stream_sync] is a synchronous variation that returns a [`StreamedRunResultSync`][pydantic_ai.result.StreamedRunResultSync] with synchronous versions of the same methods. 4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result. 5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph].