-
Notifications
You must be signed in to change notification settings - Fork 2.9k
fix: #2008 Fix agent memory leak using weakref #2014
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| import abc | ||
| import asyncio | ||
| import weakref | ||
| from collections.abc import AsyncIterator | ||
| from dataclasses import dataclass, field | ||
| from typing import TYPE_CHECKING, Any, Literal, cast | ||
|
|
@@ -74,6 +75,32 @@ class RunResultBase(abc.ABC): | |
| def last_agent(self) -> Agent[Any]: | ||
| """The last agent that was run.""" | ||
|
|
||
| def release_agents(self) -> None: | ||
| """ | ||
| Release strong references to agents held by this result. After calling this method, | ||
| accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage | ||
| collected. Callers can use this when they are done inspecting the result and want to | ||
| eagerly drop any associated agent graph. | ||
| """ | ||
| for item in self.new_items: | ||
| release = getattr(item, "release_agent", None) | ||
| if callable(release): | ||
| release() | ||
| self._release_last_agent_reference() | ||
|
|
||
| def __del__(self) -> None: | ||
| try: | ||
| # Fall back to releasing agents automatically in case the caller never invoked | ||
| # `release_agents()` explicitly. This keeps the no-leak guarantee confirmed by tests. | ||
| self.release_agents() | ||
|
||
| except Exception: | ||
| # Avoid raising from __del__. | ||
| pass | ||
|
|
||
| @abc.abstractmethod | ||
| def _release_last_agent_reference(self) -> None: | ||
| """Release stored agent reference specific to the concrete result type.""" | ||
|
|
||
| def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T: | ||
| """A convenience method to cast the final output to a specific type. By default, the cast | ||
| is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a | ||
|
|
@@ -111,11 +138,33 @@ def last_response_id(self) -> str | None: | |
| @dataclass | ||
| class RunResult(RunResultBase): | ||
| _last_agent: Agent[Any] | ||
| _last_agent_ref: weakref.ReferenceType[Agent[Any]] | None = field( | ||
| init=False, | ||
| repr=False, | ||
| default=None, | ||
| ) | ||
|
|
||
| def __post_init__(self) -> None: | ||
| self._last_agent_ref = weakref.ref(self._last_agent) | ||
|
|
||
| @property | ||
| def last_agent(self) -> Agent[Any]: | ||
| """The last agent that was run.""" | ||
| return self._last_agent | ||
| agent = cast("Agent[Any] | None", self.__dict__.get("_last_agent")) | ||
| if agent is not None: | ||
| return agent | ||
| if self._last_agent_ref: | ||
| agent = self._last_agent_ref() | ||
| if agent is not None: | ||
| return agent | ||
| raise AgentsException("Last agent reference is no longer available.") | ||
|
|
||
| def _release_last_agent_reference(self) -> None: | ||
| agent = cast("Agent[Any] | None", self.__dict__.get("_last_agent")) | ||
| if agent is None: | ||
| return | ||
| self._last_agent_ref = weakref.ref(agent) | ||
| object.__delattr__(self, "_last_agent") | ||
|
|
||
| def __str__(self) -> str: | ||
| return pretty_print_result(self) | ||
|
|
@@ -150,6 +199,12 @@ class RunResultStreaming(RunResultBase): | |
| is_complete: bool = False | ||
| """Whether the agent has finished running.""" | ||
|
|
||
| _current_agent_ref: weakref.ReferenceType[Agent[Any]] | None = field( | ||
| init=False, | ||
| repr=False, | ||
| default=None, | ||
| ) | ||
|
|
||
| # Queues that the background run_loop writes to | ||
| _event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] = field( | ||
| default_factory=asyncio.Queue, repr=False | ||
|
|
@@ -167,12 +222,29 @@ class RunResultStreaming(RunResultBase): | |
| # Soft cancel state | ||
| _cancel_mode: Literal["none", "immediate", "after_turn"] = field(default="none", repr=False) | ||
|
|
||
| def __post_init__(self) -> None: | ||
| self._current_agent_ref = weakref.ref(self.current_agent) | ||
|
|
||
| @property | ||
| def last_agent(self) -> Agent[Any]: | ||
| """The last agent that was run. Updates as the agent run progresses, so the true last agent | ||
| is only available after the agent run is complete. | ||
| """ | ||
| return self.current_agent | ||
| agent = cast("Agent[Any] | None", self.__dict__.get("current_agent")) | ||
| if agent is not None: | ||
| return agent | ||
| if self._current_agent_ref: | ||
| agent = self._current_agent_ref() | ||
| if agent is not None: | ||
| return agent | ||
| raise AgentsException("Last agent reference is no longer available.") | ||
|
|
||
| def _release_last_agent_reference(self) -> None: | ||
| agent = cast("Agent[Any] | None", self.__dict__.get("current_agent")) | ||
| if agent is None: | ||
| return | ||
| self._current_agent_ref = weakref.ref(agent) | ||
| object.__delattr__(self, "current_agent") | ||
|
|
||
| def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None: | ||
| """Cancel the streaming run. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import gc | ||
| import weakref | ||
|
|
||
| import pytest | ||
| from openai.types.responses import ResponseOutputMessage, ResponseOutputText | ||
|
|
||
| from agents import Agent, Runner | ||
| from tests.fake_model import FakeModel | ||
|
|
||
|
|
||
| def _make_message(text: str) -> ResponseOutputMessage: | ||
| return ResponseOutputMessage( | ||
| id="msg-1", | ||
| content=[ResponseOutputText(annotations=[], text=text, type="output_text")], | ||
| role="assistant", | ||
| status="completed", | ||
| type="message", | ||
| ) | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_agent_is_released_after_run() -> None: | ||
| fake_model = FakeModel(initial_output=[_make_message("Paris")]) | ||
| agent = Agent(name="leak-test-agent", instructions="Answer questions.", model=fake_model) | ||
| agent_ref = weakref.ref(agent) | ||
|
|
||
| # Running the agent should not leave behind strong references once the result goes out of scope. | ||
| await Runner.run(agent, "What is the capital of France?") | ||
|
|
||
| del agent | ||
| gc.collect() | ||
|
|
||
| assert agent_ref() is None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In
RunItemBase.release_agent()(lines 103‑109) the code removes the dataclass field entirely viaobject.__delattr__(self, "agent")after storing the weakref. Deleting a dataclass field violates the assumptions of the generated__repr__,__eq__,dataclasses.asdict, etc., which immediately raiseAttributeErroronce the field disappears. BecauseRunResult.release_agents()now calls this helper automatically (and tests encourage users to call it manually), any instrumentation that logs or serializes a RunItem after releasing agents will start crashing—e.g.repr(item)ordataclasses.asdict(item)now fail even though release was supposed to be a benign cleanup step. Setting the field toNone(or keeping a lightweight struct with the metadata) would drop the strong reference without breaking dataclass behavior; please add coverage for repr/asdict after release so this regression is caught.Useful? React with 👍 / 👎.