From 67021d4853de4ea653426f44e236855cd87fc8fc Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 2 Dec 2025 13:22:07 -0300 Subject: [PATCH] feat(CLOSES open-8261): enhance Google ADK tracing with callback support - Add token usage capture (prompt_tokens, completion_tokens, total_tokens) - Support all 6 ADK callbacks (before/after agent, model, tool) - Enable Google Cloud Trace coexistence (ADK OTel remains active by default) - Fix callback hierarchy for correct chronological order - Add recursive step sorting by start_time - Update example notebook with comprehensive callback demo --- .../google-adk/google_adk_tracing.ipynb | 328 ++++++- src/openlayer/lib/__init__.py | 22 +- .../lib/integrations/google_adk_tracer.py | 877 ++++++++++++++++-- 3 files changed, 1142 insertions(+), 85 deletions(-) diff --git a/examples/tracing/google-adk/google_adk_tracing.ipynb b/examples/tracing/google-adk/google_adk_tracing.ipynb index 4f57252c..00eb6871 100644 --- a/examples/tracing/google-adk/google_adk_tracing.ipynb +++ b/examples/tracing/google-adk/google_adk_tracing.ipynb @@ -10,6 +10,13 @@ "\n", "This notebook demonstrates how to trace Google Agent Development Kit (ADK) agents with Openlayer.\n", "\n", + "## Features\n", + "\n", + "- **Full Agent Tracing**: Capture agent execution, LLM calls, and tool usage\n", + "- **Token Usage Tracking**: Automatically captures prompt, completion, and total tokens\n", + "- **All 6 ADK Callbacks**: Trace before_agent, after_agent, before_model, after_model, before_tool, after_tool\n", + "- **Google Cloud Coexistence**: Use both Google Cloud telemetry (Cloud Trace) AND Openlayer simultaneously\n", + "\n", "## Prerequisites\n", "\n", "Install the required packages:\n", @@ -18,6 +25,15 @@ "```\n" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install google-adk wrapt" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -36,12 +52,12 @@ "import os\n", "\n", "# Openlayer configuration\n", - "os.environ[\"OPENLAYER_API_KEY\"] = \"your-api-key-here\"\n", - "os.environ[\"OPENLAYER_INFERENCE_PIPELINE_ID\"] = \"your-pipeline-id-here\"\n", + "os.environ[\"OPENLAYER_API_KEY\"] = \"your-api-key\"\n", + "os.environ[\"OPENLAYER_INFERENCE_PIPELINE_ID\"] = \"your-pipeline-id\"\n", "\n", "# Google AI API configuration (Option 1: Using Google AI Studio)\n", "# Get your API key from: https://aistudio.google.com/apikey\n", - "os.environ[\"GOOGLE_API_KEY\"] = \"your-google-ai-api-key-here\"\n", + "os.environ[\"GOOGLE_API_KEY\"] = \"your-google-api-key\"\n", "\n", "# Google Cloud Vertex AI configuration (Option 2: Using Google Cloud)\n", "# Uncomment these if you're using Vertex AI instead of Google AI\n", @@ -56,7 +72,9 @@ "source": [ "## Enable Google ADK Tracing\n", "\n", - "Enable tracing before creating any agents. This patches Google ADK globally to send traces to Openlayer:\n" + "Enable tracing before creating any agents. This patches Google ADK globally to send traces to Openlayer.\n", + "\n", + "**Note:** By default, ADK's built-in OpenTelemetry tracing remains active, allowing you to send data to both Google Cloud (Cloud Trace, Cloud Monitoring) AND Openlayer. If you only want Openlayer, use `trace_google_adk(disable_adk_otel=True)`.\n" ] }, { @@ -102,7 +120,7 @@ "\n", "# Create a basic agent\n", "agent = LlmAgent(\n", - " model=\"gemini-2.0-flash-exp\",\n", + " model=\"gemini-2.5-flash\",\n", " name=\"Assistant\",\n", " instruction=\"You are a helpful assistant. 
Provide concise and accurate responses.\"\n", ")\n", @@ -190,7 +208,7 @@ "\n", "# Create agent with tools (pass functions directly)\n", "tool_agent = LlmAgent(\n", - " model=\"gemini-2.0-flash-exp\",\n", + " model=\"gemini-2.5-flash\",\n", " name=\"ToolAgent\",\n", " instruction=\"You are a helpful assistant with access to weather and calculation tools. Use them when appropriate.\",\n", " tools=[get_weather, calculate]\n", @@ -229,6 +247,273 @@ "await run_tool_agent()\n" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example 3: Agent with All 6 Callbacks\n", + "\n", + "Google ADK supports 6 types of callbacks that allow you to observe, customize, and control agent behavior. Openlayer automatically traces all of them:\n", + "\n", + "| Callback | Description | When Called |\n", + "|----------|-------------|-------------|\n", + "| `before_agent_callback` | Agent pre-processing | Before the agent starts its main work |\n", + "| `after_agent_callback` | Agent post-processing | After the agent finishes all its steps |\n", + "| `before_model_callback` | LLM pre-call | Before sending a request to the LLM |\n", + "| `after_model_callback` | LLM post-call | After receiving a response from the LLM |\n", + "| `before_tool_callback` | Tool pre-execution | Before executing a tool |\n", + "| `after_tool_callback` | Tool post-execution | After a tool finishes |\n", + "\n", + "Reference: https://google.github.io/adk-docs/callbacks/\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import Any, Dict, Optional\n", + "\n", + "from google.adk.tools import ToolContext\n", + "from google.adk.models import LlmRequest, LlmResponse\n", + "from google.adk.tools.base_tool import BaseTool\n", + "from google.adk.agents.callback_context import CallbackContext\n", + "\n", + "# ============================================================================\n", + "# Define all 6 callback functions\n", + "# ============================================================================\n", + "\n", + "# 1. Before Agent Callback\n", + "# Called before the agent starts processing a request\n", + "def before_agent_callback(callback_context: CallbackContext) -> Optional[Any]:\n", + " \"\"\"\n", + " Called before the agent starts its main work.\n", + " \n", + " Use cases:\n", + " - Input validation\n", + " - Session initialization\n", + " - Logging request start\n", + " - Adding default context\n", + " \"\"\"\n", + " print(f\"[before_agent] Agent '{callback_context.agent_name}' starting\") # noqa: T201\n", + " print(f\"[before_agent] Invocation ID: {callback_context.invocation_id}\") # noqa: T201\n", + " # Return None to allow the agent to proceed normally\n", + " # Return a Content object to skip the agent and return that content directly\n", + " return None\n", + "\n", + "\n", + "# 2. 
After Agent Callback\n", + "# Called after the agent finishes processing\n", + "def after_agent_callback(callback_context: CallbackContext) -> Optional[Any]:\n", + " \"\"\"\n", + " Called after the agent has finished all its steps.\n", + " \n", + " Use cases:\n", + " - Response post-processing\n", + " - Logging request completion\n", + " - Cleanup operations\n", + " - Analytics\n", + " \"\"\"\n", + " print(f\"[after_agent] Agent '{callback_context.agent_name}' finished\") # noqa: T201\n", + " # Return None to use the agent's original response\n", + " # Return a Content object to replace the agent's response\n", + " return None\n", + "\n", + "\n", + "# 3. Before Model Callback\n", + "# Called before each LLM call\n", + "def before_model_callback(\n", + " _callback_context: CallbackContext, \n", + " llm_request: LlmRequest\n", + ") -> Optional[LlmResponse]:\n", + " \"\"\"\n", + " Called before sending a request to the LLM.\n", + " \n", + " Use cases:\n", + " - Request modification (add system prompts)\n", + " - Content filtering / guardrails\n", + " - Caching (return cached response)\n", + " - Rate limiting\n", + " \"\"\"\n", + " print(f\"[before_model] Calling model: {llm_request.model}\") # noqa: T201\n", + " print(f\"[before_model] Request has {len(llm_request.contents)} content items\") # noqa: T201\n", + " # Return None to proceed with the LLM call\n", + " # Return an LlmResponse to skip the LLM and use that response instead\n", + " return None\n", + "\n", + "\n", + "# 4. After Model Callback\n", + "# Called after receiving LLM response\n", + "def after_model_callback(\n", + " _callback_context: CallbackContext, \n", + " llm_response: LlmResponse\n", + ") -> Optional[LlmResponse]:\n", + " \"\"\"\n", + " Called after receiving a response from the LLM.\n", + " \n", + " Use cases:\n", + " - Response validation\n", + " - Content filtering\n", + " - Response transformation\n", + " - Logging/analytics\n", + " \"\"\"\n", + " print(\"[after_model] Received response from LLM\") # noqa: T201\n", + " if hasattr(llm_response, 'usage_metadata') and llm_response.usage_metadata:\n", + " print(f\"[after_model] Tokens used: {llm_response.usage_metadata.total_token_count}\") # noqa: T201\n", + " # Return None to use the original response\n", + " # Return a modified LlmResponse to replace it\n", + " return None\n", + "\n", + "\n", + "# 5. Before Tool Callback\n", + "# Called before tool execution\n", + "def before_tool_callback(\n", + " tool: BaseTool, \n", + " args: Dict[str, Any], \n", + " _tool_context: ToolContext\n", + ") -> Optional[Dict]:\n", + " \"\"\"\n", + " Called before executing a tool.\n", + " \n", + " Use cases:\n", + " - Argument validation\n", + " - Authorization checks\n", + " - Tool call logging\n", + " - Mocking tool responses for testing\n", + " \"\"\"\n", + " print(f\"[before_tool] Executing tool: {tool.name}\") # noqa: T201\n", + " print(f\"[before_tool] Arguments: {args}\") # noqa: T201\n", + " # Return None to proceed with the tool execution\n", + " # Return a dict to skip the tool and use that as the response\n", + " return None\n", + "\n", + "\n", + "# 6. 
After Tool Callback\n", + "# Called after tool execution\n", + "def after_tool_callback(\n", + " tool: BaseTool, \n", + " _args: Dict[str, Any], \n", + " _tool_context: ToolContext, \n", + " tool_response: Dict\n", + ") -> Optional[Dict]:\n", + " \"\"\"\n", + " Called after a tool finishes execution.\n", + " \n", + " Use cases:\n", + " - Response transformation\n", + " - Error handling\n", + " - Logging tool results\n", + " - Caching responses\n", + " \"\"\"\n", + " print(f\"[after_tool] Tool '{tool.name}' completed\") # noqa: T201\n", + " print(f\"[after_tool] Response: {tool_response}\") # noqa: T201\n", + " # Return None to use the original tool response\n", + " # Return a modified dict to replace the response\n", + " return None\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# ============================================================================\n", + "# Create agent with all callbacks\n", + "# ============================================================================\n", + "\n", + "# Define a tool for the callback agent to use\n", + "def get_current_time() -> str:\n", + " \"\"\"Returns the current time.\n", + " \n", + " Returns:\n", + " str: The current time as a formatted string.\n", + " \"\"\"\n", + " from datetime import datetime\n", + " return f\"The current time is {datetime.now().strftime('%H:%M:%S')}\"\n", + "\n", + "\n", + "# Use different session IDs for callback agent\n", + "CALLBACK_USER_ID = \"user_789\"\n", + "CALLBACK_SESSION_ID = \"session_789\"\n", + "\n", + "# Create agent with ALL 6 callbacks\n", + "callback_agent = LlmAgent(\n", + " model=\"gemini-2.5-flash\",\n", + " name=\"CallbackDemoAgent\",\n", + " instruction=\"You are a helpful assistant. Use the get_current_time tool when asked about time.\",\n", + " tools=[get_current_time],\n", + " # Register all 6 callbacks\n", + " before_agent_callback=before_agent_callback,\n", + " after_agent_callback=after_agent_callback,\n", + " before_model_callback=before_model_callback,\n", + " after_model_callback=after_model_callback,\n", + " before_tool_callback=before_tool_callback,\n", + " after_tool_callback=after_tool_callback,\n", + ")\n", + "\n", + "# Create runner for callback agent\n", + "callback_runner = Runner(\n", + " agent=callback_agent,\n", + " app_name=APP_NAME,\n", + " session_service=session_service\n", + ")\n", + "\n", + "# Define async function to run the callback agent\n", + "async def run_callback_agent():\n", + " # Create session\n", + " await session_service.create_session(\n", + " app_name=APP_NAME,\n", + " user_id=CALLBACK_USER_ID,\n", + " session_id=CALLBACK_SESSION_ID\n", + " )\n", + " \n", + " # Run the agent with a query that will trigger tool use\n", + " query = \"What time is it right now?\"\n", + " content = types.Content(role='user', parts=[types.Part(text=query)])\n", + " \n", + " # Process events and get response\n", + " async for event in callback_runner.run_async(\n", + " user_id=CALLBACK_USER_ID,\n", + " session_id=CALLBACK_SESSION_ID,\n", + " new_message=content\n", + " ):\n", + " if event.is_final_response() and event.content:\n", + " print(f\"Final Response: {event.content.parts[0].text.strip()}\") # noqa: T201\n", + "\n", + "# Run the async function\n", + "await run_callback_agent()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### What You'll See in Openlayer\n", + "\n", + "After running the callback agent, you'll see in your Openlayer dashboard:\n", + "\n", + "1. 
**Agent Step** (`CallbackDemoAgent`):\n",
+    "   - Shows which callbacks are registered\n",
+    "   - Contains all nested steps in chronological order\n",
+    "\n",
+    "2. **All Callbacks as Siblings** (direct children of Agent):\n",
+    "   - `Callback: before_agent` - First, before any processing\n",
+    "   - `Callback: before_model` - Before each LLM call\n",
+    "   - `LLM Call: gemini-2.5-flash` - The actual LLM invocation\n",
+    "   - `Callback: after_model` - After each LLM call (includes token counts)\n",
+    "   - `Callback: before_tool` - Before tool execution\n",
+    "   - `Tool: get_current_time` - The actual tool execution\n",
+    "   - `Callback: after_tool` - After tool completion\n",
+    "   - `Callback: after_agent` - Last, after all processing complete\n",
+    "\n",
+    "3. **Token Usage** (captured on LLM Call steps):\n",
+    "   - Prompt tokens\n",
+    "   - Completion tokens\n",
+    "   - Total tokens\n"
+   ]
+  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
@@ -240,11 +525,30 @@
    "1. Go to https://app.openlayer.com\n",
    "2. Navigate to your inference pipeline\n",
    "3. View the traces tab to see:\n",
-    "   - Agent execution steps\n",
-    "   - LLM calls with token counts\n",
-    "   - Tool executions with inputs and outputs\n",
-    "   - Latency for each operation\n",
-    "   - Complete execution hierarchy\n"
+    "   - **Agent execution steps** with nested hierarchy\n",
+    "   - **LLM calls** with token counts (prompt, completion, total)\n",
+    "   - **Tool executions** with inputs and outputs\n",
+    "   - **All 6 callback types** traced as separate steps\n",
+    "   - **Latency** for each operation\n",
+    "   - **Complete execution hierarchy** showing the flow\n",
+    "\n",
+    "The traces will show the hierarchy of operations in chronological order:\n",
+    "```\n",
+    "Agent: CallbackDemoAgent\n",
+    "├── Callback: before_agent (CallbackDemoAgent)\n",
+    "├── Callback: before_model (CallbackDemoAgent)\n",
+    "├── LLM Call: gemini-2.5-flash\n",
+    "├── Callback: after_model (CallbackDemoAgent)\n",
+    "├── Callback: before_tool (CallbackDemoAgent)\n",
+    "├── Tool: get_current_time\n",
+    "├── Callback: after_tool (CallbackDemoAgent)\n",
+    "├── Callback: before_model (CallbackDemoAgent)\n",
+    "├── LLM Call: gemini-2.5-flash\n",
+    "├── Callback: after_model (CallbackDemoAgent)\n",
+    "└── Callback: after_agent (CallbackDemoAgent)\n",
+    "```\n",
+    "\n",
+    "**Note:** All callbacks are direct children of the Agent step, appearing in chronological order alongside LLM calls and tool executions. This provides a clear timeline view of the agent execution flow.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
@@ -285,7 +589,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.9.18"
+   "version": "3.12.8"
  }
 },
 "nbformat": 4,
diff --git a/src/openlayer/lib/__init__.py b/src/openlayer/lib/__init__.py
index 0be95646..2c86a6bf 100644
--- a/src/openlayer/lib/__init__.py
+++ b/src/openlayer/lib/__init__.py
@@ -193,12 +193,21 @@ def trace_litellm():
 # ------------------------------ Google ADK ---------------------------------- #
 
 
-def trace_google_adk():
+def trace_google_adk(disable_adk_otel: bool = False):
     """Enable tracing for Google Agent Development Kit (ADK).
 
     This function patches Google ADK to automatically trace agent execution,
     LLM calls, and tool calls made through the ADK framework.
 
+    By default, ADK's built-in OpenTelemetry tracing remains active, allowing
+    you to send telemetry to both Google Cloud and Openlayer simultaneously.
+
+    Args:
+        disable_adk_otel: If True, disables ADK's built-in OpenTelemetry tracing.
+ When False (default), ADK's OTel tracing works alongside Openlayer, + allowing data to be sent to both Google Cloud (Cloud Trace, etc.) + and Openlayer. Set to True only if you want Openlayer exclusively. + Requirements: Google ADK and wrapt must be installed: pip install google-adk wrapt @@ -208,17 +217,22 @@ def trace_google_adk(): >>> os.environ["OPENLAYER_API_KEY"] = "your-api-key" >>> os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "your-pipeline-id" >>> from openlayer.lib import trace_google_adk - >>> # Enable tracing (must be called before creating agents) + >>> + >>> # Enable tracing with both Google Cloud OTel and Openlayer (default) >>> trace_google_adk() + >>> + >>> # OR: Enable tracing with Openlayer only (disable ADK's OTel) + >>> # trace_google_adk(disable_adk_otel=True) + >>> >>> # Now create and run your ADK agents >>> from google.adk.agents import Agent - >>> agent = Agent(name="Assistant", model="gemini-2.0-flash-exp") + >>> agent = Agent(name="Assistant", model="gemini-2.5-flash") >>> result = await agent.run_async(...) """ # pylint: disable=import-outside-toplevel from .integrations import google_adk_tracer - return google_adk_tracer.trace_google_adk() + return google_adk_tracer.trace_google_adk(disable_adk_otel=disable_adk_otel) def unpatch_google_adk(): diff --git a/src/openlayer/lib/integrations/google_adk_tracer.py b/src/openlayer/lib/integrations/google_adk_tracer.py index b3deef84..043c2b92 100644 --- a/src/openlayer/lib/integrations/google_adk_tracer.py +++ b/src/openlayer/lib/integrations/google_adk_tracer.py @@ -1,15 +1,28 @@ """Module with methods used to trace Google Agent Development Kit (ADK). This module provides instrumentation for Google's Agent Development Kit (ADK), -capturing agent execution, LLM calls, tool calls, and other ADK-specific events. +capturing agent execution, LLM calls, tool calls, callbacks, and other +ADK-specific events. 
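+
+A minimal usage sketch (assumes the OPENLAYER_API_KEY and
+OPENLAYER_INFERENCE_PIPELINE_ID environment variables are already set)::
+
+    from openlayer.lib import trace_google_adk
+
+    trace_google_adk()  # call once, before any ADK agents are created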
+ +The following callbacks are traced as Function Call steps: +- before_agent_callback: Called before the agent starts processing a request +- after_agent_callback: Called after the agent finishes processing a request +- before_model_callback: Called before each LLM model invocation +- after_model_callback: Called after each LLM model invocation +- before_tool_callback: Called before each tool execution +- after_tool_callback: Called after each tool execution + +Reference: + https://google.github.io/adk-docs/callbacks/#the-callback-mechanism-interception-and-control """ +import asyncio import contextvars import json import logging import sys import time -from typing import Any, Dict, Optional, TYPE_CHECKING +from typing import Any, Callable, Dict, Optional, TYPE_CHECKING try: import wrapt @@ -31,9 +44,13 @@ HAVE_GOOGLE_ADK = False from ..tracing import tracer, steps, enums +from ..tracing.tracer import _current_step as _tracer_current_step logger = logging.getLogger(__name__) +# Store original callbacks for restoration +_original_callbacks: Dict[str, Any] = {} + # Track wrapped methods for cleanup _wrapped_methods = [] @@ -49,12 +66,34 @@ 'google_adk_transfer_parent', default=None ) +# Context variable to store the current LLM step for updating with response data +_current_llm_step: contextvars.ContextVar[Optional[Any]] = contextvars.ContextVar( + 'google_adk_llm_step', default=None +) + +# Context variable to store the current LLM request for callbacks +_current_llm_request: contextvars.ContextVar[Optional[Any]] = contextvars.ContextVar( + 'google_adk_llm_request', default=None +) + +# Context variable to store the agent step for proper callback hierarchy +# Callbacks should be siblings of LLM calls, not children +_current_agent_step: contextvars.ContextVar[Optional[Any]] = contextvars.ContextVar( + 'google_adk_agent_step', default=None +) + + +# Configuration for whether to disable ADK's built-in OpenTelemetry tracing +# When False (default), ADK's OTel tracing works alongside Openlayer tracing +# When True, ADK's tracing is replaced with no-ops (legacy behavior) +_disable_adk_otel_tracing: bool = False + class NoOpSpan: """A no-op span that does nothing. - This is used to prevent ADK from creating its own telemetry spans - while we create Openlayer steps instead. + This is used when users want to disable ADK's OpenTelemetry tracing + and only use Openlayer's tracing. """ def __init__(self, *args: Any, **kwargs: Any) -> None: @@ -103,10 +142,10 @@ def record_exception(self, *args: Any, **kwargs: Any) -> None: class NoOpTracer: - """A tracer that creates no-op spans to prevent ADK from creating real spans. + """A tracer that creates no-op spans. - ADK has built-in OpenTelemetry tracing. We replace it with this no-op tracer - to prevent duplicate spans and use Openlayer's tracing instead. + This is only used when users explicitly want to disable ADK's + OpenTelemetry tracing via disable_adk_otel=True. """ def start_as_current_span(self, *args: Any, **kwargs: Any) -> NoOpSpan: @@ -122,7 +161,7 @@ def use_span(self, *args: Any, **kwargs: Any) -> NoOpSpan: return NoOpSpan() -def trace_google_adk() -> None: +def trace_google_adk(disable_adk_otel: bool = False) -> None: """Enable tracing for Google Agent Development Kit (ADK). This function patches Google ADK to trace agent execution, LLM calls, @@ -130,12 +169,26 @@ def trace_google_adk() -> None: automatically instruments all ADK agents created after this function is called. 
+ By default, ADK's built-in OpenTelemetry tracing remains active, allowing + you to send telemetry to both Google Cloud (via ADK's OTel integration) + and Openlayer simultaneously. This is useful when you want to use Google + Cloud's tracing features (Cloud Trace, Cloud Monitoring, Cloud Logging) + alongside Openlayer's observability platform. + The following information is collected for each operation: - Agent execution: agent name, tools, handoffs, sub-agents - LLM calls: model, tokens (prompt, completion, total), messages, config - Tool calls: tool name, arguments, results + - All 6 ADK callbacks: before_agent, after_agent, before_model, after_model, + before_tool, after_tool - Start/end times and latency for all operations + Args: + disable_adk_otel: If True, disables ADK's built-in OpenTelemetry tracing. + When False (default), ADK's OTel tracing works alongside Openlayer, + allowing you to send data to both Google Cloud and Openlayer. + Set to True only if you want Openlayer as your sole observability tool. + Note: Agent transfers (handoffs via ``transfer_to_agent``) do not create separate tool steps to avoid excessive nesting. Sub-agent executions @@ -157,20 +210,26 @@ def trace_google_adk() -> None: from openlayer.lib.integrations import trace_google_adk - # Enable tracing (must be called before creating agents) + # Enable tracing with ADK's OTel also active (default) + # Data goes to both Google Cloud (if configured) and Openlayer trace_google_adk() + # OR: Enable tracing with ONLY Openlayer (disable ADK's OTel) + # trace_google_adk(disable_adk_otel=True) + # Now create and run your ADK agents from google.adk.agents import Agent agent = Agent( name="Assistant", - model="gemini-2.0-flash-exp", + model="gemini-2.5-flash", instructions="You are a helpful assistant" ) result = await agent.run_async(...) """ + global _disable_adk_otel_tracing + if not HAVE_GOOGLE_ADK: raise ImportError( "google-adk library is not installed. " @@ -183,7 +242,19 @@ def trace_google_adk() -> None: "Please install it with: pip install wrapt" ) - logger.info("Enabling Google ADK tracing for Openlayer") + _disable_adk_otel_tracing = disable_adk_otel + + if disable_adk_otel: + logger.info( + "Enabling Google ADK tracing for Openlayer " + "(ADK's OpenTelemetry tracing will be disabled)" + ) + else: + logger.info( + "Enabling Google ADK tracing for Openlayer " + "(ADK's OpenTelemetry tracing remains active for Google Cloud)" + ) + _patch_google_adk() @@ -204,6 +275,28 @@ def unpatch_google_adk() -> None: # ----------------------------- Helper Functions ----------------------------- # +def _sort_steps_by_time(step: Any, recursive: bool = True) -> None: + """Sort nested steps by start_time for correct chronological order. + + This ensures that steps appear in the order they were executed, + not the order they were created/added to the parent. + + Args: + step: The step whose nested steps should be sorted. + recursive: If True, also sort nested steps within children. + """ + if not hasattr(step, 'steps') or not step.steps: + return + + # Sort by start_time + step.steps.sort(key=lambda s: getattr(s, 'start_time', 0) or 0) + + # Recursively sort children if requested + if recursive: + for child_step in step.steps: + _sort_steps_by_time(child_step, recursive=True) + + def _build_llm_request_for_trace(llm_request: Any) -> Dict[str, Any]: """Build a dictionary representation of the LLM request for tracing. 
@@ -426,6 +519,11 @@ def extract_agent_attributes(instance: Any) -> Dict[str, Any]:
 
 def _base_agent_run_async_wrapper() -> Any:
     """Wrapper for BaseAgent.run_async to create agent execution steps.
 
+    This wrapper:
+    - Creates an AgentCallStep for the agent execution
+    - Automatically wraps agent callbacks for tracing
+    - Captures the final response and user query
+
     Returns:
         Decorator function that wraps the original method.
     """
@@ -433,6 +531,9 @@ def actual_decorator(wrapped: Any, instance: Any, args: tuple, kwargs: dict) ->
         async def new_function():
             agent_name = instance.name if hasattr(instance, "name") else "Unknown Agent"
 
+            # Wrap agent callbacks for tracing (if not already wrapped)
+            _wrap_agent_callbacks(instance)
+
             # Check if this is a sub-agent being called via transfer
             transfer_parent = _agent_transfer_parent_step.get()
 
@@ -445,6 +546,23 @@ async def new_function():
 
             # Build metadata with session info
             metadata = {"agent_type": "google_adk"}
+
+            # Add callback info to metadata (all 6 ADK callback types)
+            has_callbacks = []
+            callback_attrs = [
+                ("before_agent_callback", "before_agent"),
+                ("after_agent_callback", "after_agent"),
+                ("before_model_callback", "before_model"),
+                ("after_model_callback", "after_model"),
+                ("before_tool_callback", "before_tool"),
+                ("after_tool_callback", "after_tool"),
+            ]
+            for attr, name in callback_attrs:
+                if hasattr(instance, attr) and getattr(instance, attr):
+                    has_callbacks.append(name)
+            if has_callbacks:
+                metadata["callbacks"] = has_callbacks
+
             if invocation_context:
                 if hasattr(invocation_context, "invocation_id"):
                     metadata["invocation_id"] = invocation_context.invocation_id
@@ -465,28 +583,28 @@ async def new_function():
 
             # If we're in a transfer, create the step as a child of the transfer parent
             # Otherwise, use normal context (child of current step)
+            transfer_token = None
             if transfer_parent is not None:
                 logger.debug(f"Creating sub-agent step as sibling: {agent_name}")
-                # Temporarily set the parent step, create our step, then restore
-                step_cm = tracer.create_step(
-                    name=f"Agent: {agent_name}",
-                    step_type=enums.StepType.USER_CALL,
-                    inputs=inputs,
-                    metadata=metadata,
-                    parent_step=transfer_parent
-                )
+                # Temporarily set current step to transfer parent so new step becomes its child
+                transfer_token = _tracer_current_step.set(transfer_parent)
                 # Clear the transfer parent so nested steps work normally
                 _agent_transfer_parent_step.set(None)
-            else:
-                step_cm = tracer.create_step(
-                    name=f"Agent: {agent_name}",
-                    step_type=enums.StepType.USER_CALL,
-                    inputs=inputs,
-                    metadata=metadata
-                )
+
+            step_cm = tracer.create_step(
+                name=f"Agent: {agent_name}",
+                step_type=enums.StepType.AGENT,
+                inputs=inputs,
+                metadata=metadata
+            )
 
             # Use the step as a context manager and capture the actual step object
+            # Note: The step is created when entering the with block with the correct parent
             with step_cm as step:
+                # Store the agent step so callbacks can use it as parent
+                # This ensures callbacks are siblings of LLM calls, not children
+                _current_agent_step.set(step)
+
                 try:
                     # Execute the agent
                     async_gen = wrapped(*args, **kwargs)
@@ -520,15 +638,138 @@ async def new_function():
                     step.output = f"Error: {str(e)}"
                     logger.error(f"Error in agent execution: {e}")
                     raise
+                finally:
+                    # Sort all nested steps recursively by start_time to ensure chronological order
+                    # This fixes the issue where callbacks appear after LLM calls/tools
+                    # even though they executed before/after them
+                    _sort_steps_by_time(step, recursive=True)
+                    logger.debug("Sorted nested steps by 
start_time (recursive)") + + # Clear the agent step context + _current_agent_step.set(None) + + # Restore the current step context if we changed it for transfer + # This must be done AFTER the with block exits + if transfer_token is not None: + _tracer_current_step.reset(transfer_token) return new_function() return actual_decorator +def _extract_usage_from_response(response: Any) -> Dict[str, int]: + """Extract token usage from an LLM response object. + + Args: + response: The LLM response object (can be various types). + + Returns: + Dictionary with prompt_tokens, completion_tokens, total_tokens. + """ + usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} + + try: + # Check if response has usage_metadata attribute directly + if hasattr(response, "usage_metadata"): + usage_metadata = response.usage_metadata + if usage_metadata: + usage["prompt_tokens"] = getattr(usage_metadata, "prompt_token_count", 0) or 0 + usage["completion_tokens"] = getattr(usage_metadata, "candidates_token_count", 0) or 0 + usage["total_tokens"] = getattr(usage_metadata, "total_token_count", 0) or 0 + + # Check for dict-based response + elif isinstance(response, dict): + if "usage_metadata" in response: + um = response["usage_metadata"] + usage["prompt_tokens"] = um.get("prompt_token_count", 0) or 0 + usage["completion_tokens"] = um.get("candidates_token_count", 0) or 0 + usage["total_tokens"] = um.get("total_token_count", 0) or 0 + + # Try to get from model_dump if available (Pydantic model) + elif hasattr(response, "model_dump"): + try: + resp_dict = response.model_dump() + if "usage_metadata" in resp_dict: + um = resp_dict["usage_metadata"] + usage["prompt_tokens"] = um.get("prompt_token_count", 0) or 0 + usage["completion_tokens"] = um.get("candidates_token_count", 0) or 0 + usage["total_tokens"] = um.get("total_token_count", 0) or 0 + except Exception: + pass + except Exception as e: + logger.debug(f"Failed to extract usage metadata: {e}") + + return usage + + +def _extract_output_from_response(response: Any) -> Optional[str]: + """Extract text output from an LLM response. + + Args: + response: The LLM response object. + + Returns: + Extracted text content or None. 
+ """ + try: + # Check for content attribute with parts + if hasattr(response, "content") and response.content: + content = response.content + if hasattr(content, "parts") and content.parts: + text_parts = [] + for part in content.parts: + if hasattr(part, "text") and part.text: + text_parts.append(str(part.text)) + if text_parts: + return "\n".join(text_parts) + + # Check for dict-based response + if isinstance(response, dict): + if "content" in response and "parts" in response.get("content", {}): + parts = response["content"]["parts"] + text_parts = [] + for part in parts: + if "text" in part and part.get("text") is not None: + text_parts.append(str(part["text"])) + if text_parts: + return "\n".join(text_parts) + + # Try model_dump + if hasattr(response, "model_dump"): + try: + resp_dict = response.model_dump() + if "content" in resp_dict and resp_dict["content"]: + content = resp_dict["content"] + if "parts" in content: + text_parts = [] + for part in content["parts"]: + if "text" in part and part.get("text") is not None: + text_parts.append(str(part["text"])) + if text_parts: + return "\n".join(text_parts) + except Exception: + pass + + # Fallback to text attribute + if hasattr(response, "text") and response.text: + return str(response.text) + + except Exception as e: + logger.debug(f"Failed to extract output from response: {e}") + + return None + + def _base_llm_flow_call_llm_async_wrapper() -> Any: """Wrapper for BaseLlmFlow._call_llm_async to create LLM call steps. + This wrapper: + - Creates a ChatCompletionStep for the LLM call + - Captures input messages and model parameters + - Extracts usage metadata (tokens) from the response + - Stores the step in context for callback access + Returns: Decorator function that wraps the original method. 
""" @@ -555,6 +796,9 @@ async def new_function(): if llm_request and hasattr(llm_request, "model"): model_name = llm_request.model + # Store request in context for callbacks + _current_llm_request.set(llm_request) + # Build request dict llm_request_dict = None if llm_request: @@ -595,21 +839,60 @@ async def new_function(): step.provider = "Google" step.model_parameters = model_parameters + # Store step in context for later updates (e.g., by callbacks) + _current_llm_step.set(step) + try: # Execute LLM call async_gen = wrapped(*args, **kwargs) collected_responses = [] + last_response = None async for item in async_gen: collected_responses.append(item) + last_response = item yield item - # The response will be finalized by _finalize_model_response_event_wrapper + # Extract usage metadata from the last response + if last_response is not None: + usage = _extract_usage_from_response(last_response) + if usage["total_tokens"] > 0 or usage["prompt_tokens"] > 0: + step.prompt_tokens = usage["prompt_tokens"] + step.completion_tokens = usage["completion_tokens"] + step.tokens = usage["total_tokens"] + logger.debug( + f"Captured token usage: prompt={usage['prompt_tokens']}, " + f"completion={usage['completion_tokens']}, " + f"total={usage['total_tokens']}" + ) + + # Extract output text + output_text = _extract_output_from_response(last_response) + if output_text: + step.output = output_text + + # Store raw response for debugging + try: + if hasattr(last_response, "model_dump"): + step.raw_output = json.dumps( + last_response.model_dump(exclude_none=True) + ) + elif isinstance(last_response, dict): + step.raw_output = json.dumps(last_response) + except Exception: + pass except Exception as e: step.output = f"Error: {str(e)}" logger.error(f"Error in LLM call: {e}") raise + finally: + # Sort nested steps by start_time for correct chronological order + _sort_steps_by_time(step, recursive=True) + + # Clear context variables + _current_llm_step.set(None) + _current_llm_request.set(None) return new_function() @@ -705,6 +988,9 @@ async def new_function(): step.output = f"Error: {str(e)}" logger.error(f"Error in tool execution: {e}") raise + finally: + # Sort nested steps by start_time for correct chronological order + _sort_steps_by_time(step, recursive=True) return new_function() @@ -724,19 +1010,421 @@ def actual_decorator(wrapped: Any, instance: Any, args: tuple, kwargs: dict) -> # Call the original method result = wrapped(*args, **kwargs) - # Extract response data - llm_request = args[0] if len(args) > 0 else kwargs.get("llm_request") + # Extract response data and update step if we have one llm_response = args[1] if len(args) > 1 else kwargs.get("llm_response") + current_step = _current_llm_step.get() - # Note: In a real implementation, we would update the current step here - # For now, we just pass through since step management is handled in the - # LLM wrapper itself + if current_step is not None and llm_response is not None: + try: + # Extract and update usage metadata + usage = _extract_usage_from_response(llm_response) + if usage["total_tokens"] > 0 or usage["prompt_tokens"] > 0: + current_step.prompt_tokens = usage["prompt_tokens"] + current_step.completion_tokens = usage["completion_tokens"] + current_step.tokens = usage["total_tokens"] + + # Extract and update output if not already set + if not current_step.output: + output_text = _extract_output_from_response(llm_response) + if output_text: + current_step.output = output_text + except Exception as e: + logger.debug(f"Error updating step from 
finalize: {e}")
 
         return result
 
     return actual_decorator
 
 
+# ----------------------------- Callback Wrappers ----------------------------- #
+
+
+def _extract_callback_inputs(callback_type: str, args: tuple, kwargs: dict) -> Dict[str, Any]:
+    """Extract inputs for a callback based on its type.
+
+    Args:
+        callback_type: Type of callback (before_agent, after_agent, before_model,
+            after_model, before_tool, after_tool).
+        args: Positional arguments passed to the callback.
+        kwargs: Keyword arguments passed to the callback.
+
+    Returns:
+        Dictionary of inputs for tracing.
+    """
+    inputs: Dict[str, Any] = {}
+
+    # Extract callback_context (first arg for most callbacks)
+    callback_context = args[0] if args else kwargs.get("callback_context")
+    if callback_context:
+        if hasattr(callback_context, "agent_name"):
+            inputs["agent_name"] = callback_context.agent_name
+        if hasattr(callback_context, "invocation_id"):
+            inputs["invocation_id"] = callback_context.invocation_id
+        if hasattr(callback_context, "state") and callback_context.state:
+            # Include a subset of state keys for debugging
+            try:
+                state_keys = list(callback_context.state.keys())[:10]
+                inputs["state_keys"] = state_keys
+            except Exception:
+                pass
+
+    # Type-specific extraction
+    if callback_type == "before_agent":
+        # before_agent_callback(callback_context: CallbackContext) -> Optional[types.Content]
+        pass  # callback_context already extracted above
+
+    elif callback_type == "after_agent":
+        # after_agent_callback(callback_context: CallbackContext) -> Optional[types.Content]
+        pass  # callback_context already extracted above
+
+    elif callback_type == "before_model":
+        # before_model_callback(callback_context: CallbackContext, llm_request: LlmRequest)
+        # -> Optional[LlmResponse]
+        llm_request = args[1] if len(args) > 1 else kwargs.get("llm_request")
+        if llm_request:
+            if hasattr(llm_request, "model"):
+                inputs["model"] = llm_request.model
+            if hasattr(llm_request, "config"):
+                try:
+                    inputs["config"] = llm_request.config.model_dump(
+                        exclude_none=True, exclude={"response_schema"}
+                    )
+                except Exception:
+                    pass
+
+    elif callback_type == "after_model":
+        # after_model_callback(callback_context: CallbackContext, llm_response: LlmResponse)
+        # -> Optional[LlmResponse]
+        llm_response = args[1] if len(args) > 1 else kwargs.get("llm_response")
+        if llm_response:
+            # Extract usage from response
+            usage = _extract_usage_from_response(llm_response)
+            if usage["total_tokens"] > 0:
+                inputs["usage"] = usage
+            # Extract output text
+            output_text = _extract_output_from_response(llm_response)
+            if output_text:
+                inputs["response_preview"] = output_text[:200] + "..." 
if len(output_text) > 200 else output_text + + elif callback_type == "before_tool": + # before_tool_callback(tool: BaseTool, args: dict, tool_context: ToolContext) + # -> Optional[dict] + tool = args[0] if args else kwargs.get("tool") + tool_args = args[1] if len(args) > 1 else kwargs.get("args", {}) + tool_context = args[2] if len(args) > 2 else kwargs.get("tool_context") + + if tool: + if hasattr(tool, "name"): + inputs["tool_name"] = tool.name + if hasattr(tool, "description"): + inputs["tool_description"] = tool.description + if tool_args: + inputs["tool_args"] = tool_args + if tool_context and hasattr(tool_context, "function_call_id"): + inputs["function_call_id"] = tool_context.function_call_id + + elif callback_type == "after_tool": + # after_tool_callback(tool: BaseTool, args: dict, tool_context: ToolContext, + # tool_response: dict) -> Optional[dict] + tool = args[0] if args else kwargs.get("tool") + tool_args = args[1] if len(args) > 1 else kwargs.get("args", {}) + tool_context = args[2] if len(args) > 2 else kwargs.get("tool_context") + tool_response = args[3] if len(args) > 3 else kwargs.get("tool_response") + + if tool and hasattr(tool, "name"): + inputs["tool_name"] = tool.name + if tool_args: + inputs["tool_args"] = tool_args + if tool_response: + # Include a preview of the response + try: + if isinstance(tool_response, dict): + inputs["tool_response"] = tool_response + else: + response_str = str(tool_response) + inputs["tool_response_preview"] = ( + response_str[:200] + "..." if len(response_str) > 200 else response_str + ) + except Exception: + pass + + return inputs + + +def _create_callback_wrapper( + callback_name: str, + callback_type: str +) -> Callable: + """Create a wrapper function for ADK callbacks. + + This creates a wrapper that traces callback execution as a Function Call step. + + Callback hierarchy and timing: + - Model callbacks (before_model, after_model) are placed at the Agent level + as siblings of LLM calls + - Tool callbacks (before_tool, after_tool) are placed at the LLM level + as siblings of Tool steps + - "before_*" callbacks have their start_time adjusted to appear before + their associated operation when sorted + + Supported callback types: + - before_agent: Called before the agent starts processing + - after_agent: Called after the agent finishes processing + - before_model: Called before each LLM model invocation + - after_model: Called after each LLM model invocation + - before_tool: Called before each tool execution + - after_tool: Called after each tool execution + + Reference: + https://google.github.io/adk-docs/callbacks/#the-callback-mechanism-interception-and-control + + Args: + callback_name: Human-readable name for the callback. + callback_type: Type of callback. + + Returns: + A wrapper function that traces the callback. 
+ """ + # Determine the parent step for this callback: + # - Model callbacks (before_model, after_model) → Agent step (siblings of LLM calls) + # - Tool callbacks (before_tool, after_tool) → LLM step (siblings of Tool steps) + use_agent_parent = callback_type in ("before_model", "after_model") + use_llm_parent = callback_type in ("before_tool", "after_tool") + + # "before_*" callbacks need their start_time adjusted to appear before + # their associated operation (since they're actually called after the operation starts) + is_before_callback = callback_type.startswith("before_") + + def wrapper(original_callback: Callable) -> Callable: + """Wrap the original callback with tracing.""" + if original_callback is None: + return None + + # Handle async callbacks + if asyncio.iscoroutinefunction(original_callback): + async def async_traced_callback(*args, **kwargs): + # Extract inputs based on callback type + inputs = _extract_callback_inputs(callback_type, args, kwargs) + + # Determine the parent step and get reference time for ordering + saved_token = None + reference_step = None + + if use_agent_parent: + # Model callbacks → Agent step (siblings of LLM calls) + agent_step = _current_agent_step.get() + if agent_step is not None: + saved_token = _tracer_current_step.set(agent_step) + # Reference for timing is the current LLM step + reference_step = _current_llm_step.get() + elif use_llm_parent: + # Tool callbacks → LLM step (siblings of Tool steps) + llm_step = _current_llm_step.get() + if llm_step is not None: + saved_token = _tracer_current_step.set(llm_step) + reference_step = llm_step + + try: + # Create a step for the callback + with tracer.create_step( + name=f"Callback: {callback_name}", + step_type=enums.StepType.USER_CALL, + inputs=inputs, + metadata={"callback_type": callback_type, "is_callback": True} + ) as step: + # Adjust start_time for "before_*" callbacks to appear before + # their associated operation when sorted by time + if is_before_callback and reference_step is not None: + ref_start = getattr(reference_step, 'start_time', None) + if ref_start is not None: + # Set start_time to be 1ms before the reference operation + step.start_time = ref_start - 0.001 + + try: + result = await original_callback(*args, **kwargs) + + # Set output based on result + if result is not None: + if hasattr(result, "model_dump"): + try: + step.output = result.model_dump(exclude_none=True) + except Exception: + step.output = str(result) + elif isinstance(result, dict): + step.output = result + else: + step.output = str(result) + else: + step.output = "Callback completed (no modification)" + + return result + except Exception as e: + step.output = f"Error: {str(e)}" + raise + finally: + # Restore the previous current step + if saved_token is not None: + _tracer_current_step.reset(saved_token) + + return async_traced_callback + else: + # Handle sync callbacks + def sync_traced_callback(*args, **kwargs): + # Extract inputs based on callback type + inputs = _extract_callback_inputs(callback_type, args, kwargs) + + # Determine the parent step and get reference time for ordering + saved_token = None + reference_step = None + + if use_agent_parent: + # Model callbacks → Agent step (siblings of LLM calls) + agent_step = _current_agent_step.get() + if agent_step is not None: + saved_token = _tracer_current_step.set(agent_step) + # Reference for timing is the current LLM step + reference_step = _current_llm_step.get() + elif use_llm_parent: + # Tool callbacks → LLM step (siblings of Tool steps) + llm_step = 
_current_llm_step.get() + if llm_step is not None: + saved_token = _tracer_current_step.set(llm_step) + reference_step = llm_step + + try: + # Create a step for the callback + with tracer.create_step( + name=f"Callback: {callback_name}", + step_type=enums.StepType.USER_CALL, + inputs=inputs, + metadata={"callback_type": callback_type, "is_callback": True} + ) as step: + # Adjust start_time for "before_*" callbacks to appear before + # their associated operation when sorted by time + if is_before_callback and reference_step is not None: + ref_start = getattr(reference_step, 'start_time', None) + if ref_start is not None: + # Set start_time to be 1ms before the reference operation + step.start_time = ref_start - 0.001 + + try: + result = original_callback(*args, **kwargs) + + # Set output based on result + if result is not None: + if hasattr(result, "model_dump"): + try: + step.output = result.model_dump(exclude_none=True) + except Exception: + step.output = str(result) + elif isinstance(result, dict): + step.output = result + else: + step.output = str(result) + else: + step.output = "Callback completed (no modification)" + + return result + except Exception as e: + step.output = f"Error: {str(e)}" + raise + finally: + # Restore the previous current step + if saved_token is not None: + _tracer_current_step.reset(saved_token) + + return sync_traced_callback + + return wrapper + + +def _wrap_agent_callbacks(agent: Any) -> None: + """Wrap an agent's callbacks with tracing wrappers. + + This function wraps all 6 ADK callback types on an agent instance: + - before_agent_callback: Called before the agent starts processing + - after_agent_callback: Called after the agent finishes processing + - before_model_callback: Called before each LLM model invocation + - after_model_callback: Called after each LLM model invocation + - before_tool_callback: Called before each tool execution + - after_tool_callback: Called after each tool execution + + Reference: + https://google.github.io/adk-docs/callbacks/#the-callback-mechanism-interception-and-control + + Args: + agent: The ADK agent instance to wrap callbacks for. + """ + agent_name = getattr(agent, "name", "unknown") + agent_id = id(agent) + + # Define all callback types to wrap + callback_configs = [ + ("before_agent_callback", "before_agent"), + ("after_agent_callback", "after_agent"), + ("before_model_callback", "before_model"), + ("after_model_callback", "after_model"), + ("before_tool_callback", "before_tool"), + ("after_tool_callback", "after_tool"), + ] + + for callback_attr, callback_type in callback_configs: + if hasattr(agent, callback_attr): + original = getattr(agent, callback_attr) + if original is not None and not getattr(original, "_openlayer_wrapped", False): + wrapper = _create_callback_wrapper( + f"{callback_type.replace('_', ' ')} ({agent_name})", + callback_type + ) + wrapped = wrapper(original) + wrapped._openlayer_wrapped = True + wrapped._openlayer_original = original + setattr(agent, callback_attr, wrapped) + _original_callbacks[f"{agent_id}_{callback_type}"] = original + logger.debug(f"Wrapped {callback_attr} for agent: {agent_name}") + + # Recursively wrap sub-agents + if hasattr(agent, "sub_agents") and agent.sub_agents: + for sub_agent in agent.sub_agents: + _wrap_agent_callbacks(sub_agent) + + +def _unwrap_agent_callbacks(agent: Any) -> None: + """Remove callback wrappers from an agent. + + Args: + agent: The ADK agent instance to unwrap callbacks for. 
+ """ + agent_id = id(agent) + + # All callback attribute names + callback_attrs = [ + "before_agent_callback", + "after_agent_callback", + "before_model_callback", + "after_model_callback", + "before_tool_callback", + "after_tool_callback", + ] + + # Restore original callbacks + for callback_name in callback_attrs: + if hasattr(agent, callback_name): + callback = getattr(agent, callback_name) + if callback and hasattr(callback, "_openlayer_original"): + setattr(agent, callback_name, callback._openlayer_original) + + # Clean up stored originals + for key in list(_original_callbacks.keys()): + if key.startswith(f"{agent_id}_"): + del _original_callbacks[key] + + # Recursively unwrap sub-agents + if hasattr(agent, "sub_agents") and agent.sub_agents: + for sub_agent in agent.sub_agents: + _unwrap_agent_callbacks(sub_agent) + + # ----------------------------- Patching Functions --------------------------- # @@ -786,35 +1474,61 @@ def _patch_module_function( def _patch_google_adk() -> None: - """Apply all patches to Google ADK modules.""" - logger.debug("Applying Google ADK patches for Openlayer instrumentation") + """Apply all patches to Google ADK modules. - # First, disable ADK's own tracer by replacing it with our NoOpTracer - noop_tracer = NoOpTracer() - try: - import google.adk.telemetry as adk_telemetry - adk_telemetry.tracer = noop_tracer - logger.debug("Replaced ADK's tracer with NoOpTracer") - except Exception as e: - logger.warning(f"Failed to replace ADK tracer: {e}") + This function: + - Optionally disables ADK's built-in OpenTelemetry tracing (if configured) + - Patches agent execution (run_async) + - Patches LLM calls (_call_llm_async) + - Patches LLM response finalization + - Patches tool execution - # Also replace the tracer in modules that have already imported it - modules_to_patch = [ - "google.adk.runners", - "google.adk.agents.base_agent", - "google.adk.flows.llm_flows.base_llm_flow", - "google.adk.flows.llm_flows.functions", - ] + By default, ADK's OpenTelemetry tracing remains active, allowing users + to send telemetry to both Google Cloud and Openlayer. ADK uses OTel + exporters configured via google.adk.telemetry.get_gcp_exporters() or + standard OTEL_EXPORTER_OTLP_* environment variables. - for module_name in modules_to_patch: - if module_name in sys.modules: - try: - module = sys.modules[module_name] - if hasattr(module, "tracer"): - module.tracer = noop_tracer - logger.debug(f"Replaced tracer in {module_name}") - except Exception as e: - logger.warning(f"Failed to replace tracer in {module_name}: {e}") + Callbacks (before_model, after_model, before_tool) are wrapped + dynamically when agents run, not through static patching. 
+ + Reference: + ADK Telemetry: https://github.com/google/adk-python/tree/main/src/google/adk/telemetry + """ + logger.debug("Applying Google ADK patches for Openlayer instrumentation") + + # Only disable ADK's tracer if explicitly requested + # By default, keep ADK's OTel tracing active for Google Cloud integration + if _disable_adk_otel_tracing: + noop_tracer = NoOpTracer() + try: + import google.adk.telemetry as adk_telemetry + adk_telemetry.tracer = noop_tracer + logger.debug("Replaced ADK's tracer with NoOpTracer") + except Exception as e: + logger.warning(f"Failed to replace ADK tracer: {e}") + + # Also replace the tracer in modules that have already imported it + modules_to_patch = [ + "google.adk.runners", + "google.adk.agents.base_agent", + "google.adk.flows.llm_flows.base_llm_flow", + "google.adk.flows.llm_flows.functions", + ] + + for module_name in modules_to_patch: + if module_name in sys.modules: + try: + module = sys.modules[module_name] + if hasattr(module, "tracer"): + module.tracer = noop_tracer + logger.debug(f"Replaced tracer in {module_name}") + except Exception as e: + logger.warning(f"Failed to replace tracer in {module_name}: {e}") + else: + logger.debug( + "Keeping ADK's OpenTelemetry tracing active. " + "Telemetry will be sent to both Google Cloud (if configured) and Openlayer." + ) # Patch agent execution _patch( @@ -847,22 +1561,40 @@ def _patch_google_adk() -> None: _call_tool_async_wrapper ) - logger.info("Google ADK patching complete") + if _disable_adk_otel_tracing: + logger.info( + "Google ADK patching complete. " + "ADK's OTel tracing disabled, using Openlayer only." + ) + else: + logger.info( + "Google ADK patching complete. " + "ADK's OTel tracing active (Google Cloud) + Openlayer tracing enabled." + ) def _unpatch_google_adk() -> None: - """Remove all patches from Google ADK modules.""" + """Remove all patches from Google ADK modules. + + This function: + - Restores ADK's built-in OpenTelemetry tracing (if it was disabled) + - Removes all method patches + - Clears stored original callbacks + """ + global _disable_adk_otel_tracing + logger.debug("Removing Google ADK patches") - # Restore ADK's tracer - try: - import google.adk.telemetry as adk_telemetry - from opentelemetry import trace - - adk_telemetry.tracer = trace.get_tracer("gcp.vertex.agent") - logger.debug("Restored ADK's built-in tracer") - except Exception as e: - logger.warning(f"Failed to restore ADK tracer: {e}") + # Restore ADK's tracer only if we disabled it + if _disable_adk_otel_tracing: + try: + import google.adk.telemetry as adk_telemetry + from opentelemetry import trace + + adk_telemetry.tracer = trace.get_tracer("gcp.vertex.agent") + logger.debug("Restored ADK's built-in tracer") + except Exception as e: + logger.warning(f"Failed to restore ADK tracer: {e}") # Unwrap all methods for obj, method_name in _wrapped_methods: @@ -875,5 +1607,12 @@ def _unpatch_google_adk() -> None: logger.warning(f"Failed to unwrap {obj}.{method_name}: {e}") _wrapped_methods.clear() + + # Clear stored original callbacks + _original_callbacks.clear() + + # Reset the flag + _disable_adk_otel_tracing = False + logger.info("Google ADK unpatching complete")
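
A minimal end-to-end sketch of the API added by this patch (illustrative only;
assumes the Openlayer and Google credentials from the example notebook are set
as environment variables):

    from openlayer.lib import trace_google_adk
    from google.adk.agents import LlmAgent

    # Default: Openlayer tracing runs alongside ADK's built-in OpenTelemetry
    # tracing, so Google Cloud Trace and Openlayer both receive data.
    trace_google_adk()

    # Alternative: send traces to Openlayer only.
    # trace_google_adk(disable_adk_otel=True)

    agent = LlmAgent(
        model="gemini-2.5-flash",
        name="Assistant",
        instruction="You are a helpful assistant.",
    )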