Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 76 additions & 26 deletions libs/core/langchain_core/tracers/root_listeners.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,52 @@
"""Tracers that call listeners."""

from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Callable, Optional, Union

Check failure on line 4 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP035)

langchain_core/tracers/root_listeners.py:4:1: UP035 Import from `collections.abc` instead: `Callable`

Check failure on line 4 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP035)

langchain_core/tracers/root_listeners.py:4:1: UP035 Import from `collections.abc` instead: `Callable`

Check failure on line 4 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP035)

langchain_core/tracers/root_listeners.py:4:1: UP035 Import from `collections.abc` instead: `Callable`

Check failure on line 4 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP035)

langchain_core/tracers/root_listeners.py:4:1: UP035 Import from `collections.abc` instead: `Callable`

Check failure on line 4 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP035)

langchain_core/tracers/root_listeners.py:4:1: UP035 Import from `collections.abc` instead: `Callable`
import contextvars

from langchain_core.runnables.config import (
RunnableConfig,
acall_func_with_variable_args,
call_func_with_variable_args,
)
from langchain_core.tracers.base import AsyncBaseTracer, BaseTracer
from langchain_core.tracers.schemas import Run

Check failure on line 13 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (I001)

langchain_core/tracers/root_listeners.py:3:1: I001 Import block is un-sorted or un-formatted

Check failure on line 13 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (I001)

langchain_core/tracers/root_listeners.py:3:1: I001 Import block is un-sorted or un-formatted

Check failure on line 13 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (I001)

langchain_core/tracers/root_listeners.py:3:1: I001 Import block is un-sorted or un-formatted

Check failure on line 13 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (I001)

langchain_core/tracers/root_listeners.py:3:1: I001 Import block is un-sorted or un-formatted

Check failure on line 13 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (I001)

langchain_core/tracers/root_listeners.py:3:1: I001 Import block is un-sorted or un-formatted

# Re-entrancy guard to prevent listeners from triggering nested listener calls
_IN_LISTENER: contextvars.ContextVar[bool] = contextvars.ContextVar(
"root_listeners_in_listener", default=False
)

if TYPE_CHECKING:
from uuid import UUID

Listener = Callable[[Run], None] | Callable[[Run, RunnableConfig], None]
AsyncListener = (
Callable[[Run], Awaitable[None]] | Callable[[Run, RunnableConfig], Awaitable[None]]
)
Listener = Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]

Check failure on line 23 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP007)

langchain_core/tracers/root_listeners.py:23:12: UP007 Use `X | Y` for type annotations

Check failure on line 23 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP007)

langchain_core/tracers/root_listeners.py:23:12: UP007 Use `X | Y` for type annotations

Check failure on line 23 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP007)

langchain_core/tracers/root_listeners.py:23:12: UP007 Use `X | Y` for type annotations

Check failure on line 23 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP007)

langchain_core/tracers/root_listeners.py:23:12: UP007 Use `X | Y` for type annotations

Check failure on line 23 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP007)

langchain_core/tracers/root_listeners.py:23:12: UP007 Use `X | Y` for type annotations
AsyncListener = Union[
Callable[[Run], Awaitable[None]], Callable[[Run, RunnableConfig], Awaitable[None]]
]

Check failure on line 26 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP007)

langchain_core/tracers/root_listeners.py:24:17: UP007 Use `X | Y` for type annotations

Check failure on line 26 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP007)

langchain_core/tracers/root_listeners.py:24:17: UP007 Use `X | Y` for type annotations

Check failure on line 26 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP007)

langchain_core/tracers/root_listeners.py:24:17: UP007 Use `X | Y` for type annotations

Check failure on line 26 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP007)

langchain_core/tracers/root_listeners.py:24:17: UP007 Use `X | Y` for type annotations

Check failure on line 26 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP007)

langchain_core/tracers/root_listeners.py:24:17: UP007 Use `X | Y` for type annotations


class RootListenersTracer(BaseTracer):
"""Tracer that calls listeners on run start, end, and error."""
"""Tracer that calls listeners on run start, end, and error.

Parameters:
log_missing_parent: Whether to log a warning if the parent is missing.
Default is False.
config: The runnable config.
on_start: The listener to call on run start.
on_end: The listener to call on run end.
on_error: The listener to call on run error.
"""

log_missing_parent = False
"""Whether to log a warning if the parent is missing."""

def __init__(
self,
*,
config: RunnableConfig,
on_start: Listener | None,
on_end: Listener | None,
on_error: Listener | None,
on_start: Optional[Listener],

Check failure on line 47 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP045)

langchain_core/tracers/root_listeners.py:47:19: UP045 Use `X | None` for type annotations

Check failure on line 47 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP045)

langchain_core/tracers/root_listeners.py:47:19: UP045 Use `X | None` for type annotations

Check failure on line 47 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP045)

langchain_core/tracers/root_listeners.py:47:19: UP045 Use `X | None` for type annotations

Check failure on line 47 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP045)

langchain_core/tracers/root_listeners.py:47:19: UP045 Use `X | None` for type annotations

Check failure on line 47 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP045)

langchain_core/tracers/root_listeners.py:47:19: UP045 Use `X | None` for type annotations
on_end: Optional[Listener],

Check failure on line 48 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP045)

langchain_core/tracers/root_listeners.py:48:17: UP045 Use `X | None` for type annotations

Check failure on line 48 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP045)

langchain_core/tracers/root_listeners.py:48:17: UP045 Use `X | None` for type annotations

Check failure on line 48 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP045)

langchain_core/tracers/root_listeners.py:48:17: UP045 Use `X | None` for type annotations

Check failure on line 48 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP045)

langchain_core/tracers/root_listeners.py:48:17: UP045 Use `X | None` for type annotations

Check failure on line 48 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP045)

langchain_core/tracers/root_listeners.py:48:17: UP045 Use `X | None` for type annotations
on_error: Optional[Listener],

Check failure on line 49 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP045)

langchain_core/tracers/root_listeners.py:49:19: UP045 Use `X | None` for type annotations

Check failure on line 49 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP045)

langchain_core/tracers/root_listeners.py:49:19: UP045 Use `X | None` for type annotations

Check failure on line 49 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP045)

langchain_core/tracers/root_listeners.py:49:19: UP045 Use `X | None` for type annotations

Check failure on line 49 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP045)

langchain_core/tracers/root_listeners.py:49:19: UP045 Use `X | None` for type annotations

Check failure on line 49 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP045)

langchain_core/tracers/root_listeners.py:49:19: UP045 Use `X | None` for type annotations
) -> None:
"""Initialize the tracer.

Expand All @@ -48,7 +62,7 @@
self._arg_on_start = on_start
self._arg_on_end = on_end
self._arg_on_error = on_error
self.root_id: UUID | None = None
self.root_id: Optional[UUID] = None

Check failure on line 65 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP045)

langchain_core/tracers/root_listeners.py:65:23: UP045 Use `X | None` for type annotations

Check failure on line 65 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP045)

langchain_core/tracers/root_listeners.py:65:23: UP045 Use `X | None` for type annotations

Check failure on line 65 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP045)

langchain_core/tracers/root_listeners.py:65:23: UP045 Use `X | None` for type annotations

Check failure on line 65 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP045)

langchain_core/tracers/root_listeners.py:65:23: UP045 Use `X | None` for type annotations

Check failure on line 65 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP045)

langchain_core/tracers/root_listeners.py:65:23: UP045 Use `X | None` for type annotations

def _persist_run(self, run: Run) -> None:
# This is a legacy method only called once for an entire run tree
Expand All @@ -61,33 +75,55 @@

self.root_id = run.id

if self._arg_on_start is not None:
call_func_with_variable_args(self._arg_on_start, run, self.config)
if self._arg_on_start is not None and not _IN_LISTENER.get():
token = _IN_LISTENER.set(True)
try:
call_func_with_variable_args(self._arg_on_start, run, self.config)
finally:
_IN_LISTENER.reset(token)

def _on_run_update(self, run: Run) -> None:
if run.id != self.root_id:
return

if _IN_LISTENER.get():
return
if run.error is None:
if self._arg_on_end is not None:
call_func_with_variable_args(self._arg_on_end, run, self.config)
token = _IN_LISTENER.set(True)
try:
call_func_with_variable_args(self._arg_on_end, run, self.config)
finally:
_IN_LISTENER.reset(token)
elif self._arg_on_error is not None:
call_func_with_variable_args(self._arg_on_error, run, self.config)
token = _IN_LISTENER.set(True)
try:
call_func_with_variable_args(self._arg_on_error, run, self.config)
finally:
_IN_LISTENER.reset(token)


class AsyncRootListenersTracer(AsyncBaseTracer):
"""Async Tracer that calls listeners on run start, end, and error."""
"""Async Tracer that calls listeners on run start, end, and error.

Parameters:
log_missing_parent: Whether to log a warning if the parent is missing.
Default is False.
config: The runnable config.
on_start: The listener to call on run start.
on_end: The listener to call on run end.
on_error: The listener to call on run error.
"""

log_missing_parent = False
"""Whether to log a warning if the parent is missing."""

def __init__(
self,
*,
config: RunnableConfig,
on_start: AsyncListener | None,
on_end: AsyncListener | None,
on_error: AsyncListener | None,
on_start: Optional[AsyncListener],

Check failure on line 124 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP045)

langchain_core/tracers/root_listeners.py:124:19: UP045 Use `X | None` for type annotations

Check failure on line 124 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP045)

langchain_core/tracers/root_listeners.py:124:19: UP045 Use `X | None` for type annotations

Check failure on line 124 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP045)

langchain_core/tracers/root_listeners.py:124:19: UP045 Use `X | None` for type annotations

Check failure on line 124 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP045)

langchain_core/tracers/root_listeners.py:124:19: UP045 Use `X | None` for type annotations

Check failure on line 124 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP045)

langchain_core/tracers/root_listeners.py:124:19: UP045 Use `X | None` for type annotations
on_end: Optional[AsyncListener],

Check failure on line 125 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

Ruff (UP045)

langchain_core/tracers/root_listeners.py:125:17: UP045 Use `X | None` for type annotations

Check failure on line 125 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

Ruff (UP045)

langchain_core/tracers/root_listeners.py:125:17: UP045 Use `X | None` for type annotations

Check failure on line 125 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

Ruff (UP045)

langchain_core/tracers/root_listeners.py:125:17: UP045 Use `X | None` for type annotations

Check failure on line 125 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

Ruff (UP045)

langchain_core/tracers/root_listeners.py:125:17: UP045 Use `X | None` for type annotations

Check failure on line 125 in libs/core/langchain_core/tracers/root_listeners.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

Ruff (UP045)

langchain_core/tracers/root_listeners.py:125:17: UP045 Use `X | None` for type annotations
on_error: Optional[AsyncListener],
) -> None:
"""Initialize the tracer.

Expand All @@ -103,7 +139,7 @@
self._arg_on_start = on_start
self._arg_on_end = on_end
self._arg_on_error = on_error
self.root_id: UUID | None = None
self.root_id: Optional[UUID] = None

async def _persist_run(self, run: Run) -> None:
# This is a legacy method only called once for an entire run tree
Expand All @@ -116,15 +152,29 @@

self.root_id = run.id

if self._arg_on_start is not None:
await acall_func_with_variable_args(self._arg_on_start, run, self.config)
if self._arg_on_start is not None and not _IN_LISTENER.get():
token = _IN_LISTENER.set(True)
try:
await acall_func_with_variable_args(self._arg_on_start, run, self.config)
finally:
_IN_LISTENER.reset(token)

async def _on_run_update(self, run: Run) -> None:
if run.id != self.root_id:
return

if _IN_LISTENER.get():
return
if run.error is None:
if self._arg_on_end is not None:
await acall_func_with_variable_args(self._arg_on_end, run, self.config)
token = _IN_LISTENER.set(True)
try:
await acall_func_with_variable_args(self._arg_on_end, run, self.config)
finally:
_IN_LISTENER.reset(token)
elif self._arg_on_error is not None:
await acall_func_with_variable_args(self._arg_on_error, run, self.config)
token = _IN_LISTENER.set(True)
try:
await acall_func_with_variable_args(self._arg_on_error, run, self.config)
finally:
_IN_LISTENER.reset(token)
Loading