diff --git a/core/src/agents/base_agent.ts b/core/src/agents/base_agent.ts index 658cda9..ae5d2bc 100644 --- a/core/src/agents/base_agent.ts +++ b/core/src/agents/base_agent.ts @@ -5,10 +5,15 @@ */ import {Content} from '@google/genai'; -import {trace} from '@opentelemetry/api'; +import {context, trace} from '@opentelemetry/api'; import {createEvent, Event} from '../events/event.js'; +import { + runAsyncGeneratorWithOtelContext, + traceAgentInvocation, + tracer, +} from '../telemetry/tracing.js'; import {CallbackContext} from './callback_context.js'; import {InvocationContext} from './invocation_context.js'; @@ -164,35 +169,41 @@ export abstract class BaseAgent { async *runAsync( parentContext: InvocationContext, ): AsyncGenerator { - const span = trace - .getTracer('gcp.vertex.agent') - .startSpan(`agent_run [${this.name}]`); + const span = tracer.startSpan(`invoke_agent ${this.name}`); + const ctx = trace.setSpan(context.active(), span); try { - const context = this.createInvocationContext(parentContext); - - const beforeAgentCallbackEvent = - await this.handleBeforeAgentCallback(context); - if (beforeAgentCallbackEvent) { - yield beforeAgentCallbackEvent; - } - - if (context.endInvocation) { - return; - } - - for await (const event of this.runAsyncImpl(context)) { - yield event; - } - - if (context.endInvocation) { - return; - } - - const afterAgentCallbackEvent = - await this.handleAfterAgentCallback(context); - if (afterAgentCallbackEvent) { - yield afterAgentCallbackEvent; - } + yield* runAsyncGeneratorWithOtelContext( + ctx, + this, + async function* () { + const context = this.createInvocationContext(parentContext); + + const beforeAgentCallbackEvent = + await this.handleBeforeAgentCallback(context); + if (beforeAgentCallbackEvent) { + yield beforeAgentCallbackEvent; + } + + if (context.endInvocation) { + return; + } + + traceAgentInvocation({agent: this, invocationContext: context}); + for await (const event of this.runAsyncImpl(context)) { + yield event; 
+ } + + if (context.endInvocation) { + return; + } + + const afterAgentCallbackEvent = + await this.handleAfterAgentCallback(context); + if (afterAgentCallbackEvent) { + yield afterAgentCallbackEvent; + } + }, + ); } finally { span.end(); } @@ -205,15 +216,19 @@ export abstract class BaseAgent { * @yields The events generated by the agent. * @returns An AsyncGenerator that yields the events generated by the agent. */ - // eslint-disable-next-line require-yield async *runLive( parentContext: InvocationContext, // eslint-disable-line @typescript-eslint/no-unused-vars ): AsyncGenerator { - const span = trace - .getTracer('gcp.vertex.agent') - .startSpan(`agent_run [${this.name}]`); + const span = tracer.startSpan(`invoke_agent ${this.name}`); + const ctx = trace.setSpan(context.active(), span); try { - // TODO(b/425992518): Implement live mode. + yield* runAsyncGeneratorWithOtelContext( + ctx, + this, + async function* () { + // TODO(b/425992518): Implement live mode. + }, + ); throw new Error('Live mode is not implemented yet.'); } finally { span.end(); diff --git a/core/src/agents/functions.ts b/core/src/agents/functions.ts index aedd82a..9277cfb 100644 --- a/core/src/agents/functions.ts +++ b/core/src/agents/functions.ts @@ -4,7 +4,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -// TODO - b/436079721: implement traceMergedToolCalls, traceToolCall, tracer. 
import {Content, createUserContent, FunctionCall, Part} from '@google/genai'; import {isEmpty} from 'lodash-es'; @@ -17,6 +16,11 @@ import {ToolContext} from '../tools/tool_context.js'; import {randomUUID} from '../utils/env_aware_utils.js'; import {logger} from '../utils/logger.js'; +import { + traceMergedToolCalls, + tracer, + traceToolCall, +} from '../telemetry/tracing.js'; import { SingleAfterToolCallback, SingleBeforeToolCallback, @@ -212,11 +216,61 @@ async function callToolAsync( toolContext: ToolContext, // eslint-disable-next-line @typescript-eslint/no-explicit-any ): Promise { - // TODO - b/436079721: implement [tracer.start_as_current_span] - logger.debug(`callToolAsync ${tool.name}`); - return await tool.runAsync({args, toolContext}); + return tracer.startActiveSpan(`execute_tool ${tool.name}`, async (span) => { + try { + logger.debug(`callToolAsync ${tool.name}`); + const result = await tool.runAsync({args, toolContext}); + traceToolCall({ + tool, + args, + functionResponseEvent: buildResponseEvent( + tool, + result, + toolContext, + toolContext.invocationContext, + ), + }); + return result; + } finally { + span.end(); + } + }); } +function buildResponseEvent( + tool: BaseTool, + functionResult: unknown, + toolContext: ToolContext, + invocationContext: InvocationContext, +): Event { + let responseResult: Record; + if (typeof functionResult !== 'object' || functionResult == null) { + responseResult = {result: functionResult}; + } else { + responseResult = functionResult as Record; + } + + const partFunctionResponse: Part = { + functionResponse: { + name: tool.name, + response: responseResult, + id: toolContext.functionCallId, + }, + }; + + const content: Content = { + role: 'user', + parts: [partFunctionResponse], + }; + + return createEvent({ + invocationId: invocationContext.invocationId, + author: invocationContext.agent.name, + content: content, + actions: toolContext.actions, + branch: invocationContext.branch, + }); +} /** * Handles function 
calls.
  * Runtime behavior to pay attention to:
@@ -444,12 +498,17 @@ export async function handleFunctionCallList({
   );
 
   if (functionResponseEvents.length > 1) {
-    // TODO - b/436079721: implement [tracer.start_as_current_span]
-    logger.debug('execute_tool (merged)');
-    // TODO - b/436079721: implement [traceMergedToolCalls]
-    logger.debug('traceMergedToolCalls', {
-      responseEventId: mergedEvent.id,
-      functionResponseEvent: mergedEvent.id,
+    // Trace the merged response event under its own span.
+    tracer.startActiveSpan('execute_tool (merged)', (span) => {
+      try {
+        logger.debug('execute_tool (merged)');
+        traceMergedToolCalls({
+          responseEventId: mergedEvent.id,
+          functionResponseEvent: mergedEvent,
+        });
+      } finally {
+        span.end();
+      }
     });
   }
   return mergedEvent;
diff --git a/core/src/agents/llm_agent.ts b/core/src/agents/llm_agent.ts
index a20e33e..acc6146 100644
--- a/core/src/agents/llm_agent.ts
+++ b/core/src/agents/llm_agent.ts
@@ -11,6 +11,7 @@ import {
   Part,
   Schema,
 } from '@google/genai';
+import {context, trace} from '@opentelemetry/api';
 import {cloneDeep} from 'lodash-es';
 import {z} from 'zod';
 
@@ -56,6 +57,11 @@ import {ToolContext} from '../tools/tool_context.js';
 import {base64Decode} from '../utils/env_aware_utils.js';
 import {logger} from '../utils/logger.js';
 
+import {
+  runAsyncGeneratorWithOtelContext,
+  traceCallLlm,
+  tracer,
+} from '../telemetry/tracing.js';
 import {BaseAgent, BaseAgentConfig} from './base_agent.js';
 import {
   BaseLlmRequestProcessor,
@@ -1707,26 +1713,40 @@ export class LlmAgent extends BaseAgent {
       author: this.name,
       branch: invocationContext.branch,
     });
-    for await (const llmResponse of this.callLlmAsync(
-      invocationContext,
-      llmRequest,
-      modelResponseEvent,
-    )) {
-      // ======================================================================
-      // Postprocess after calling the LLM
-      // ======================================================================
-      for await (const event of this.postprocess(
-        invocationContext,
-        llmRequest,
-        llmResponse,
-        modelResponseEvent,
-      )) {
-        // Update the mutable event id to avoid conflict
-        modelResponseEvent.id = createNewEventId();
-        modelResponseEvent.timestamp = new Date().getTime();
-        yield event;
-      }
-    }
+    const span = tracer.startSpan('call_llm');
+    const ctx = trace.setSpan(context.active(), span);
+    // End the span in `finally` so it is closed even when the LLM call or
+    // post-processing throws, or the consumer stops iterating early.
+    try {
+      yield* runAsyncGeneratorWithOtelContext(
+        ctx,
+        this,
+        async function* () {
+          for await (const llmResponse of this.callLlmAsync(
+            invocationContext,
+            llmRequest,
+            modelResponseEvent,
+          )) {
+            // ======================================================================
+            // Postprocess after calling the LLM
+            // ======================================================================
+            for await (const event of this.postprocess(
+              invocationContext,
+              llmRequest,
+              llmResponse,
+              modelResponseEvent,
+            )) {
+              // Update the mutable event id to avoid conflict
+              modelResponseEvent.id = createNewEventId();
+              modelResponseEvent.timestamp = new Date().getTime();
+              yield event;
+            }
+          }
+        },
+      );
+    } finally {
+      span.end();
+    }
   }
 
   private async *postprocess(
@@ -1885,7 +1905,6 @@ export class LlmAgent extends BaseAgent {
 
     // Calls the LLM.
     const llm = this.canonicalModel;
-    // TODO - b/436079721: Add tracer.start_as_current_span('call_llm')
     if (invocationContext.runConfig?.supportCfc) {
       // TODO - b/425992518: Implement CFC call path
       // This is a hack, underneath it calls runLive. Which makes
@@ -1905,8 +1924,12 @@ export class LlmAgent extends BaseAgent {
         llmRequest,
         modelResponseEvent,
       )) {
-        // TODO - b/436079721: Add trace_call_llm
-
+        traceCallLlm({
+          invocationContext,
+          eventId: modelResponseEvent.id,
+          llmRequest,
+          llmResponse,
+        });
         // Runs after_model_callback if it exists.
const alteredLlmResponse = await this.handleAfterModelCallback( invocationContext, diff --git a/core/src/runner/runner.ts b/core/src/runner/runner.ts index 09f3da6..6117f02 100644 --- a/core/src/runner/runner.ts +++ b/core/src/runner/runner.ts @@ -5,7 +5,7 @@ */ import {Content, createPartFromText} from '@google/genai'; -import {trace} from '@opentelemetry/api'; +import {context, trace} from '@opentelemetry/api'; import {BaseAgent} from '../agents/base_agent.js'; import { @@ -27,6 +27,10 @@ import {BasePlugin} from '../plugins/base_plugin.js'; import {PluginManager} from '../plugins/plugin_manager.js'; import {BaseSessionService} from '../sessions/base_session_service.js'; import {Session} from '../sessions/session.js'; +import { + runAsyncGeneratorWithOtelContext, + tracer, +} from '../telemetry/tracing.js'; import {logger} from '../utils/logger.js'; import {isGemini2OrAbove} from '../utils/model_name.js'; @@ -95,144 +99,161 @@ export class Runner { // ========================================================================= // Setup the session and invocation context // ========================================================================= - const span = trace.getTracer('gcp.vertex.agent').startSpan('invocation'); + const span = tracer.startSpan('invocation'); + const ctx = trace.setSpan(context.active(), span); try { - const session = await this.sessionService.getSession({ - appName: this.appName, - userId, - sessionId, - }); + yield* runAsyncGeneratorWithOtelContext( + ctx, + this, + async function* () { + const session = await this.sessionService.getSession({ + appName: this.appName, + userId, + sessionId, + }); - if (!session) { - if (!this.appName) { - throw new Error( - `Session lookup failed: appName must be provided in runner constructor`, - ); - } - throw new Error(`Session not found: ${sessionId}`); - } + if (!session) { + if (!this.appName) { + throw new Error( + `Session lookup failed: appName must be provided in runner constructor`, + ); + } + 
throw new Error(`Session not found: ${sessionId}`); + } - if (runConfig.supportCfc && isLlmAgent(this.agent)) { - const modelName = this.agent.canonicalModel.model; - if (!isGemini2OrAbove(modelName)) { - throw new Error( - `CFC is not supported for model: ${ - modelName - } in agent: ${this.agent.name}`, - ); - } + if (runConfig.supportCfc && isLlmAgent(this.agent)) { + const modelName = this.agent.canonicalModel.model; + if (!isGemini2OrAbove(modelName)) { + throw new Error( + `CFC is not supported for model: ${ + modelName + } in agent: ${this.agent.name}`, + ); + } - if (!isBuiltInCodeExecutor(this.agent.codeExecutor)) { - this.agent.codeExecutor = new BuiltInCodeExecutor(); - } - } + if (!isBuiltInCodeExecutor(this.agent.codeExecutor)) { + this.agent.codeExecutor = new BuiltInCodeExecutor(); + } + } - const invocationContext = new InvocationContext({ - artifactService: this.artifactService, - sessionService: this.sessionService, - memoryService: this.memoryService, - credentialService: this.credentialService, - invocationId: newInvocationContextId(), - agent: this.agent, - session, - userContent: newMessage, - runConfig, - pluginManager: this.pluginManager, - }); + const invocationContext = new InvocationContext({ + artifactService: this.artifactService, + sessionService: this.sessionService, + memoryService: this.memoryService, + credentialService: this.credentialService, + invocationId: newInvocationContextId(), + agent: this.agent, + session, + userContent: newMessage, + runConfig, + pluginManager: this.pluginManager, + }); - // ========================================================================= - // Preprocess plugins on user message - // ========================================================================= - const pluginUserMessage = - await this.pluginManager.runOnUserMessageCallback({ - userMessage: newMessage, - invocationContext, - }); - if (pluginUserMessage) { - newMessage = pluginUserMessage as Content; - } + // 
========================================================================= + // Preprocess plugins on user message + // ========================================================================= + const pluginUserMessage = + await this.pluginManager.runOnUserMessageCallback({ + userMessage: newMessage, + invocationContext, + }); + if (pluginUserMessage) { + newMessage = pluginUserMessage as Content; + } - // ========================================================================= - // Append user message to session - // ========================================================================= - if (newMessage) { - if (!newMessage.parts?.length) { - throw new Error('No parts in the newMessage.'); - } + // ========================================================================= + // Append user message to session + // ========================================================================= + if (newMessage) { + if (!newMessage.parts?.length) { + throw new Error('No parts in the newMessage.'); + } - // Directly saves the artifacts (if applicable) in the user message and - // replaces the artifact data with a file name placeholder. - // TODO - b/425992518: fix Runner<>>ArtifactService leaky abstraction. - if (runConfig.saveInputBlobsAsArtifacts) { - await this.saveArtifacts( - invocationContext.invocationId, - session.userId, - session.id, - newMessage, - ); - } - // Append the user message to the session with optional state delta. - await this.sessionService.appendEvent({ - session, - event: createEvent({ - invocationId: invocationContext.invocationId, - author: 'user', - actions: stateDelta ? createEventActions({stateDelta}) : undefined, - content: newMessage, - }), - }); - } + // Directly saves the artifacts (if applicable) in the user message and + // replaces the artifact data with a file name placeholder. + // TODO - b/425992518: fix Runner<>>ArtifactService leaky abstraction. 
+ if (runConfig.saveInputBlobsAsArtifacts) { + await this.saveArtifacts( + invocationContext.invocationId, + session.userId, + session.id, + newMessage, + ); + } + // Append the user message to the session with optional state delta. + await this.sessionService.appendEvent({ + session, + event: createEvent({ + invocationId: invocationContext.invocationId, + author: 'user', + actions: stateDelta + ? createEventActions({stateDelta}) + : undefined, + content: newMessage, + }), + }); + } - // ========================================================================= - // Determine which agent should handle the workflow resumption. - // ========================================================================= - invocationContext.agent = this.determineAgentForResumption( - session, - this.agent, - ); + // ========================================================================= + // Determine which agent should handle the workflow resumption. + // ========================================================================= + invocationContext.agent = this.determineAgentForResumption( + session, + this.agent, + ); - // ========================================================================= - // Run the agent with the plugins (aka hooks to apply in the lifecycle) - // ========================================================================= - // Step 1: Run the before_run callbacks to see if we should early exit. 
- const beforeRunCallbackResponse = - await this.pluginManager.runBeforeRunCallback({invocationContext}); + // ========================================================================= + // Run the agent with the plugins (aka hooks to apply in the lifecycle) + // ========================================================================= + if (newMessage) { + // ========================================================================= + // Run the agent with the plugins (aka hooks to apply in the lifecycle) + // ========================================================================= + // Step 1: Run the before_run callbacks to see if we should early exit. + const beforeRunCallbackResponse = + await this.pluginManager.runBeforeRunCallback({ + invocationContext, + }); - if (beforeRunCallbackResponse) { - const earlyExitEvent = createEvent({ - invocationId: invocationContext.invocationId, - author: 'model', - content: beforeRunCallbackResponse, - }); - // TODO: b/447446338 - In the future, do *not* save live call audio - // content to session This is a feature in Python ADK - await this.sessionService.appendEvent({ - session, - event: earlyExitEvent, - }); - yield earlyExitEvent; - } else { - // Step 2: Otherwise continue with normal execution - for await (const event of invocationContext.agent.runAsync( - invocationContext, - )) { - if (!event.partial) { - await this.sessionService.appendEvent({session, event}); - } - // Step 3: Run the on_event callbacks to optionally modify the event. 
- const modifiedEvent = await this.pluginManager.runOnEventCallback({ - invocationContext, - event, - }); - if (modifiedEvent) { - yield modifiedEvent; - } else { - yield event; + if (beforeRunCallbackResponse) { + const earlyExitEvent = createEvent({ + invocationId: invocationContext.invocationId, + author: 'model', + content: beforeRunCallbackResponse, + }); + // TODO: b/447446338 - In the future, do *not* save live call audio + // content to session This is a feature in Python ADK + await this.sessionService.appendEvent({ + session, + event: earlyExitEvent, + }); + yield earlyExitEvent; + } else { + // Step 2: Otherwise continue with normal execution + for await (const event of invocationContext.agent.runAsync( + invocationContext, + )) { + if (!event.partial) { + await this.sessionService.appendEvent({session, event}); + } + // Step 3: Run the on_event callbacks to optionally modify the event. + const modifiedEvent = + await this.pluginManager.runOnEventCallback({ + invocationContext, + event, + }); + if (modifiedEvent) { + yield modifiedEvent; + } else { + yield event; + } + } + // Step 4: Run the after_run callbacks to optionally modify the context. + await this.pluginManager.runAfterRunCallback({invocationContext}); + } } - } - } - // Step 4: Run the after_run callbacks to optionally modify the context. 
- await this.pluginManager.runAfterRunCallback({invocationContext}); + }, + ); } finally { span.end(); } diff --git a/core/src/telemetry/tracing.ts b/core/src/telemetry/tracing.ts index 8ae9c8c..4d97440 100644 --- a/core/src/telemetry/tracing.ts +++ b/core/src/telemetry/tracing.ts @@ -352,11 +352,10 @@ function buildLlmRequestForTrace( * * @returns A new async generator that executes all operations within the provided context */ -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export function bindAsyncGenerator( +function bindOtelContextToAsyncGenerator( ctx: Context, - generator: AsyncGenerator, -): AsyncGenerator { + generator: AsyncGenerator, +): AsyncGenerator { return { // Bind the next() method to execute within the provided context next: context.bind(ctx, generator.next.bind(generator)), @@ -369,11 +368,32 @@ export function bindAsyncGenerator( // Ensure the async iterator symbol also returns a context-bound generator [Symbol.asyncIterator]() { - return bindAsyncGenerator(ctx, generator[Symbol.asyncIterator]()); + return bindOtelContextToAsyncGenerator( + ctx, + generator[Symbol.asyncIterator](), + ); }, }; } +/** + * Runs an async generator function with both OTEL context and JavaScript 'this' context. + * + * @param otelContext - The OpenTelemetry context to bind the generator to + * @param generatorFnContext - The 'this' context to bind to the generator function + * @param generatorFn - The generator function to execute + * + * @returns A new async generator that executes within both contexts + */ +export function runAsyncGeneratorWithOtelContext( + otelContext: Context, + generatorFnContext: TThis, + generatorFn: (this: TThis) => AsyncGenerator, +): AsyncGenerator { + const generator = generatorFn.call(generatorFnContext); + return bindOtelContextToAsyncGenerator(otelContext, generator); +} + /** * Determines whether to add request/response content to spans. 
* diff --git a/dev/src/cli/cli.ts b/dev/src/cli/cli.ts index 0ac48ee..c281aca 100644 --- a/dev/src/cli/cli.ts +++ b/dev/src/cli/cli.ts @@ -86,6 +86,10 @@ const LOG_LEVEL_OPTION = new Option( const ARTIFACT_SERVICE_URI_OPTION = new Option( '--artifact_service_uri , Optional. The URI of the artifact service, supported URIs: gs:// for GCS artifact service.', ); +const OTEL_TO_CLOUD_OPTION = new Option( + '--otel_to_cloud [boolean]', + 'Optional. Whether to send otel traces to cloud.', +).default(false); const program = new Command('ADK CLI'); @@ -99,6 +103,7 @@ program .addOption(VERBOSE_OPTION) .addOption(LOG_LEVEL_OPTION) .addOption(ARTIFACT_SERVICE_URI_OPTION) + .addOption(OTEL_TO_CLOUD_OPTION) .action((agentsDir: string, options: Record) => { setLogLevel(getLogLevelFromOptions(options)); @@ -111,6 +116,7 @@ program artifactService: options['artifact_service_uri'] ? getArtifactServiceFromUri(options['artifact_service_uri']) : undefined, + otelToCloud: options['otel_to_cloud'] ? true : false, }); server.start(); @@ -126,6 +132,7 @@ program .addOption(VERBOSE_OPTION) .addOption(LOG_LEVEL_OPTION) .addOption(ARTIFACT_SERVICE_URI_OPTION) + .addOption(OTEL_TO_CLOUD_OPTION) .action((agentsDir: string, options: Record) => { setLogLevel(getLogLevelFromOptions(options)); @@ -138,6 +145,7 @@ program artifactService: options['artifact_service_uri'] ? getArtifactServiceFromUri(options['artifact_service_uri']) : undefined, + otelToCloud: options['otel_to_cloud'] ? true : false, }); server.start(); }); @@ -200,6 +208,7 @@ program .addOption(VERBOSE_OPTION) .addOption(LOG_LEVEL_OPTION) .addOption(ARTIFACT_SERVICE_URI_OPTION) + .addOption(OTEL_TO_CLOUD_OPTION) .action((agentPath: string, options: Record) => { setLogLevel(getLogLevelFromOptions(options)); @@ -212,6 +221,7 @@ program artifactService: options['artifact_service_uri'] ? getArtifactServiceFromUri(options['artifact_service_uri']) : undefined, + otelToCloud: options['otel_to_cloud'] ? 
true : false, }); }); diff --git a/dev/src/cli/cli_deploy.ts b/dev/src/cli/cli_deploy.ts index b4e10b6..33f3aad 100644 --- a/dev/src/cli/cli_deploy.ts +++ b/dev/src/cli/cli_deploy.ts @@ -32,6 +32,7 @@ export interface CreateDockerFileContentOptions { logLevel: string; allowOrigins?: string; artifactServiceUri?: string; + otelToCloud?: boolean; } export interface DeployToCloudRunOptions extends CreateDockerFileContentOptions { @@ -40,6 +41,7 @@ export interface DeployToCloudRunOptions extends CreateDockerFileContentOptions tempFolder: string; adkVersion: string; extraGcloudArgs?: string[]; + otelToCloud?: boolean; } function validateGcloudExtraArgs( @@ -183,6 +185,10 @@ function createDockerFileContent( ); } + if (options.otelToCloud) { + adkServerOptions.push('--otel_to_cloud'); + } + return ` FROM node:lts-alpine WORKDIR /app @@ -312,6 +318,7 @@ export async function deployToCloudRun(options: DeployToCloudRunOptions) { withUi: options.withUi, logLevel: options.logLevel, allowOrigins: options.allowOrigins, + otelToCloud: options.otelToCloud, }); console.info('Deploying to Cloud Run...'); diff --git a/dev/src/cli/cli_run.ts b/dev/src/cli/cli_run.ts index dd9e84c..ac4002d 100644 --- a/dev/src/cli/cli_run.ts +++ b/dev/src/cli/cli_run.ts @@ -153,6 +153,7 @@ export interface RunAgentOptions { artifactService?: BaseArtifactService; sessionService?: BaseSessionService; memoryService?: BaseMemoryService; + otelToCloud?: boolean; } export async function runAgent(options: RunAgentOptions): Promise { try { diff --git a/dev/src/server/adk_web_server.ts b/dev/src/server/adk_web_server.ts index d45c06a..f48010d 100644 --- a/dev/src/server/adk_web_server.ts +++ b/dev/src/server/adk_web_server.ts @@ -18,12 +18,20 @@ import { Runner, StreamingMode, } from '@google/adk'; +import {trace, TracerProvider} from '@opentelemetry/api'; +import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base'; import cors from 'cors'; import express, {Request, Response} from 'express'; import * as 
http from 'http';
 import * as path from 'path';
 
 import {AgentLoader} from '../utils/agent_loader.js';
+import {
+  ApiServerSpanExporter,
+  hrTimeToNanoseconds,
+  InMemoryExporter,
+  setupTelemetry,
+} from '../utils/telemetry_utils.js';
 
 import {getAgentGraphAsDot} from './agent_graph.js';
 
@@ -37,6 +45,8 @@ interface ServerOptions {
   agentLoader?: AgentLoader;
   serveDebugUI?: boolean;
   allowOrigins?: string;
+  otelToCloud?: boolean;
+  registerProcessors?: (tracerProvider: TracerProvider) => void;
 }
 
 export class AdkWebServer {
@@ -50,7 +60,14 @@ export class AdkWebServer {
   private readonly artifactService: BaseArtifactService;
   private readonly serveDebugUI: boolean;
   private readonly allowOrigins?: string;
+  private readonly otelToCloud: boolean;
+  private readonly registerProcessors?: (
+    tracerProvider: TracerProvider,
+  ) => void;
   private server?: http.Server;
+  private readonly traceDict: Record> = {};
+  private readonly sessionTraceDict: Record = {};
+  private memoryExporter: InMemoryExporter;
 
   constructor(options: ServerOptions) {
     this.host = options.host ?? 'localhost';
@@ -64,14 +81,41 @@ export class AdkWebServer {
       options.agentLoader ?? new AgentLoader(options.agentsDir);
     this.serveDebugUI = options.serveDebugUI ?? false;
     this.allowOrigins = options.allowOrigins;
+    this.otelToCloud = options.otelToCloud ?? false;
+    this.registerProcessors = options.registerProcessors;
+    this.memoryExporter = new InMemoryExporter(this.sessionTraceDict);
 
     this.app = express();
 
     this.init();
   }
 
+  /**
+   * Registers the span processors that feed the `/debug/trace` endpoints,
+   * then passes the active tracer provider to the optional
+   * `registerProcessors` hook.
+   */
+  private async setupTelemetry(): Promise {
+    const internalExporters = [
+      new SimpleSpanProcessor(new ApiServerSpanExporter(this.traceDict)),
+      new SimpleSpanProcessor(this.memoryExporter),
+    ];
+
+    await setupTelemetry(this.otelToCloud, internalExporters);
+
+    if (this.registerProcessors) {
+      const tracerProvider = trace.getTracerProvider();
+      this.registerProcessors(tracerProvider);
+    }
+  }
+
   private init() {
     const app = this.app;
+    // init() is synchronous, so telemetry setup is not awaited; attach a
+    // handler so a failure is reported rather than left as an unhandled rejection.
+    this.setupTelemetry().catch((e: unknown) => {
+      console.error('Failed to set up telemetry:', e);
+    });
 
     if (this.serveDebugUI) {
       app.get('/', (req: Request, res: Response) => {
@@ -106,7 +150,6 @@ export class AdkWebServer {
     app.get('/list-apps', async (req: Request, res: Response) => {
       try {
         const apps = await this.agentLoader.listAgents();
-
         res.json(apps);
       } catch (e: unknown) {
         res.status(500).json({error: (e as Error).message});
@@ -114,13 +157,35 @@ export class AdkWebServer {
     });
 
     app.get('/debug/trace/:eventId', (req: Request, res: Response) => {
-      return res.status(501).json({error: 'Not implemented'});
+      const eventId = req.params['eventId'];
+      const eventDict = this.traceDict[eventId];
+
+      if (!eventDict) {
+        return res.status(404).json({error: 'Trace not found'});
+      }
+
+      return res.json(eventDict);
     });
 
     app.get(
       '/debug/trace/session/:sessionId',
       (req: Request, res: Response) => {
-        return res.status(501).json({error: 'Not implemented'});
+        const sessionId = req.params['sessionId'];
+        const spans = this.memoryExporter.getFinishedSpans(sessionId);
+        if (spans.length === 0) {
+          return res.json([]);
+        }
+        const spanData = spans.map((span) => ({
+          name: span.name,
+          span_id: span.spanContext().spanId,
+          trace_id: span.spanContext().traceId,
+          start_time: hrTimeToNanoseconds(span.startTime),
+          end_time: hrTimeToNanoseconds(span.endTime),
+          attributes: {...span.attributes},
+          parent_span_id: span.parentSpanContext?.spanId || null,
+        }));
+
+
return res.json(spanData);
       },
     );
 
diff --git a/dev/src/utils/telemetry_utils.ts b/dev/src/utils/telemetry_utils.ts
new file mode 100644
index 0000000..f9a874d
--- /dev/null
+++ b/dev/src/utils/telemetry_utils.ts
@@ -0,0 +1,216 @@
+import {
+  getGcpExporters,
+  getGcpResource,
+  maybeSetOtelProviders,
+  OTelHooks,
+} from '@google/adk';
+import {HrTime} from '@opentelemetry/api';
+import {
+  ReadableSpan,
+  SpanExporter,
+  SpanProcessor,
+} from '@opentelemetry/sdk-trace-base';
+
+/**
+ * Converts an HrTime tuple into a single nanosecond count.
+ *
+ * The result is a plain `number`, so totals above
+ * `Number.MAX_SAFE_INTEGER` (about 9.0e15) are rounded; treat it as an
+ * approximate timestamp rather than an exact nanosecond value.
+ *
+ * @param hrTime The HrTime array [seconds, nanoseconds]
+ * @returns Time in nanoseconds as a number, or 0 when `hrTime` is not a
+ *     two-element array
+ */
+export function hrTimeToNanoseconds(hrTime: HrTime): number {
+  if (!hrTime || !Array.isArray(hrTime) || hrTime.length !== 2) {
+    return 0;
+  }
+
+  const [seconds, nanoseconds] = hrTime;
+
+  return seconds * 1e9 + nanoseconds;
+}
+
+/**
+ * Span exporter that indexes span attributes by event ID.
+ *
+ * Only `call_llm`, `send_data` and `execute_tool*` spans that carry a
+ * `gcp.vertex.agent.event_id` attribute are stored; a later span with the
+ * same event ID replaces the earlier entry.
+ */
+export class ApiServerSpanExporter implements SpanExporter {
+  private traceDict: Record>;
+
+  constructor(traceDict: Record>) {
+    this.traceDict = traceDict;
+  }
+
+  export(
+    spans: ReadableSpan[],
+    resultCallback: (result: {code: number}) => void,
+  ): void {
+    for (const span of spans) {
+      if (
+        span.name === 'call_llm' ||
+        span.name === 'send_data' ||
+        span.name.startsWith('execute_tool')
+      ) {
+        const attributes = {...span.attributes};
+        attributes['trace_id'] = span.spanContext().traceId;
+        attributes['span_id'] = span.spanContext().spanId;
+
+        const eventId = attributes['gcp.vertex.agent.event_id'];
+        if (eventId) {
+          this.traceDict[eventId as string] = attributes;
+        }
+      }
+    }
+    resultCallback({code: 0});
+  }
+
+  forceFlush(): Promise {
+    return Promise.resolve();
+  }
+
+  shutdown(): Promise {
+    return Promise.resolve();
+  }
+}
+
+/**
+ * Span exporter that keeps every exported span in memory and records which
+ * trace IDs belong to each session (taken from `call_llm` spans).
+ */
+export class InMemoryExporter implements SpanExporter {
+  private spans: ReadableSpan[] = [];
+  private traceDict: Record;
+
+  constructor(traceDict: Record) {
+    this.traceDict = traceDict;
+  }
+
+  export(
+    spans: ReadableSpan[],
+    resultCallback: (result: {code: number}) => void,
+  ): void {
+    for (const span of spans) {
+      const traceId = span.spanContext().traceId;
+      if (span.name === 'call_llm') {
+        const attributes = {...span.attributes};
+        const sessionId = attributes['gcp.vertex.agent.session_id'] as string;
+        if (sessionId) {
+          let sessionTraceIds = this.traceDict[sessionId];
+          if (!sessionTraceIds) {
+            sessionTraceIds = [];
+            this.traceDict[sessionId] = sessionTraceIds;
+          }
+          // Several call_llm spans can share a trace; store each ID once so
+          // the per-session list does not grow with every LLM call.
+          if (!sessionTraceIds.includes(traceId)) {
+            sessionTraceIds.push(traceId);
+          }
+        }
+      }
+    }
+    this.spans.push(...spans);
+    resultCallback({code: 0});
+  }
+
+  forceFlush(): Promise {
+    return Promise.resolve();
+  }
+
+  shutdown(): Promise {
+    return Promise.resolve();
+  }
+
+  /** Returns the stored spans whose trace ID was recorded for `sessionId`. */
+  getFinishedSpans(sessionId: string): ReadableSpan[] {
+    const traceIds = this.traceDict[sessionId];
+    if (!traceIds || traceIds.length === 0) {
+      return [];
+    }
+    // Set lookup keeps the filter linear in the number of stored spans.
+    const wanted = new Set(traceIds);
+    return this.spans.filter((span) => wanted.has(span.spanContext().traceId));
+  }
+
+  /** Drops the stored spans; the session-to-trace index is left untouched. */
+  clear(): void {
+    this.spans = [];
+  }
+}
+
+/** Reports whether any OTLP exporter endpoint environment variable is set. */
+function otelEnvVarsEnabled(): boolean {
+  const endpointVars = [
+    'OTEL_EXPORTER_OTLP_ENDPOINT',
+    'OTEL_EXPORTER_OTLP_TRACES_ENDPOINT',
+    'OTEL_EXPORTER_OTLP_METRICS_ENDPOINT',
+    'OTEL_EXPORTER_OTLP_LOGS_ENDPOINT',
+  ];
+
+  return endpointVars.some((varName) => process.env[varName]);
+}
+
+/**
+ * Configures OpenTelemetry providers for the dev server.
+ *
+ * @param otelToCloud When true, GCP exporters are added.
+ * @param internalExporters Span processors registered in every mode.
+ */
+export async function setupTelemetry(
+  otelToCloud: boolean = false,
+  internalExporters: SpanProcessor[] = [],
+): Promise {
+  if (otelToCloud) {
+    await setupGcpTelemetryExperimental(internalExporters);
+  } else if (otelEnvVarsEnabled()) {
+    await setupTelemetryFromEnvExperimental(internalExporters);
+  } else {
+    const otelHooks: OTelHooks = {
+      spanProcessors: internalExporters,
+    };
+    maybeSetOtelProviders([otelHooks]);
+  }
+}
+
+/** Registers the internal processors plus GCP exporters and resource. */
+async function setupGcpTelemetryExperimental(
+  internalExporters: SpanProcessor[] = [],
+): Promise {
+  const otelHooksToAdd: OTelHooks[] = [];
+
+  if (internalExporters.length > 0) {
+    otelHooksToAdd.push({
+      spanProcessors: internalExporters,
+    });
+  }
+
+  const gcpExporters = await getGcpExporters({
+    enableTracing: true,
+    enableLogging: false,
+    enableMetrics: true,
+  });
+  otelHooksToAdd.push(gcpExporters);
+
+  const otelResource = getGcpResource();
+
+  maybeSetOtelProviders(otelHooksToAdd, otelResource);
+}
+
+/** Registers only the internal processors (if any) with the providers. */
+async function setupTelemetryFromEnvExperimental(
+  internalExporters: SpanProcessor[] = [],
+): Promise {
+  const otelHooksToAdd: OTelHooks[] = [];
+
+  if (internalExporters.length > 0) {
+    otelHooksToAdd.push({
+      spanProcessors: internalExporters,
+    });
+  }
+
+  maybeSetOtelProviders(otelHooksToAdd);
+}