diff --git a/scripts/populate_tox/tox.jinja b/scripts/populate_tox/tox.jinja index 4c3b86af81..d46015d821 100644 --- a/scripts/populate_tox/tox.jinja +++ b/scripts/populate_tox/tox.jinja @@ -239,6 +239,7 @@ deps = httpx-latest: httpx # Langchain + langchain: pytest-asyncio langchain-v0.1: openai~=1.0.0 langchain-v0.1: langchain~=0.1.11 langchain-v0.1: tiktoken~=0.6.0 diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index 7e04a740ed..36b3eef71c 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -78,6 +78,9 @@ def setup_once(): AgentExecutor.invoke = _wrap_agent_executor_invoke(AgentExecutor.invoke) AgentExecutor.stream = _wrap_agent_executor_stream(AgentExecutor.stream) + AgentExecutor.ainvoke = _wrap_agent_executor_ainvoke(AgentExecutor.ainvoke) + AgentExecutor.astream = _wrap_agent_executor_astream(AgentExecutor.astream) + class WatchedSpan: span = None # type: Span @@ -768,3 +771,138 @@ async def new_iterator_async(): return result return new_stream + + +def _wrap_agent_executor_ainvoke(f): + # type: (Callable[..., Any]) -> Callable[..., Any] + + @wraps(f) + async def new_ainvoke(self, *args, **kwargs): + # type: (Any, Any, Any) -> Any + integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + if integration is None: + return await f(self, *args, **kwargs) + + agent_name, tools = _get_request_data(self, args, kwargs) + + with sentry_sdk.start_span( + op=OP.GEN_AI_INVOKE_AGENT, + name=f"invoke_agent {agent_name}" if agent_name else "invoke_agent", + origin=LangchainIntegration.origin, + ) as span: + if agent_name: + span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) + + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) + + if tools: + set_data_normalized( + span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, tools, unpack=False + ) + + # Run the agent + result = await f(self, *args, **kwargs) + + input = result.get("input") + if ( + input is not None + and should_send_default_pii() + and integration.include_prompts + ): + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + [ + input, + ], + ) + + output = result.get("output") + if ( + output is not None + and should_send_default_pii() + and integration.include_prompts + ): + span.set_data(SPANDATA.GEN_AI_RESPONSE_TEXT, output) + + return result + + new_ainvoke.__wrapped__ = True + return new_ainvoke + + +def _wrap_agent_executor_astream(f): + # type: (Callable[..., Any]) -> Callable[..., Any] + + @wraps(f) + def new_astream(self, *args, **kwargs): + # type: (Any, Any, Any) -> Any + integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + if integration is None: + return f(self, *args, **kwargs) + + agent_name, tools = _get_request_data(self, args, kwargs) + + span = sentry_sdk.start_span( + op=OP.GEN_AI_INVOKE_AGENT, + name=f"invoke_agent {agent_name}".strip(), + origin=LangchainIntegration.origin, + ) + span.__enter__() + + if agent_name: + span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) + + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + + if tools: + set_data_normalized( + span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, tools, unpack=False + ) + + input = args[0].get("input") if len(args) >= 1 else None + if ( + input is not None + and should_send_default_pii() + and integration.include_prompts + ): + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + [ + input, + ], + ) + + # Run the agent - this returns an async iterator + result = f(self, *args, **kwargs) + + old_iterator = result + + async def new_iterator_async(): + # type: () -> AsyncIterator[Any] + event = None + try: + async for event in old_iterator: + yield event + finally: + try: + output = event.get("output") if event else None + except Exception: + output = None + + if ( + output is not None + and should_send_default_pii() + and integration.include_prompts + ): + span.set_data(SPANDATA.GEN_AI_RESPONSE_TEXT, output) + + span.__exit__(None, None, None) + + return new_iterator_async() + + new_astream.__wrapped__ = True + return new_astream diff --git a/tests/integrations/langchain/test_langchain.py b/tests/integrations/langchain/test_langchain.py index 9a06ac05d4..d37990ecdc 100644 --- a/tests/integrations/langchain/test_langchain.py +++ b/tests/integrations/langchain/test_langchain.py @@ -589,3 +589,334 @@ def test_langchain_callback_list_existing_callback(sentry_init): [handler] = passed_callbacks assert handler is sentry_callback + + +@pytest.mark.asyncio +@pytest.mark.xfail +@pytest.mark.parametrize( + "send_default_pii, include_prompts, use_unknown_llm_type", + [ + (True, True, False), + (True, False, False), + (False, True, False), + (False, False, True), + ], +) +async def test_langchain_agent_ainvoke( + sentry_init, capture_events, send_default_pii, include_prompts, use_unknown_llm_type +): + global llm_type + llm_type = "acme-llm" if use_unknown_llm_type else "openai-chat" + + sentry_init( + integrations=[ + LangchainIntegration( + include_prompts=include_prompts, + ) + ], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are very powerful assistant, but don't know current events", + ), + ("user", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ] + ) + global stream_result_mock + stream_result_mock = Mock( + return_value=[ + ChatGenerationChunk( + type="ChatGenerationChunk", + message=AIMessageChunk( + content="The word eudca has 5 letters.", + usage_metadata={ + "input_tokens": 89, + "output_tokens": 28, + "total_tokens": 117, + "input_token_details": {"audio": 0, "cache_read": 0}, + "output_token_details": {"audio": 0, "reasoning": 0}, + }, + ), + ), + ] + ) + + class AsyncMockOpenAI(MockOpenAI): + async def _agenerate(self, messages, stop=None, run_manager=None, **kwargs): + from langchain_core.outputs import ChatResult, ChatGeneration + + return ChatResult( + generations=[ + [ + ChatGeneration( + message=AIMessageChunk( + content="The word eudca has 5 letters.", + usage_metadata={ + "input_tokens": 89, + "output_tokens": 28, + "total_tokens": 117, + "input_token_details": { + "audio": 0, + "cache_read": 0, + }, + "output_token_details": { + "audio": 0, + "reasoning": 0, + }, + }, + ) + ) + ] + ] + ) + + llm = AsyncMockOpenAI( + model_name="gpt-3.5-turbo", + temperature=0, + openai_api_key="badkey", + ) + agent = create_openai_tools_agent(llm, [get_word_length], prompt) + + agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) + + with start_transaction(): + result = await agent_executor.ainvoke( + {"input": "How many letters in the word eudca"} + ) + + assert result is not None + tx = events[0] + assert tx["type"] == "transaction" + + # Find the agent invoke span + agent_spans = list(x for x in tx["spans"] if x["op"] == "gen_ai.invoke_agent") + assert len(agent_spans) == 1 + + agent_span = agent_spans[0] + assert agent_span["data"]["gen_ai.operation_name"] == "invoke_agent" + assert not agent_span["data"]["gen_ai.response_streaming"] + + if send_default_pii and include_prompts: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES in agent_span["data"] + assert agent_span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] == [ + "How many letters in the word eudca" + ] + assert SPANDATA.GEN_AI_RESPONSE_TEXT in agent_span["data"] + else: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in agent_span.get("data", {}) + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in agent_span.get("data", {}) + + +@pytest.mark.asyncio +@pytest.mark.xfail +@pytest.mark.parametrize( + "send_default_pii, include_prompts, use_unknown_llm_type", + [ + (True, True, False), + (True, False, False), + (False, True, False), + (False, False, True), + ], +) +async def test_langchain_agent_astream( + sentry_init, capture_events, send_default_pii, include_prompts, use_unknown_llm_type +): + global llm_type + llm_type = "acme-llm" if use_unknown_llm_type else "openai-chat" + + sentry_init( + integrations=[ + LangchainIntegration( + include_prompts=include_prompts, + ) + ], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are very powerful assistant, but don't know current events", + ), + ("user", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ] + ) + global stream_result_mock + stream_result_mock = Mock( + side_effect=[ + [ + ChatGenerationChunk( + type="ChatGenerationChunk", + message=AIMessageChunk( + content="", + additional_kwargs={ + "tool_calls": [ + { + "index": 0, + "id": "call_BbeyNhCKa6kYLYzrD40NGm3b", + "function": { + "arguments": '{"word": "eudca"}', + "name": "get_word_length", + }, + "type": "function", + } + ] + }, + ), + ), + ], + [ + ChatGenerationChunk( + text="The word eudca has 5 letters.", + type="ChatGenerationChunk", + message=AIMessageChunk( + content="The word eudca has 5 letters.", + usage_metadata={ + "input_tokens": 89, + "output_tokens": 28, + "total_tokens": 117, + "input_token_details": {"audio": 0, "cache_read": 0}, + "output_token_details": {"audio": 0, "reasoning": 0}, + }, + ), + ), + ], + ] + ) + + class AsyncMockOpenAI(MockOpenAI): + async def _astream(self, messages, stop=None, run_manager=None, **kwargs): + for x in stream_result_mock(): + yield x + + llm = AsyncMockOpenAI( + model_name="gpt-3.5-turbo", + temperature=0, + openai_api_key="badkey", + ) + agent = create_openai_tools_agent(llm, [get_word_length], prompt) + + agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) + + with start_transaction(): + events_collected = [] + async for event in agent_executor.astream( + {"input": "How many letters in the word eudca"} + ): + events_collected.append(event) + + assert len(events_collected) > 0 + tx = events[0] + assert tx["type"] == "transaction" + + # Find the agent invoke span + agent_spans = list(x for x in tx["spans"] if x["op"] == "gen_ai.invoke_agent") + assert len(agent_spans) == 1 + + agent_span = agent_spans[0] + assert agent_span["data"]["gen_ai.operation_name"] == "invoke_agent" + assert agent_span["data"]["gen_ai.response_streaming"] + + if send_default_pii and include_prompts: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES in agent_span["data"] + assert agent_span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] == [ + "How many letters in the word eudca" + ] + assert SPANDATA.GEN_AI_RESPONSE_TEXT in agent_span["data"] + else: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in agent_span.get("data", {}) + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in agent_span.get("data", {}) + + +@pytest.mark.asyncio +async def test_langchain_ainvoke_error(sentry_init, capture_events): + sentry_init( + integrations=[LangchainIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are very powerful assistant, but don't know current events", + ), + ("user", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ] + ) + + class AsyncMockOpenAI(MockOpenAI): + async def _agenerate(self, messages, stop=None, run_manager=None, **kwargs): + raise Exception("API rate limit error") + + llm = AsyncMockOpenAI( + model_name="gpt-3.5-turbo", + temperature=0, + openai_api_key="badkey", + ) + agent = create_openai_tools_agent(llm, [get_word_length], prompt) + + agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) + + with start_transaction(), pytest.raises(Exception): + await agent_executor.ainvoke({"input": "How many letters in the word eudca"}) + + # Should have captured the error + assert len(events) > 0 + + +@pytest.mark.asyncio +async def test_langchain_astream_error(sentry_init, capture_events): + sentry_init( + integrations=[LangchainIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are very powerful assistant, but don't know current events", + ), + ("user", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ] + ) + + class AsyncMockOpenAI(MockOpenAI): + async def _astream(self, messages, stop=None, run_manager=None, **kwargs): + raise Exception("API rate limit error") + + llm = AsyncMockOpenAI( + model_name="gpt-3.5-turbo", + temperature=0, + openai_api_key="badkey", + ) + agent = create_openai_tools_agent(llm, [get_word_length], prompt) + + agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) + + with start_transaction(), pytest.raises(Exception): + async for event in agent_executor.astream( + {"input": "How many letters in the word eudca"} + ): + pass # Should error before yielding anything + + # Should have captured the error + assert len(events) > 0 diff --git a/tox.ini b/tox.ini index bbc1d57c12..dba653b19b 100644 --- a/tox.ini +++ b/tox.ini @@ -408,6 +408,7 @@ deps = httpx-latest: httpx # Langchain + langchain: pytest-asyncio langchain-v0.1: openai~=1.0.0 langchain-v0.1: langchain~=0.1.11 langchain-v0.1: tiktoken~=0.6.0