Skip to content

Conversation

@ajac-zero
Copy link

@ajac-zero ajac-zero commented Oct 12, 2025

Fixes #3005.

This PR tries implementing a possible solution to #3005 by adding a new run_stream_sync method that uses run_stream under the hood, based on the run_sync implementation that uses run internally.

Currently, it uses an 'eager' approach by loading the async iterable upfront and then providing normal iterable to access the stream. This has an impact on latency as it is not true streaming, but it avoids some opaque race conditions that can appear in a 'lazy' implementation.

I'd like to get some feedback on whether this approach is worthwhile or if it would be better to try and get a working 'lazy' implementation. My thinking is that, if latency is a concern, it would probably be best to point users towards the async version regardless, leaving the sync methods as conveniences for certain cases.

from pydantic_ai import Agent

agent = Agent('openrouter:openai/gpt-4o')

with agent.run_stream_sync('What are the capitals of France and Germany?') as response:
    for text in response.stream_text():
        print(text)

# The capital of France is Paris, and the capital
# The capital of France is Paris, and the capital of Germany is Berlin
# The capital of France is Paris, and the capital of Germany is Berlin.

@DouweM
Copy link
Collaborator

DouweM commented Oct 13, 2025

I'd like to get some feedback on whether this approach is worthwhile or if it would be better to try and get a working 'lazy' implementation. My thinking is that, if latency is a concern, it would probably be best to point users towards the async version regardless, leaving the sync methods as conveniences for certain cases.

@ajac-zero Thanks for working on this! I do think that streaming means people want to see the value as they come in, even when running sync code -- it'll just block until the next chunk is received, or buffer chunks if sync processing takes longer than the next chunk comes in. Otherwise, there's not much point in using this new method over just run_sync without streaming.

@ajac-zero
Copy link
Author

@DouweM I took a shot at true streaming implementation, and so far so good. What do you think?



@dataclass(init=False)
class SyncStreamedRunResult(StreamedRunResult[AgentDepsT, OutputDataT]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you seen pydantic_ai.direct.StreamedResponseSync? I wonder if there's some overlap we could factor out.

"""Create a 'SyncStreamedRunResult' from an existing 'StreamedRunResult'."""
instance = cls.__new__(cls)

instance._all_messages = streamed_run_result._all_messages
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love copying the private values here, even if this is a subclass...

Would it help to add a new class, or to add _sync methods to StreamedRunResult directly?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @DouweM. I removed the new class in favor of _sync methods in StreamedRunResult. The public API changes a bit more but the code is much cleaner.

builtin_tools=builtin_tools,
event_stream_handler=event_stream_handler,
)
yield get_event_loop().run_until_complete(async_cm.__aenter__())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's sync, do we need to yield or could we just return?

else:
raise ValueError('No stream response or run result provided') # pragma: no cover

def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add this note that's in the agent.run_sync docstring to all these methods:

This is a convenience method that wraps [`self.X`][pydantic_ai.result.StreamedRunResult.X] with `loop.run_until_complete(...)`.
        You therefore can't use this method inside async code or if there's an active event loop.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also to run_stream_sync

raise exceptions.AgentRunError('Agent run finished without producing a final result') # pragma: no cover

@contextmanager
def run_stream_sync(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need all the overloads we have for run_stream itself.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also let's mention this in agents.md where we also introduce run_sync and run_stream. It doesn't need a new list item, but can be at the end of the run_stream explanation.

@DouweM DouweM changed the title Add 'run_stream_sync' method Add Agent.run_stream_sync method and sync convenience methods on StreamedRunResult Oct 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add sync iter and stream methods

2 participants