diff --git a/docs/running_agents.md b/docs/running_agents.md index e51b109cf..5d7634999 100644 --- a/docs/running_agents.md +++ b/docs/running_agents.md @@ -121,6 +121,71 @@ Sessions automatically: See the [Sessions documentation](sessions.md) for more details. + +### Server-managed conversations + +You can also let OpenAI manage conversation state on the server side, instead of handling it locally with `to_input_list()` or `Sessions`. This allows you to preserve conversation history without manually resending all past messages. See the [OpenAI Conversation state guide](https://platform.openai.com/docs/guides/conversation-state?api-mode=responses) for more details. + +OpenAI provides two ways to track state across turns: + +#### 1. Using `conversation_id` + +You first create a conversation using the OpenAI Conversations API and then reuse its ID for every subsequent call: + +```python +from agents import Agent, Runner +from openai import AsyncOpenAI + +client = AsyncOpenAI() + +async def main(): + # Create a server-managed conversation + conversation = await client.conversations.create() + conv_id = conversation.id + + agent = Agent(name="Assistant", instructions="Reply very concisely.") + + # First turn + result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?", conversation_id=conv_id) + print(result1.final_output) + # San Francisco + + # Second turn reuses the same conversation_id + result2 = await Runner.run( + agent, + "What state is it in?", + conversation_id=conv_id, + ) + print(result2.final_output) + # California +``` + +#### 2. Using `previous_response_id` + +Another option is **response chaining**, where each turn links explicitly to the response ID from the previous turn. + +```python +from agents import Agent, Runner + +async def main(): + agent = Agent(name="Assistant", instructions="Reply very concisely.") + + # First turn + result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?") + print(result1.final_output) + # San Francisco + + # Second turn, chained to the previous response + result2 = await Runner.run( + agent, + "What state is it in?", + previous_response_id=result1.last_response_id, + ) + print(result2.final_output) + # California +``` + + ## Long running agents & human-in-the-loop You can use the Agents SDK [Temporal](https://temporal.io/) integration to run durable, long-running workflows, including human-in-the-loop tasks. View a demo of Temporal and the Agents SDK working in action to complete long-running tasks [in this video](https://www.youtube.com/watch?v=fFBZqzT4DD8), and [view docs here](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/openai_agents). 
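For contrast with the server-managed approaches documented in `running_agents.md` above, here is a minimal sketch of the purely local alternative that the intro paragraph mentions, carrying history forward with `result.to_input_list()`. This is an illustrative editor's sketch, not part of the patch; the helper name `main` and the use of `asyncio.run` are assumptions for a runnable example.

```python
import asyncio

from agents import Agent, Runner


async def main():
    agent = Agent(name="Assistant", instructions="Reply very concisely.")

    # First turn: plain string input, no server-side state involved.
    result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
    print(result1.final_output)

    # Second turn: replay the prior turn locally and append the new user message.
    new_input = result1.to_input_list() + [
        {"role": "user", "content": "What state is it in?"}
    ]
    result2 = await Runner.run(agent, new_input)
    print(result2.final_output)


asyncio.run(main())
```

With this pattern the client resends the whole history on every call, which is exactly the overhead that `conversation_id` and `previous_response_id` avoid.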
diff --git a/src/agents/run.py b/src/agents/run.py index b8f9dfebf..52d395a13 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -122,6 +122,51 @@ class CallModelData(Generic[TContext]): context: TContext | None +@dataclass +class _ServerConversationTracker: + """Tracks server-side conversation state for either conversation_id or + previous_response_id modes.""" + + conversation_id: str | None = None + previous_response_id: str | None = None + sent_items: set[int] = field(default_factory=set) + server_items: set[int] = field(default_factory=set) + + def track_server_items(self, model_response: ModelResponse) -> None: + for output_item in model_response.output: + self.server_items.add(id(output_item)) + + # Update previous_response_id only when using previous_response_id + if ( + self.conversation_id is None + and self.previous_response_id is not None + and model_response.response_id is not None + ): + self.previous_response_id = model_response.response_id + + def prepare_input( + self, + original_input: str | list[TResponseInputItem], + generated_items: list[RunItem], + ) -> list[TResponseInputItem]: + input_items: list[TResponseInputItem] = [] + + # On first call (when there are no generated items yet), include the original input + if not generated_items: + input_items.extend(ItemHelpers.input_to_new_input_list(original_input)) + + # Process generated_items, skip items already sent or from server + for item in generated_items: + raw_item_id = id(item.raw_item) + + if raw_item_id in self.sent_items or raw_item_id in self.server_items: + continue + input_items.append(item.to_input_item()) + self.sent_items.add(raw_item_id) + + return input_items + + # Type alias for the optional input filter callback CallModelInputFilter = Callable[[CallModelData[Any]], MaybeAwaitable[ModelInputData]] @@ -470,6 +515,13 @@ async def run( if run_config is None: run_config = RunConfig() + if conversation_id is not None or previous_response_id is not None: + server_conversation_tracker = _ServerConversationTracker( + conversation_id=conversation_id, previous_response_id=previous_response_id + ) + else: + server_conversation_tracker = None + # Keep original user input separate from session-prepared input original_user_input = input prepared_input = await self._prepare_input_with_session( @@ -563,8 +615,7 @@ async def run( run_config=run_config, should_run_agent_start_hooks=should_run_agent_start_hooks, tool_use_tracker=tool_use_tracker, - previous_response_id=previous_response_id, - conversation_id=conversation_id, + server_conversation_tracker=server_conversation_tracker, ), ) else: @@ -578,8 +629,7 @@ async def run( run_config=run_config, should_run_agent_start_hooks=should_run_agent_start_hooks, tool_use_tracker=tool_use_tracker, - previous_response_id=previous_response_id, - conversation_id=conversation_id, + server_conversation_tracker=server_conversation_tracker, ) should_run_agent_start_hooks = False @@ -587,6 +637,9 @@ async def run( original_input = turn_result.original_input generated_items = turn_result.generated_items + if server_conversation_tracker is not None: + server_conversation_tracker.track_server_items(turn_result.model_response) + # Collect tool guardrail results from this turn tool_input_guardrail_results.extend(turn_result.tool_input_guardrail_results) tool_output_guardrail_results.extend(turn_result.tool_output_guardrail_results) @@ -863,6 +916,13 @@ async def _start_streaming( should_run_agent_start_hooks = True tool_use_tracker = AgentToolUseTracker() + if conversation_id is not 
None or previous_response_id is not None: + server_conversation_tracker = _ServerConversationTracker( + conversation_id=conversation_id, previous_response_id=previous_response_id + ) + else: + server_conversation_tracker = None + streamed_result._event_queue.put_nowait(AgentUpdatedStreamEvent(new_agent=current_agent)) try: @@ -938,8 +998,7 @@ async def _start_streaming( should_run_agent_start_hooks, tool_use_tracker, all_tools, - previous_response_id, - conversation_id, + server_conversation_tracker, ) should_run_agent_start_hooks = False @@ -949,6 +1008,9 @@ async def _start_streaming( streamed_result.input = turn_result.original_input streamed_result.new_items = turn_result.generated_items + if server_conversation_tracker is not None: + server_conversation_tracker.track_server_items(turn_result.model_response) + if isinstance(turn_result.next_step, NextStepHandoff): current_agent = turn_result.next_step.new_agent current_span.finish(reset_current=True) @@ -1032,8 +1094,7 @@ async def _run_single_turn_streamed( should_run_agent_start_hooks: bool, tool_use_tracker: AgentToolUseTracker, all_tools: list[Tool], - previous_response_id: str | None, - conversation_id: str | None, + server_conversation_tracker: _ServerConversationTracker | None = None, ) -> SingleStepResult: emitted_tool_call_ids: set[str] = set() @@ -1064,8 +1125,13 @@ async def _run_single_turn_streamed( final_response: ModelResponse | None = None - input = ItemHelpers.input_to_new_input_list(streamed_result.input) - input.extend([item.to_input_item() for item in streamed_result.new_items]) + if server_conversation_tracker is not None: + input = server_conversation_tracker.prepare_input( + streamed_result.input, streamed_result.new_items + ) + else: + input = ItemHelpers.input_to_new_input_list(streamed_result.input) + input.extend([item.to_input_item() for item in streamed_result.new_items]) filtered = await cls._maybe_filter_model_input( agent=agent, run_config=run_config, model_input=ModelInputData( input=input, instructions=system_prompt, ), ) + previous_response_id = ( + server_conversation_tracker.previous_response_id + if server_conversation_tracker + else None + ) + conversation_id = ( + server_conversation_tracker.conversation_id if server_conversation_tracker else None + ) + + # 1. 
Stream the output events async for event in model.stream_response( filtered.instructions, @@ -1219,8 +1294,7 @@ async def _run_single_turn( run_config: RunConfig, should_run_agent_start_hooks: bool, tool_use_tracker: AgentToolUseTracker, - previous_response_id: str | None, - conversation_id: str | None, + server_conversation_tracker: _ServerConversationTracker | None = None, ) -> SingleStepResult: # Ensure we run the hooks before anything else if should_run_agent_start_hooks: @@ -1240,8 +1314,11 @@ async def _run_single_turn( output_schema = cls._get_output_schema(agent) handoffs = await cls._get_handoffs(agent, context_wrapper) - input = ItemHelpers.input_to_new_input_list(original_input) - input.extend([generated_item.to_input_item() for generated_item in generated_items]) + if server_conversation_tracker is not None: + input = server_conversation_tracker.prepare_input(original_input, generated_items) + else: + input = ItemHelpers.input_to_new_input_list(original_input) + input.extend([generated_item.to_input_item() for generated_item in generated_items]) new_response = await cls._get_new_response( agent, @@ -1254,8 +1331,7 @@ async def _run_single_turn( context_wrapper, run_config, tool_use_tracker, - previous_response_id, - conversation_id, + server_conversation_tracker, prompt_config, ) @@ -1459,8 +1535,7 @@ async def _get_new_response( context_wrapper: RunContextWrapper[TContext], run_config: RunConfig, tool_use_tracker: AgentToolUseTracker, - previous_response_id: str | None, - conversation_id: str | None, + server_conversation_tracker: _ServerConversationTracker | None, prompt_config: ResponsePromptParam | None, ) -> ModelResponse: # Allow user to modify model input right before the call, if configured @@ -1491,6 +1566,15 @@ async def _get_new_response( ), ) + previous_response_id = ( + server_conversation_tracker.previous_response_id + if server_conversation_tracker + else None + ) + conversation_id = ( + server_conversation_tracker.conversation_id if server_conversation_tracker else None + ) + new_response = await model.get_response( system_instructions=filtered.instructions, input=filtered.input, diff --git a/tests/fake_model.py b/tests/fake_model.py index 7de629448..b38b3790a 100644 --- a/tests/fake_model.py +++ b/tests/fake_model.py @@ -34,6 +34,7 @@ def __init__( ) self.tracing_enabled = tracing_enabled self.last_turn_args: dict[str, Any] = {} + self.first_turn_args: dict[str, Any] | None = None self.hardcoded_usage: Usage | None = None def set_hardcoded_usage(self, usage: Usage): @@ -64,7 +65,7 @@ async def get_response( conversation_id: str | None, prompt: Any | None, ) -> ModelResponse: - self.last_turn_args = { + turn_args = { "system_instructions": system_instructions, "input": input, "model_settings": model_settings, @@ -74,6 +75,11 @@ async def get_response( "conversation_id": conversation_id, } + if self.first_turn_args is None: + self.first_turn_args = turn_args.copy() + + self.last_turn_args = turn_args + with generation_span(disabled=not self.tracing_enabled) as span: output = self.get_next_output() @@ -92,7 +98,7 @@ async def get_response( return ModelResponse( output=output, usage=self.hardcoded_usage or Usage(), - response_id=None, + response_id="resp-789", ) async def stream_response( @@ -109,7 +115,7 @@ async def stream_response( conversation_id: str | None = None, prompt: Any | None = None, ) -> AsyncIterator[TResponseStreamEvent]: - self.last_turn_args = { + turn_args = { "system_instructions": system_instructions, "input": input, "model_settings": 
model_settings, @@ -118,6 +124,11 @@ async def stream_response( "previous_response_id": previous_response_id, "conversation_id": conversation_id, } + + if self.first_turn_args is None: + self.first_turn_args = turn_args.copy() + + self.last_turn_args = turn_args with generation_span(disabled=not self.tracing_enabled) as span: output = self.get_next_output() if isinstance(output, Exception): @@ -145,7 +156,7 @@ def get_response_obj( usage: Usage | None = None, ) -> Response: return Response( - id=response_id or "123", + id=response_id or "resp-789", created_at=123, model="test_model", object="response", diff --git a/tests/test_agent_runner.py b/tests/test_agent_runner.py index 661afd6ef..dae68fc4c 100644 --- a/tests/test_agent_runner.py +++ b/tests/test_agent_runner.py @@ -704,7 +704,7 @@ async def test_multi_turn_previous_response_id_passed_between_runs(): assert model.last_turn_args.get("previous_response_id") is None await Runner.run(agent, input="test", previous_response_id="resp-test-123") - assert model.last_turn_args.get("previous_response_id") == "resp-test-123" + assert model.last_turn_args.get("previous_response_id") == "resp-789" @pytest.mark.asyncio @@ -750,7 +750,343 @@ async def test_previous_response_id_passed_between_runs_streamed_multi_turn(): async for _ in result.stream_events(): pass - assert model.last_turn_args.get("previous_response_id") == "resp-stream-test" + assert model.last_turn_args.get("previous_response_id") == "resp-789" + + +@pytest.mark.asyncio +async def test_conversation_id_only_sends_new_items_multi_turn(): + """Test that conversation_id mode only sends new items on subsequent turns.""" + model = FakeModel() + agent = Agent( + name="test", + model=model, + tools=[get_function_tool("test_func", "tool_result")], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: a message and tool call + [get_text_message("a_message"), get_function_tool_call("test_func", '{"arg": "foo"}')], + # Second turn: another message and tool call + [get_text_message("b_message"), get_function_tool_call("test_func", '{"arg": "bar"}')], + # Third turn: final text message + [get_text_message("done")], + ] + ) + + result = await Runner.run(agent, input="user_message", conversation_id="conv-test-123") + assert result.final_output == "done" + + # Check the first call - it should include the original input since generated_items is empty + assert model.first_turn_args is not None + first_input = model.first_turn_args["input"] + + # First call should include the original user input + assert isinstance(first_input, list) + assert len(first_input) == 1 # Should contain the user message + + # The input should be the user message + user_message = first_input[0] + assert user_message.get("role") == "user" + assert user_message.get("content") == "user_message" + + # Check the input from the last turn (third turn after function execution) + last_input = model.last_turn_args["input"] + + # In conversation_id mode, the third turn should only contain the tool output + assert isinstance(last_input, list) + assert len(last_input) == 1 + + # The single item should be a tool result + tool_result_item = last_input[0] + assert tool_result_item.get("type") == "function_call_output" + assert tool_result_item.get("call_id") is not None + + +@pytest.mark.asyncio +async def test_conversation_id_only_sends_new_items_multi_turn_streamed(): + """Test that conversation_id mode only sends new items on subsequent turns (streamed mode).""" + model = FakeModel() + agent = Agent( + name="test", + model=model, + 
tools=[get_function_tool("test_func", "tool_result")], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: a message and tool call + [get_text_message("a_message"), get_function_tool_call("test_func", '{"arg": "foo"}')], + # Second turn: another message and tool call + [get_text_message("b_message"), get_function_tool_call("test_func", '{"arg": "bar"}')], + # Third turn: final text message + [get_text_message("done")], + ] + ) + + result = Runner.run_streamed(agent, input="user_message", conversation_id="conv-test-123") + async for _ in result.stream_events(): + pass + + assert result.final_output == "done" + + # Check the first call - it should include the original input since generated_items is empty + assert model.first_turn_args is not None + first_input = model.first_turn_args["input"] + + # First call should include the original user input + assert isinstance(first_input, list) + assert len(first_input) == 1 # Should contain the user message + + # The input should be the user message + user_message = first_input[0] + assert user_message.get("role") == "user" + assert user_message.get("content") == "user_message" + + # Check the input from the last turn (third turn after function execution) + last_input = model.last_turn_args["input"] + + # In conversation_id mode, the third turn should only contain the tool output + assert isinstance(last_input, list) + assert len(last_input) == 1 + + # The single item should be a tool result + tool_result_item = last_input[0] + assert tool_result_item.get("type") == "function_call_output" + assert tool_result_item.get("call_id") is not None + + +@pytest.mark.asyncio +async def test_previous_response_id_only_sends_new_items_multi_turn(): + """Test that previous_response_id mode only sends new items and updates + previous_response_id between turns.""" + model = FakeModel() + agent = Agent( + name="test", + model=model, + tools=[get_function_tool("test_func", "tool_result")], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: a message and tool call + [get_text_message("a_message"), get_function_tool_call("test_func", '{"arg": "foo"}')], + # Second turn: final text message + [get_text_message("done")], + ] + ) + + result = await Runner.run( + agent, input="user_message", previous_response_id="initial-response-123" + ) + assert result.final_output == "done" + + # Check the first call - it should include the original input since generated_items is empty + assert model.first_turn_args is not None + first_input = model.first_turn_args["input"] + + # First call should include the original user input + assert isinstance(first_input, list) + assert len(first_input) == 1 # Should contain the user message + + # The input should be the user message + user_message = first_input[0] + assert user_message.get("role") == "user" + assert user_message.get("content") == "user_message" + + # Check the input from the last turn (second turn after function execution) + last_input = model.last_turn_args["input"] + + # In previous_response_id mode, the second turn should only contain the tool output + assert isinstance(last_input, list) + assert len(last_input) == 1 # Only the function result + + # The single item should be a tool result + tool_result_item = last_input[0] + assert tool_result_item.get("type") == "function_call_output" + assert tool_result_item.get("call_id") is not None + + # Verify that previous_response_id was updated to the FakeModel's response id + assert model.last_turn_args.get("previous_response_id") == "resp-789" + + 
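The tests above and below assert that, when a server conversation tracker is active, only items the server has not yet seen are resent on later turns. That behavior comes from `_ServerConversationTracker.prepare_input` in `src/agents/run.py`; the following standalone sketch (not part of the patch) mirrors the same id-set bookkeeping, with plain dicts standing in for the SDK's run item and model response types as an illustrative simplification.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TinyTracker:
    # Simplified stand-in for _ServerConversationTracker: remembers which raw
    # items were already sent to, or produced by, the server.
    sent: set[int] = field(default_factory=set)
    from_server: set[int] = field(default_factory=set)

    def track_server_items(self, output_items: list[Any]) -> None:
        for item in output_items:
            self.from_server.add(id(item))

    def prepare_input(self, new_items: list[Any]) -> list[Any]:
        to_send = []
        for item in new_items:
            if id(item) in self.sent or id(item) in self.from_server:
                continue  # the server already has this item, skip re-sending
            to_send.append(item)
            self.sent.add(id(item))
        return to_send


# Turn 1: the model's own output is tracked, so only the new tool result is sent.
tracker = TinyTracker()
assistant_msg = {"role": "assistant", "content": "calling tool"}
tracker.track_server_items([assistant_msg])
tool_result = {"type": "function_call_output", "call_id": "call_1", "output": "42"}
assert tracker.prepare_input([assistant_msg, tool_result]) == [tool_result]

# Turn 2: nothing new was generated, so nothing is re-sent.
assert tracker.prepare_input([assistant_msg, tool_result]) == []
```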
+@pytest.mark.asyncio +async def test_previous_response_id_only_sends_new_items_multi_turn_streamed(): + """Test that previous_response_id mode only sends new items and updates + previous_response_id between turns (streamed mode).""" + model = FakeModel() + agent = Agent( + name="test", + model=model, + tools=[get_function_tool("test_func", "tool_result")], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: a message and tool call + [get_text_message("a_message"), get_function_tool_call("test_func", '{"arg": "foo"}')], + # Second turn: final text message + [get_text_message("done")], + ] + ) + + result = Runner.run_streamed( + agent, input="user_message", previous_response_id="initial-response-123" + ) + async for _ in result.stream_events(): + pass + + assert result.final_output == "done" + + # Check the first call - it should include the original input since generated_items is empty + assert model.first_turn_args is not None + first_input = model.first_turn_args["input"] + + # First call should include the original user input + assert isinstance(first_input, list) + assert len(first_input) == 1 # Should contain the user message + + # The input should be the user message + user_message = first_input[0] + assert user_message.get("role") == "user" + assert user_message.get("content") == "user_message" + + # Check the input from the last turn (second turn after function execution) + last_input = model.last_turn_args["input"] + + # In previous_response_id mode, the second turn should only contain the tool output + assert isinstance(last_input, list) + assert len(last_input) == 1 # Only the function result + + # The single item should be a tool result + tool_result_item = last_input[0] + assert tool_result_item.get("type") == "function_call_output" + assert tool_result_item.get("call_id") is not None + + # Verify that previous_response_id was updated to the FakeModel's response id + assert model.last_turn_args.get("previous_response_id") == "resp-789" + + +@pytest.mark.asyncio +async def test_default_send_all_items(): + """Test that without conversation_id or previous_response_id, all items are sent.""" + model = FakeModel() + agent = Agent( + name="test", + model=model, + tools=[get_function_tool("test_func", "tool_result")], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: a message and tool call + [get_text_message("a_message"), get_function_tool_call("test_func", '{"arg": "foo"}')], + # Second turn: final text message + [get_text_message("done")], + ] + ) + + result = await Runner.run( + agent, input="user_message" + ) # No conversation_id or previous_response_id + assert result.final_output == "done" + + # Check the input from the last turn (second turn after function execution) + last_input = model.last_turn_args["input"] + + # In default mode, the second turn should contain ALL items: + # 1. Original user message + # 2. Assistant response message + # 3. Function call + # 4. 
Function result + assert isinstance(last_input, list) + assert ( + len(last_input) == 4 + ) # User message + assistant message + function call + function result + + # Verify the items are in the expected order + user_message = last_input[0] + assistant_message = last_input[1] + function_call = last_input[2] + function_result = last_input[3] + + # Check user message + assert user_message.get("role") == "user" + assert user_message.get("content") == "user_message" + + # Check assistant message + assert assistant_message.get("role") == "assistant" + + # Check function call + assert function_call.get("name") == "test_func" + assert function_call.get("arguments") == '{"arg": "foo"}' + + # Check function result + assert function_result.get("type") == "function_call_output" + assert function_result.get("call_id") is not None + + +@pytest.mark.asyncio +async def test_default_send_all_items_streamed(): + """Test that without conversation_id or previous_response_id, all items are sent + (streamed mode).""" + model = FakeModel() + agent = Agent( + name="test", + model=model, + tools=[get_function_tool("test_func", "tool_result")], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: a message and tool call + [get_text_message("a_message"), get_function_tool_call("test_func", '{"arg": "foo"}')], + # Second turn: final text message + [get_text_message("done")], + ] + ) + + result = Runner.run_streamed( + agent, input="user_message" + ) # No conversation_id or previous_response_id + async for _ in result.stream_events(): + pass + + assert result.final_output == "done" + + # Check the input from the last turn (second turn after function execution) + last_input = model.last_turn_args["input"] + + # In default mode, the second turn should contain ALL items: + # 1. Original user message + # 2. Assistant response message + # 3. Function call + # 4. Function result + assert isinstance(last_input, list) + assert ( + len(last_input) == 4 + ) # User message + assistant message + function call + function result + + # Verify the items are in the expected order + user_message = last_input[0] + assistant_message = last_input[1] + function_call = last_input[2] + function_result = last_input[3] + + # Check user message + assert user_message.get("role") == "user" + assert user_message.get("content") == "user_message" + + # Check assistant message + assert assistant_message.get("role") == "assistant" + + # Check function call + assert function_call.get("name") == "test_func" + assert function_call.get("arguments") == '{"arg": "foo"}' + + # Check function result + assert function_result.get("type") == "function_call_output" + assert function_result.get("call_id") is not None @pytest.mark.asyncio