scripts/populate_tox/tox.jinja: 1 change (1 addition, 0 deletions)
@@ -239,6 +239,7 @@ deps =
    httpx-latest: httpx

    # Langchain
    langchain: pytest-asyncio
    langchain-v0.1: openai~=1.0.0
    langchain-v0.1: langchain~=0.1.11
    langchain-v0.1: tiktoken~=0.6.0
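For context: pytest-asyncio is what lets pytest collect and run `async def` test functions, which the new `ainvoke`/`astream` wrappers below need in the langchain test suite. A minimal sketch of such a test (the test name and body are illustrative only, not part of this change):

```python
import pytest


@pytest.mark.asyncio
async def test_async_agent_call():
    # Stand-in for awaiting AgentExecutor.ainvoke(); the real tests would
    # exercise the wrapped LangChain methods instead of this local coroutine.
    async def fake_ainvoke(payload):
        return {"input": payload["input"], "output": "4"}

    result = await fake_ainvoke({"input": "What is 2 + 2?"})
    assert result["output"] == "4"
```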
sentry_sdk/integrations/langchain.py: 138 changes (138 additions, 0 deletions)
@@ -78,6 +78,9 @@ def setup_once():
        AgentExecutor.invoke = _wrap_agent_executor_invoke(AgentExecutor.invoke)
        AgentExecutor.stream = _wrap_agent_executor_stream(AgentExecutor.stream)

        AgentExecutor.ainvoke = _wrap_agent_executor_ainvoke(AgentExecutor.ainvoke)
        AgentExecutor.astream = _wrap_agent_executor_astream(AgentExecutor.astream)


class WatchedSpan:
    span = None  # type: Span
@@ -768,3 +771,138 @@ async def new_iterator_async():
        return result

    return new_stream


def _wrap_agent_executor_ainvoke(f):
    # type: (Callable[..., Any]) -> Callable[..., Any]

    @wraps(f)
    async def new_ainvoke(self, *args, **kwargs):
        # type: (Any, Any, Any) -> Any
        integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
        if integration is None:
            return await f(self, *args, **kwargs)

        agent_name, tools = _get_request_data(self, args, kwargs)

        with sentry_sdk.start_span(
            op=OP.GEN_AI_INVOKE_AGENT,
            name=f"invoke_agent {agent_name}" if agent_name else "invoke_agent",
            origin=LangchainIntegration.origin,
        ) as span:
            if agent_name:
                span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name)

            span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
            span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False)

            if tools:
                set_data_normalized(
                    span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, tools, unpack=False
                )

            # Run the agent
            result = await f(self, *args, **kwargs)

            input = result.get("input")
            if (
                input is not None
                and should_send_default_pii()
                and integration.include_prompts
            ):
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_REQUEST_MESSAGES,
                    [
                        input,
                    ],
                )

            output = result.get("output")
            if (
                output is not None
                and should_send_default_pii()
                and integration.include_prompts
            ):
                span.set_data(SPANDATA.GEN_AI_RESPONSE_TEXT, output)

            return result

    new_ainvoke.__wrapped__ = True
    return new_ainvoke


def _wrap_agent_executor_astream(f):
    # type: (Callable[..., Any]) -> Callable[..., Any]

    @wraps(f)
    def new_astream(self, *args, **kwargs):
        # type: (Any, Any, Any) -> Any
        integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
        if integration is None:
            return f(self, *args, **kwargs)

        agent_name, tools = _get_request_data(self, args, kwargs)

        # The span is entered manually (not via a `with` block) because it has
        # to stay open until the async iterator returned below is exhausted.
        span = sentry_sdk.start_span(
            op=OP.GEN_AI_INVOKE_AGENT,
            name=f"invoke_agent {agent_name}".strip(),
            origin=LangchainIntegration.origin,
        )
        span.__enter__()

        if agent_name:
            span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name)

        span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
        span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

        if tools:
            set_data_normalized(
                span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, tools, unpack=False
            )

        input = args[0].get("input") if len(args) >= 1 else None
        if (
            input is not None
            and should_send_default_pii()
            and integration.include_prompts
        ):
            set_data_normalized(
                span,
                SPANDATA.GEN_AI_REQUEST_MESSAGES,
                [
                    input,
                ],
            )

        # Run the agent - this returns an async iterator
        result = f(self, *args, **kwargs)

        old_iterator = result

        async def new_iterator_async():
            # type: () -> AsyncIterator[Any]
            event = None
            try:
                async for event in old_iterator:
                    yield event
            finally:
                try:
                    output = event.get("output") if event else None
                except Exception:
                    output = None

                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    span.set_data(SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                # Close the span once the stream has been consumed (or aborted).
                span.__exit__(None, None, None)

        return new_iterator_async()

    new_astream.__wrapped__ = True
    return new_astream
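With both wrappers installed by `setup_once`, the async entry points are traced the same way as their sync counterparts. A rough usage sketch, assuming an `AgentExecutor` has already been built elsewhere (the DSN and the `build_agent_executor` helper are placeholders, not part of this diff):

```python
import asyncio

import sentry_sdk
from sentry_sdk.integrations.langchain import LangchainIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,
    send_default_pii=True,
    integrations=[LangchainIntegration(include_prompts=True)],
)


async def main():
    agent_executor = build_agent_executor()  # hypothetical helper, not shown here

    with sentry_sdk.start_transaction(op="task", name="agent run"):
        # ainvoke() is now wrapped by _wrap_agent_executor_ainvoke and emits an
        # invoke_agent span around the awaited call.
        result = await agent_executor.ainvoke({"input": "What is 2 + 2?"})
        print(result.get("output"))

        # astream() is wrapped by _wrap_agent_executor_astream; its span is
        # closed only once the returned async iterator is exhausted.
        async for _event in agent_executor.astream({"input": "And 3 + 3?"}):
            pass


asyncio.run(main())
```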