From 84f55ba0750b0b71e05f33924e50411ad23c6856 Mon Sep 17 00:00:00 2001 From: habema Date: Tue, 29 Jul 2025 18:19:21 +0300 Subject: [PATCH 1/7] emit tool call output items immediately --- src/agents/run.py | 78 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 4 deletions(-) diff --git a/src/agents/run.py b/src/agents/run.py index 2dd9524bb..38807685f 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -6,7 +6,21 @@ from dataclasses import dataclass, field from typing import Any, Generic, cast -from openai.types.responses import ResponseCompletedEvent +from openai.types.responses import ( + ResponseCompletedEvent, + ResponseComputerToolCall, + ResponseFileSearchToolCall, + ResponseFunctionToolCall, + ResponseOutputItemAddedEvent, +) +from openai.types.responses.response_code_interpreter_tool_call import ( + ResponseCodeInterpreterToolCall, +) +from openai.types.responses.response_output_item import ( + ImageGenerationCall, + LocalShellCall, + McpCall, +) from openai.types.responses.response_prompt_param import ( ResponsePromptParam, ) @@ -41,7 +55,7 @@ OutputGuardrailResult, ) from .handoffs import Handoff, HandoffInputFilter, handoff -from .items import ItemHelpers, ModelResponse, RunItem, TResponseInputItem +from .items import ItemHelpers, ModelResponse, RunItem, ToolCallItem, TResponseInputItem from .lifecycle import RunHooks from .logger import logger from .memory import Session @@ -50,7 +64,7 @@ from .models.multi_provider import MultiProvider from .result import RunResult, RunResultStreaming from .run_context import RunContextWrapper, TContext -from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent +from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent, RunItemStreamEvent from .tool import Tool from .tracing import Span, SpanError, agent_span, get_current_trace, trace from .tracing.span_data import AgentSpanData @@ -833,6 +847,10 @@ async def _run_single_turn_streamed( all_tools: list[Tool], previous_response_id: str | None, ) -> SingleStepResult: + # Track tool call IDs we've already emitted to avoid duplicates when we later + # enqueue all items at the end of the turn. + emitted_tool_call_ids: set[str] = set() + if should_run_agent_start_hooks: await asyncio.gather( hooks.on_agent_start(context_wrapper, agent), @@ -877,6 +895,8 @@ async def _run_single_turn_streamed( previous_response_id=previous_response_id, prompt=prompt_config, ): + # 1. If the event signals the end of the assistant response, remember it so we can + # process the full response after the streaming loop. if isinstance(event, ResponseCompletedEvent): usage = ( Usage( @@ -897,6 +917,34 @@ async def _run_single_turn_streamed( ) context_wrapper.usage.add(usage) + # 2. Detect tool call output-item additions **while** the model is still streaming. + # Emit a high-level RunItemStreamEvent so UIs can react immediately. + if isinstance(event, ResponseOutputItemAddedEvent): + output_item = event.item + + if isinstance( + output_item, + ( + ResponseFunctionToolCall, + ResponseFileSearchToolCall, + ResponseComputerToolCall, + ResponseCodeInterpreterToolCall, + ImageGenerationCall, + LocalShellCall, + McpCall, + ), + ): + call_id = getattr(output_item, "call_id", getattr(output_item, "id", None)) + + if call_id not in emitted_tool_call_ids: + emitted_tool_call_ids.add(call_id) + + tool_item = ToolCallItem(raw_item=output_item, agent=agent) + streamed_result._event_queue.put_nowait( + RunItemStreamEvent(item=tool_item, name="tool_called") + ) + + # Always forward the raw event. streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event)) # 2. At this point, the streaming is complete for this turn of the agent loop. @@ -918,7 +966,29 @@ async def _run_single_turn_streamed( tool_use_tracker=tool_use_tracker, ) - RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue) + # Remove tool_called items we've already emitted during streaming to avoid duplicates. + if emitted_tool_call_ids: + import dataclasses as _dc # local import to avoid polluting module namespace + + filtered_items = [ + item + for item in single_step_result.new_step_items + if not ( + isinstance(item, ToolCallItem) + and getattr(item.raw_item, "call_id", getattr(item.raw_item, "id", None)) + in emitted_tool_call_ids + ) + ] + + single_step_result_filtered = _dc.replace( + single_step_result, new_step_items=filtered_items + ) + + RunImpl.stream_step_result_to_queue( + single_step_result_filtered, streamed_result._event_queue + ) + else: + RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue) return single_step_result @classmethod From 43909a8dc2bea445eeb4bdafc093b4e3fc456f19 Mon Sep 17 00:00:00 2001 From: habema Date: Tue, 29 Jul 2025 18:30:06 +0300 Subject: [PATCH 2/7] use `ToolCallItemTypes`'s args better maintainability --- src/agents/run.py | 37 ++++++++++++------------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/src/agents/run.py b/src/agents/run.py index 38807685f..c11c8c8db 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -4,23 +4,12 @@ import copy import inspect from dataclasses import dataclass, field -from typing import Any, Generic, cast +from typing import Any, Generic, cast, get_args from openai.types.responses import ( ResponseCompletedEvent, - ResponseComputerToolCall, - ResponseFileSearchToolCall, - ResponseFunctionToolCall, ResponseOutputItemAddedEvent, ) -from openai.types.responses.response_code_interpreter_tool_call import ( - ResponseCodeInterpreterToolCall, -) -from openai.types.responses.response_output_item import ( - ImageGenerationCall, - LocalShellCall, - McpCall, -) from openai.types.responses.response_prompt_param import ( ResponsePromptParam, ) @@ -55,7 +44,14 @@ OutputGuardrailResult, ) from .handoffs import Handoff, HandoffInputFilter, handoff -from .items import ItemHelpers, ModelResponse, RunItem, ToolCallItem, TResponseInputItem +from .items import ( + ItemHelpers, + ModelResponse, + RunItem, + ToolCallItem, + ToolCallItemTypes, + TResponseInputItem, +) from .lifecycle import RunHooks from .logger import logger from .memory import Session @@ -922,18 +918,7 @@ async def _run_single_turn_streamed( if isinstance(event, ResponseOutputItemAddedEvent): output_item = event.item - if isinstance( - output_item, - ( - ResponseFunctionToolCall, - ResponseFileSearchToolCall, - ResponseComputerToolCall, - ResponseCodeInterpreterToolCall, - ImageGenerationCall, - LocalShellCall, - McpCall, - ), - ): + if isinstance(output_item, _TOOL_CALL_TYPES): call_id = getattr(output_item, "call_id", getattr(output_item, "id", None)) if call_id not in emitted_tool_call_ids: @@ -1310,3 +1295,5 @@ async def _save_result_to_session( DEFAULT_AGENT_RUNNER = AgentRunner() + +_TOOL_CALL_TYPES: tuple[type, ...] = get_args(ToolCallItemTypes) From 696b1b1792a19d208c1077b7632dd449b639d303 Mon Sep 17 00:00:00 2001 From: habema Date: Tue, 29 Jul 2025 18:54:48 +0300 Subject: [PATCH 3/7] clean up comments --- src/agents/run.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/agents/run.py b/src/agents/run.py index c11c8c8db..12edc4081 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -843,8 +843,6 @@ async def _run_single_turn_streamed( all_tools: list[Tool], previous_response_id: str | None, ) -> SingleStepResult: - # Track tool call IDs we've already emitted to avoid duplicates when we later - # enqueue all items at the end of the turn. emitted_tool_call_ids: set[str] = set() if should_run_agent_start_hooks: @@ -891,8 +889,6 @@ async def _run_single_turn_streamed( previous_response_id=previous_response_id, prompt=prompt_config, ): - # 1. If the event signals the end of the assistant response, remember it so we can - # process the full response after the streaming loop. if isinstance(event, ResponseCompletedEvent): usage = ( Usage( @@ -913,8 +909,6 @@ async def _run_single_turn_streamed( ) context_wrapper.usage.add(usage) - # 2. Detect tool call output-item additions **while** the model is still streaming. - # Emit a high-level RunItemStreamEvent so UIs can react immediately. if isinstance(event, ResponseOutputItemAddedEvent): output_item = event.item @@ -929,10 +923,8 @@ async def _run_single_turn_streamed( RunItemStreamEvent(item=tool_item, name="tool_called") ) - # Always forward the raw event. streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event)) - # 2. At this point, the streaming is complete for this turn of the agent loop. if not final_response: raise ModelBehaviorError("Model did not produce a final response!") @@ -951,9 +943,8 @@ async def _run_single_turn_streamed( tool_use_tracker=tool_use_tracker, ) - # Remove tool_called items we've already emitted during streaming to avoid duplicates. if emitted_tool_call_ids: - import dataclasses as _dc # local import to avoid polluting module namespace + import dataclasses as _dc filtered_items = [ item From 81d7d9af0a47983c82575a4b8298b13e1f7deb2d Mon Sep 17 00:00:00 2001 From: habema Date: Tue, 29 Jul 2025 19:08:29 +0300 Subject: [PATCH 4/7] resolve typecheck test problems --- src/agents/run.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/agents/run.py b/src/agents/run.py index 12edc4081..397dff223 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -913,12 +913,17 @@ async def _run_single_turn_streamed( output_item = event.item if isinstance(output_item, _TOOL_CALL_TYPES): - call_id = getattr(output_item, "call_id", getattr(output_item, "id", None)) + call_id: str | None = getattr( + output_item, "call_id", getattr(output_item, "id", None) + ) - if call_id not in emitted_tool_call_ids: + if call_id and call_id not in emitted_tool_call_ids: emitted_tool_call_ids.add(call_id) - tool_item = ToolCallItem(raw_item=output_item, agent=agent) + tool_item = ToolCallItem( + raw_item=cast(ToolCallItemTypes, output_item), + agent=agent, + ) streamed_result._event_queue.put_nowait( RunItemStreamEvent(item=tool_item, name="tool_called") ) @@ -951,8 +956,12 @@ async def _run_single_turn_streamed( for item in single_step_result.new_step_items if not ( isinstance(item, ToolCallItem) - and getattr(item.raw_item, "call_id", getattr(item.raw_item, "id", None)) - in emitted_tool_call_ids + and ( + call_id := getattr( + item.raw_item, "call_id", getattr(item.raw_item, "id", None) + ) + ) + and call_id in emitted_tool_call_ids ) ] From 61aa9f9224942a50d718d6efdf6c5e965ef355d1 Mon Sep 17 00:00:00 2001 From: habema Date: Mon, 11 Aug 2025 16:35:35 +0300 Subject: [PATCH 5/7] fix format --- examples/basic/stream_function_call_args.py | 11 ++++------- examples/customer_service/main.py | 10 ++++++++-- src/agents/handoffs.py | 6 +++--- src/agents/model_settings.py | 1 - src/agents/tracing/processors.py | 4 ++-- tests/test_agent_clone_shallow_copy.py | 5 ++--- tests/test_stream_events.py | 1 + 7 files changed, 20 insertions(+), 18 deletions(-) diff --git a/examples/basic/stream_function_call_args.py b/examples/basic/stream_function_call_args.py index 46e72896c..3c3538772 100644 --- a/examples/basic/stream_function_call_args.py +++ b/examples/basic/stream_function_call_args.py @@ -35,7 +35,7 @@ async def main(): result = Runner.run_streamed( agent, - input="Create a Python web project called 'my-app' with FastAPI. Version 1.0.0, dependencies: fastapi, uvicorn" + input="Create a Python web project called 'my-app' with FastAPI. Version 1.0.0, dependencies: fastapi, uvicorn", ) # Track function calls for detailed output @@ -50,10 +50,7 @@ async def main(): function_name = getattr(event.data.item, "name", "unknown") call_id = getattr(event.data.item, "call_id", "unknown") - function_calls[call_id] = { - 'name': function_name, - 'arguments': "" - } + function_calls[call_id] = {"name": function_name, "arguments": ""} current_active_call_id = call_id print(f"\n📞 Function call streaming started: {function_name}()") print("📝 Arguments building...") @@ -61,12 +58,12 @@ async def main(): # Real-time argument streaming elif isinstance(event.data, ResponseFunctionCallArgumentsDeltaEvent): if current_active_call_id and current_active_call_id in function_calls: - function_calls[current_active_call_id]['arguments'] += event.data.delta + function_calls[current_active_call_id]["arguments"] += event.data.delta print(event.data.delta, end="", flush=True) # Function call completed elif event.data.type == "response.output_item.done": - if hasattr(event.data.item, 'call_id'): + if hasattr(event.data.item, "call_id"): call_id = getattr(event.data.item, "call_id", "unknown") if call_id in function_calls: function_info = function_calls[call_id] diff --git a/examples/customer_service/main.py b/examples/customer_service/main.py index 8ed218536..266a7e611 100644 --- a/examples/customer_service/main.py +++ b/examples/customer_service/main.py @@ -40,7 +40,10 @@ class AirlineAgentContext(BaseModel): ) async def faq_lookup_tool(question: str) -> str: question_lower = question.lower() - if any(keyword in question_lower for keyword in ["bag", "baggage", "luggage", "carry-on", "hand luggage", "hand carry"]): + if any( + keyword in question_lower + for keyword in ["bag", "baggage", "luggage", "carry-on", "hand luggage", "hand carry"] + ): return ( "You are allowed to bring one bag on the plane. " "It must be under 50 pounds and 22 inches x 14 inches x 9 inches." @@ -52,7 +55,10 @@ async def faq_lookup_tool(question: str) -> str: "Exit rows are rows 4 and 16. " "Rows 5-8 are Economy Plus, with extra legroom. " ) - elif any(keyword in question_lower for keyword in ["wifi", "internet", "wireless", "connectivity", "network", "online"]): + elif any( + keyword in question_lower + for keyword in ["wifi", "internet", "wireless", "connectivity", "network", "online"] + ): return "We have free wifi on the plane, join Airline-Wifi" return "I'm sorry, I don't know the answer to that question." diff --git a/src/agents/handoffs.py b/src/agents/handoffs.py index 4d70f6058..2c52737ad 100644 --- a/src/agents/handoffs.py +++ b/src/agents/handoffs.py @@ -119,9 +119,9 @@ class Handoff(Generic[TContext, TAgent]): True, as it increases the likelihood of correct JSON input. """ - is_enabled: bool | Callable[ - [RunContextWrapper[Any], AgentBase[Any]], MaybeAwaitable[bool] - ] = True + is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase[Any]], MaybeAwaitable[bool]] = ( + True + ) """Whether the handoff is enabled. Either a bool or a Callable that takes the run context and agent and returns whether the handoff is enabled. You can use this to dynamically enable/disable a handoff based on your context/state.""" diff --git a/src/agents/model_settings.py b/src/agents/model_settings.py index 71e66ed84..f76d64266 100644 --- a/src/agents/model_settings.py +++ b/src/agents/model_settings.py @@ -55,7 +55,6 @@ class MCPToolChoice: ToolChoice: TypeAlias = Union[Literal["auto", "required", "none"], str, MCPToolChoice, None] - @dataclass class ModelSettings: """Settings to use when calling an LLM. diff --git a/src/agents/tracing/processors.py b/src/agents/tracing/processors.py index 32fd290ec..126c71498 100644 --- a/src/agents/tracing/processors.py +++ b/src/agents/tracing/processors.py @@ -70,8 +70,8 @@ def set_api_key(self, api_key: str): client. """ # Clear the cached property if it exists - if 'api_key' in self.__dict__: - del self.__dict__['api_key'] + if "api_key" in self.__dict__: + del self.__dict__["api_key"] # Update the private attribute self._api_key = api_key diff --git a/tests/test_agent_clone_shallow_copy.py b/tests/test_agent_clone_shallow_copy.py index fdf9e0247..44b41bd3d 100644 --- a/tests/test_agent_clone_shallow_copy.py +++ b/tests/test_agent_clone_shallow_copy.py @@ -5,6 +5,7 @@ def greet(name: str) -> str: return f"Hello, {name}!" + def test_agent_clone_shallow_copy(): """Test that clone creates shallow copy with tools.copy() workaround""" target_agent = Agent(name="Target") @@ -16,9 +17,7 @@ def test_agent_clone_shallow_copy(): ) cloned = original.clone( - name="Cloned", - tools=original.tools.copy(), - handoffs=original.handoffs.copy() + name="Cloned", tools=original.tools.copy(), handoffs=original.handoffs.copy() ) # Basic assertions diff --git a/tests/test_stream_events.py b/tests/test_stream_events.py index 11feb9fe0..0f85b63f8 100644 --- a/tests/test_stream_events.py +++ b/tests/test_stream_events.py @@ -14,6 +14,7 @@ async def foo() -> str: await asyncio.sleep(3) return "success!" + @pytest.mark.asyncio async def test_stream_events_main(): model = FakeModel() From 6826905bbced3e5f926af75b3dbc92bebfb8fae1 Mon Sep 17 00:00:00 2001 From: habema Date: Mon, 11 Aug 2025 16:37:36 +0300 Subject: [PATCH 6/7] fix mypy --- examples/realtime/app/server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/realtime/app/server.py b/examples/realtime/app/server.py index 73fcf3e56..4ed05c9b2 100644 --- a/examples/realtime/app/server.py +++ b/examples/realtime/app/server.py @@ -4,7 +4,8 @@ import logging import struct from contextlib import asynccontextmanager -from typing import TYPE_CHECKING, Any, assert_never +from typing import TYPE_CHECKING, Any +from typing_extensions import assert_never from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse From 4200b5109045099f56df1907a8f769c47707810a Mon Sep 17 00:00:00 2001 From: habema Date: Mon, 11 Aug 2025 16:38:14 +0300 Subject: [PATCH 7/7] refix format --- examples/realtime/app/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/realtime/app/server.py b/examples/realtime/app/server.py index 4ed05c9b2..04f3def43 100644 --- a/examples/realtime/app/server.py +++ b/examples/realtime/app/server.py @@ -5,11 +5,11 @@ import struct from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any -from typing_extensions import assert_never from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles +from typing_extensions import assert_never from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent