65 changes: 65 additions & 0 deletions docs/running_agents.md
@@ -121,6 +121,71 @@ Sessions automatically:

See the [Sessions documentation](sessions.md) for more details.


### Server-managed conversations

You can also let OpenAI manage conversation state on the server side, instead of handling it locally with `to_input_list()` or `Sessions`. This preserves conversation history without manually resending all past messages. See the [OpenAI Conversation state guide](https://platform.openai.com/docs/guides/conversation-state?api-mode=responses) for more details.

OpenAI provides two ways to track state across turns:

#### 1. Using `conversation_id`

You first create a conversation using the OpenAI Conversations API and then reuse its ID for every subsequent call:

```python
import asyncio

from agents import Agent, Runner
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main():
    # Create a server-managed conversation
    conversation = await client.conversations.create()
    conv_id = conversation.id

    agent = Agent(name="Assistant", instructions="Reply very concisely.")

    # First turn
    result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?", conversation_id=conv_id)
    print(result1.final_output)
    # San Francisco

    # Second turn reuses the same conversation_id
    result2 = await Runner.run(
        agent,
        "What state is it in?",
        conversation_id=conv_id,
    )
    print(result2.final_output)
    # California

if __name__ == "__main__":
    asyncio.run(main())
```
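Because the history lives on the server, you can inspect or clean it up through the same Conversations API. A minimal sketch, assuming the OpenAI Python client exposes `conversations.items.list` and `conversations.delete` alongside the `conversations.create` call used above (the `conv_123` ID is a placeholder):

```python
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def inspect_and_delete(conv_id: str) -> None:
    # Page through the items the server has stored for this conversation
    async for item in client.conversations.items.list(conv_id):
        print(item.type)

    # Remove the conversation once it is no longer needed
    await client.conversations.delete(conv_id)

if __name__ == "__main__":
    asyncio.run(inspect_and_delete("conv_123"))  # placeholder conversation ID
```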

#### 2. Using `previous_response_id`

Another option is **response chaining**, where each turn links explicitly to the response ID from the previous turn.

```python
from agents import Agent, Runner

async def main():
agent = Agent(name="Assistant", instructions="Reply very concisely.")

# First turn
result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
print(result1.final_output)
# San Francisco

# Second turn, chained to the previous response
result2 = await Runner.run(
agent,
"What state is it in?",
previous_response_id=result1.last_response_id,
)
print(result2.final_output)
# California
```
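Both `conversation_id` and `previous_response_id` are plumbed through the streaming path as well (see the `_start_streaming` changes in this PR), so the same server-side state handling should apply to streamed runs. A minimal sketch, assuming `Runner.run_streamed` accepts `conversation_id` just like `Runner.run`:

```python
import asyncio

from agents import Agent, Runner
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    conversation = await client.conversations.create()

    agent = Agent(name="Assistant", instructions="Reply very concisely.")

    result = Runner.run_streamed(
        agent,
        "What city is the Golden Gate Bridge in?",
        conversation_id=conversation.id,
    )
    # Consume the stream; the server records the turn under the conversation
    async for event in result.stream_events():
        if event.type == "run_item_stream_event":
            print(event.item.type)

    print(result.final_output)
    # San Francisco

if __name__ == "__main__":
    asyncio.run(main())
```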


## Long running agents & human-in-the-loop

You can use the Agents SDK [Temporal](https://temporal.io/) integration to run durable, long-running workflows, including human-in-the-loop tasks. View a demo of Temporal and the Agents SDK working in action to complete long-running tasks [in this video](https://www.youtube.com/watch?v=fFBZqzT4DD8), and [view docs here](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/openai_agents).
120 changes: 102 additions & 18 deletions src/agents/run.py
@@ -122,6 +122,51 @@ class CallModelData(Generic[TContext]):
    context: TContext | None


@dataclass
class _ServerConversationTracker:
    """Tracks server-side conversation state for either conversation_id or
    previous_response_id modes."""

    conversation_id: str | None = None
    previous_response_id: str | None = None
    sent_items: set[int] = field(default_factory=set)
    server_items: set[int] = field(default_factory=set)

    def track_server_items(self, model_response: ModelResponse) -> None:
        for output_item in model_response.output:
            self.server_items.add(id(output_item))

        # Advance previous_response_id only when running in previous_response_id mode
        if (
            self.conversation_id is None
            and self.previous_response_id is not None
            and model_response.response_id is not None
        ):
            self.previous_response_id = model_response.response_id

    def prepare_input(
        self,
        original_input: str | list[TResponseInputItem],
        generated_items: list[RunItem],
    ) -> list[TResponseInputItem]:
        input_items: list[TResponseInputItem] = []

        # On the first call (when there are no generated items yet), include the original input
        if not generated_items:
            input_items.extend(ItemHelpers.input_to_new_input_list(original_input))

        # Process generated_items, skipping items already sent or stored on the server
        for item in generated_items:
            raw_item_id = id(item.raw_item)

            if raw_item_id in self.sent_items or raw_item_id in self.server_items:
                continue
            input_items.append(item.to_input_item())
            self.sent_items.add(raw_item_id)

        return input_items


# Type alias for the optional input filter callback
CallModelInputFilter = Callable[[CallModelData[Any]], MaybeAwaitable[ModelInputData]]

@@ -470,6 +515,13 @@ async def run(
        if run_config is None:
            run_config = RunConfig()

        if conversation_id is not None or previous_response_id is not None:
            server_conversation_tracker = _ServerConversationTracker(
                conversation_id=conversation_id, previous_response_id=previous_response_id
            )
        else:
            server_conversation_tracker = None

        # Keep original user input separate from session-prepared input
        original_user_input = input
        prepared_input = await self._prepare_input_with_session(
@@ -563,8 +615,7 @@ async def run(
                            run_config=run_config,
                            should_run_agent_start_hooks=should_run_agent_start_hooks,
                            tool_use_tracker=tool_use_tracker,
                            previous_response_id=previous_response_id,
                            conversation_id=conversation_id,
                            server_conversation_tracker=server_conversation_tracker,
                        ),
                    )
                else:
@@ -578,15 +629,17 @@
                        run_config=run_config,
                        should_run_agent_start_hooks=should_run_agent_start_hooks,
                        tool_use_tracker=tool_use_tracker,
                        previous_response_id=previous_response_id,
                        conversation_id=conversation_id,
                        server_conversation_tracker=server_conversation_tracker,
                    )
                should_run_agent_start_hooks = False

                model_responses.append(turn_result.model_response)
                original_input = turn_result.original_input
                generated_items = turn_result.generated_items

                if server_conversation_tracker is not None:
                    server_conversation_tracker.track_server_items(turn_result.model_response)

                # Collect tool guardrail results from this turn
                tool_input_guardrail_results.extend(turn_result.tool_input_guardrail_results)
                tool_output_guardrail_results.extend(turn_result.tool_output_guardrail_results)
@@ -863,6 +916,13 @@ async def _start_streaming(
        should_run_agent_start_hooks = True
        tool_use_tracker = AgentToolUseTracker()

        if conversation_id is not None or previous_response_id is not None:
            server_conversation_tracker = _ServerConversationTracker(
                conversation_id=conversation_id, previous_response_id=previous_response_id
            )
        else:
            server_conversation_tracker = None

        streamed_result._event_queue.put_nowait(AgentUpdatedStreamEvent(new_agent=current_agent))

        try:
@@ -938,8 +998,7 @@ async def _start_streaming(
                    should_run_agent_start_hooks,
                    tool_use_tracker,
                    all_tools,
                    previous_response_id,
                    conversation_id,
                    server_conversation_tracker,
                )
                should_run_agent_start_hooks = False

@@ -949,6 +1008,9 @@
                streamed_result.input = turn_result.original_input
                streamed_result.new_items = turn_result.generated_items

                if server_conversation_tracker is not None:
                    server_conversation_tracker.track_server_items(turn_result.model_response)

                if isinstance(turn_result.next_step, NextStepHandoff):
                    current_agent = turn_result.next_step.new_agent
                    current_span.finish(reset_current=True)
@@ -1032,8 +1094,7 @@ async def _run_single_turn_streamed(
        should_run_agent_start_hooks: bool,
        tool_use_tracker: AgentToolUseTracker,
        all_tools: list[Tool],
        previous_response_id: str | None,
        conversation_id: str | None,
        server_conversation_tracker: _ServerConversationTracker | None = None,
    ) -> SingleStepResult:
        emitted_tool_call_ids: set[str] = set()

@@ -1064,8 +1125,13 @@

        final_response: ModelResponse | None = None

        input = ItemHelpers.input_to_new_input_list(streamed_result.input)
        input.extend([item.to_input_item() for item in streamed_result.new_items])
        if server_conversation_tracker is not None:
            input = server_conversation_tracker.prepare_input(
                streamed_result.input, streamed_result.new_items
            )
        else:
            input = ItemHelpers.input_to_new_input_list(streamed_result.input)
            input.extend([item.to_input_item() for item in streamed_result.new_items])

        # Allow user to modify model input right before the call, if configured
        filtered = await cls._maybe_filter_model_input(
@@ -1088,6 +1154,15 @@
            ),
        )

        previous_response_id = (
            server_conversation_tracker.previous_response_id
            if server_conversation_tracker
            else None
        )
        conversation_id = (
            server_conversation_tracker.conversation_id if server_conversation_tracker else None
        )

        # 1. Stream the output events
        async for event in model.stream_response(
            filtered.instructions,
@@ -1219,8 +1294,7 @@ async def _run_single_turn(
        run_config: RunConfig,
        should_run_agent_start_hooks: bool,
        tool_use_tracker: AgentToolUseTracker,
        previous_response_id: str | None,
        conversation_id: str | None,
        server_conversation_tracker: _ServerConversationTracker | None = None,
    ) -> SingleStepResult:
        # Ensure we run the hooks before anything else
        if should_run_agent_start_hooks:
@@ -1240,8 +1314,11 @@

        output_schema = cls._get_output_schema(agent)
        handoffs = await cls._get_handoffs(agent, context_wrapper)
        input = ItemHelpers.input_to_new_input_list(original_input)
        input.extend([generated_item.to_input_item() for generated_item in generated_items])
        if server_conversation_tracker is not None:
            input = server_conversation_tracker.prepare_input(original_input, generated_items)
        else:
            input = ItemHelpers.input_to_new_input_list(original_input)
            input.extend([generated_item.to_input_item() for generated_item in generated_items])

        new_response = await cls._get_new_response(
            agent,
@@ -1254,8 +1331,7 @@
            context_wrapper,
            run_config,
            tool_use_tracker,
            previous_response_id,
            conversation_id,
            server_conversation_tracker,
            prompt_config,
        )

@@ -1459,8 +1535,7 @@ async def _get_new_response(
        context_wrapper: RunContextWrapper[TContext],
        run_config: RunConfig,
        tool_use_tracker: AgentToolUseTracker,
        previous_response_id: str | None,
        conversation_id: str | None,
        server_conversation_tracker: _ServerConversationTracker | None,
        prompt_config: ResponsePromptParam | None,
    ) -> ModelResponse:
        # Allow user to modify model input right before the call, if configured
@@ -1491,6 +1566,15 @@
            ),
        )

        previous_response_id = (
            server_conversation_tracker.previous_response_id
            if server_conversation_tracker
            else None
        )
        conversation_id = (
            server_conversation_tracker.conversation_id if server_conversation_tracker else None
        )

        new_response = await model.get_response(
            system_instructions=filtered.instructions,
            input=filtered.input,
19 changes: 15 additions & 4 deletions tests/fake_model.py
@@ -34,6 +34,7 @@ def __init__(
        )
        self.tracing_enabled = tracing_enabled
        self.last_turn_args: dict[str, Any] = {}
        self.first_turn_args: dict[str, Any] | None = None
        self.hardcoded_usage: Usage | None = None

    def set_hardcoded_usage(self, usage: Usage):
@@ -64,7 +65,7 @@ async def get_response(
        conversation_id: str | None,
        prompt: Any | None,
    ) -> ModelResponse:
        self.last_turn_args = {
        turn_args = {
            "system_instructions": system_instructions,
            "input": input,
            "model_settings": model_settings,
@@ -74,6 +75,11 @@
            "previous_response_id": previous_response_id,
            "conversation_id": conversation_id,
        }

        if self.first_turn_args is None:
            self.first_turn_args = turn_args.copy()

        self.last_turn_args = turn_args

        with generation_span(disabled=not self.tracing_enabled) as span:
            output = self.get_next_output()
@@ -92,7 +98,7 @@
            return ModelResponse(
                output=output,
                usage=self.hardcoded_usage or Usage(),
                response_id=None,
                response_id="resp-789",
            )

    async def stream_response(
@@ -109,7 +115,7 @@
        conversation_id: str | None = None,
        prompt: Any | None = None,
    ) -> AsyncIterator[TResponseStreamEvent]:
        self.last_turn_args = {
        turn_args = {
            "system_instructions": system_instructions,
            "input": input,
            "model_settings": model_settings,
@@ -118,6 +124,11 @@
            "previous_response_id": previous_response_id,
            "conversation_id": conversation_id,
        }

        if self.first_turn_args is None:
            self.first_turn_args = turn_args.copy()

        self.last_turn_args = turn_args
        with generation_span(disabled=not self.tracing_enabled) as span:
            output = self.get_next_output()
            if isinstance(output, Exception):
@@ -145,7 +156,7 @@ def get_response_obj(
    usage: Usage | None = None,
) -> Response:
    return Response(
        id=response_id or "123",
        id=response_id or "resp-789",
        created_at=123,
        model="test_model",
        object="response",