diff --git a/genkit-tools/telemetry-server/src/index.ts b/genkit-tools/telemetry-server/src/index.ts index d934fb9968..26fba05238 100644 --- a/genkit-tools/telemetry-server/src/index.ts +++ b/genkit-tools/telemetry-server/src/index.ts @@ -22,6 +22,7 @@ import { logger } from '@genkit-ai/tools-common/utils'; import express from 'express'; import type * as http from 'http'; import type { TraceStore } from './types'; +import { traceDataFromOtlp } from './utils/otlp'; export { LocalFileTraceStore } from './file-trace-store.js'; export { TraceQuerySchema, type TraceQuery, type TraceStore } from './types'; @@ -90,6 +91,29 @@ export async function startTelemetryServer(params: { } }); + api.post('/api/otlp', async (request, response) => { + try { + if (!request.body.resourceSpans?.length) { + // Acknowledge and ignore empty payloads. + response.status(200).json({}); + return; + } + const traces = traceDataFromOtlp(request.body); + for (const trace of traces) { + const traceData = TraceDataSchema.parse(trace); + await params.traceStore.save(traceData.traceId, traceData); + } + response.status(200).json({}); + } catch (err) { + logger.error(`Error processing OTLP payload: ${err}`); + response.status(500).json({ + code: 13, // INTERNAL + message: + 'An internal error occurred while processing the OTLP payload.', + }); + } + }); + api.use((err: any, req: any, res: any, next: any) => { logger.error(err.stack); const error = err as Error; diff --git a/genkit-tools/telemetry-server/src/utils/otlp.ts b/genkit-tools/telemetry-server/src/utils/otlp.ts new file mode 100644 index 0000000000..af98cb68d1 --- /dev/null +++ b/genkit-tools/telemetry-server/src/utils/otlp.ts @@ -0,0 +1,162 @@ +/** + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { SpanData, TraceData } from '@genkit-ai/tools-common'; + +// These interfaces are based on the OTLP JSON format. +// A full definition can be found at: +// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto + +interface OtlpValue { + stringValue?: string; + intValue?: number; + boolValue?: boolean; + arrayValue?: { + values: OtlpValue[]; + }; +} + +interface OtlpAttribute { + key: string; + value: OtlpValue; +} + +interface OtlpSpan { + traceId: string; + spanId: string; + parentSpanId?: string; + name: string; + kind: number; + startTimeUnixNano: string; + endTimeUnixNano: string; + attributes: OtlpAttribute[]; + droppedAttributesCount: number; + events: any[]; + droppedEventsCount: number; + status?: { + code: number; + message?: string; + }; + links: any[]; + droppedLinksCount: number; +} + +interface OtlpScopeSpan { + scope: { + name: string; + version: string; + }; + spans: OtlpSpan[]; +} + +interface OtlpResourceSpan { + resource: { + attributes: OtlpAttribute[]; + droppedAttributesCount: number; + }; + scopeSpans: OtlpScopeSpan[]; +} + +interface OtlpPayload { + resourceSpans: OtlpResourceSpan[]; +} + +function toMillis(nano: string): number { + return Math.round(parseInt(nano) / 1_000_000); +} + +function toSpanData(span: OtlpSpan, scope: OtlpScopeSpan['scope']): SpanData { + const attributes: Record = {}; + span.attributes.forEach((attr) => { + if (attr.value.stringValue) { + attributes[attr.key] = attr.value.stringValue; + } else if (attr.value.intValue) { + attributes[attr.key] = attr.value.intValue; + } else if (attr.value.boolValue) { + attributes[attr.key] = attr.value.boolValue; + } + }); + + let spanKind: string; + switch (span.kind) { + case 1: + spanKind = 'INTERNAL'; + break; + case 2: + spanKind = 'SERVER'; + break; + case 3: + spanKind = 'CLIENT'; + break; + case 4: + spanKind = 'PRODUCER'; + break; + case 5: + spanKind = 'CONSUMER'; + break; + default: + spanKind = 'UNSPECIFIED'; + break; + } + + const spanData: SpanData = { + traceId: span.traceId, + spanId: span.spanId, + parentSpanId: span.parentSpanId, + startTime: toMillis(span.startTimeUnixNano), + endTime: toMillis(span.endTimeUnixNano), + displayName: span.name, + attributes, + instrumentationLibrary: { + name: scope.name, + version: scope.version, + }, + spanKind, + }; + if (span.status && span.status.code !== 0) { + const status: { code: number; message?: string } = { + code: span.status.code, + }; + if (span.status.message) { + status.message = span.status.message; + } + spanData.status = status; + } + return spanData; +} + +export function traceDataFromOtlp(otlpData: OtlpPayload): TraceData[] { + const traces: Record = {}; + + otlpData.resourceSpans.forEach((resourceSpan) => { + resourceSpan.scopeSpans.forEach((scopeSpan) => { + scopeSpan.spans.forEach((span) => { + if (!traces[span.traceId]) { + traces[span.traceId] = { + traceId: span.traceId, + spans: {}, + }; + } + traces[span.traceId].spans[span.spanId] = toSpanData( + span, + scopeSpan.scope + ); + }); + }); + }); + + return Object.values(traces); +} diff --git a/genkit-tools/telemetry-server/tests/file_store_test.ts b/genkit-tools/telemetry-server/tests/file_store_test.ts index 4a422538ca..d6e47f5997 100644 --- a/genkit-tools/telemetry-server/tests/file_store_test.ts +++ b/genkit-tools/telemetry-server/tests/file_store_test.ts @@ -16,16 +16,17 @@ import type { TraceData, TraceQueryFilter } from '@genkit-ai/tools-common'; import * as assert from 'assert'; +import fs from 'fs'; import getPort from 'get-port'; import { afterEach, beforeEach, describe, it } from 'node:test'; import os from 'os'; import path from 'path'; +import { Index } from '../src/file-trace-store'; import { LocalFileTraceStore, startTelemetryServer, stopTelemetryApi, } from '../src/index'; -import { Index } from '../src/localFileTraceStore'; import { sleep, span } from './utils'; const TRACE_ID = '1234'; @@ -37,10 +38,10 @@ const SPAN_B = 'bcd'; const SPAN_C = 'cde'; describe('local-file-store', () => { - let port; - let storeRoot; - let indexRoot; - let url; + let port: number; + let storeRoot: string; + let indexRoot: string; + let url: string; beforeEach(async () => { port = await getPort(); @@ -276,18 +277,22 @@ describe('local-file-store', () => { }); describe('index', () => { - let indexRoot; + let indexRoot: string; let index: Index; beforeEach(async () => { indexRoot = path.resolve( os.tmpdir(), - `./telemetry-server-api-test-${Date.now()}/traces_idx` + `./telemetry-server-api-test-${Date.now()}-${Math.floor(Math.random() * 1000)}/traces_idx` ); index = new Index(indexRoot); }); + afterEach(() => { + fs.rmSync(indexRoot, { recursive: true, force: true }); + }); + it('should index and search spans', () => { const spanA = span(TRACE_ID_1, SPAN_A, 100, 100); spanA.displayName = 'spanA'; diff --git a/genkit-tools/telemetry-server/tests/otlp_test.ts b/genkit-tools/telemetry-server/tests/otlp_test.ts new file mode 100644 index 0000000000..d1af719c4c --- /dev/null +++ b/genkit-tools/telemetry-server/tests/otlp_test.ts @@ -0,0 +1,216 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { describe, it } from 'node:test'; +import { traceDataFromOtlp } from '../src/utils/otlp'; + +describe('otlp-traces', () => { + it('should transform OTLP payload to TraceData', () => { + const otlpPayload = { + resourceSpans: [ + { + resource: { + attributes: [], + droppedAttributesCount: 0, + }, + scopeSpans: [ + { + scope: { name: 'genkit-tracer', version: 'v1' }, + spans: [ + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + parentSpanId: 'd05557675cb95b72', + name: 'generateContentStream', + kind: 1, + startTimeUnixNano: '1760827335359000000', + endTimeUnixNano: '1760827336695073000', + attributes: [ + { + key: 'genkit:name', + value: { stringValue: 'generateContentStream' }, + }, + { + key: 'genkit:path', + value: { + stringValue: + '/{geminiStream_submitQuery}/{generateContentStream}', + }, + }, + { + key: 'genkit:input', + value: { + stringValue: '.....', + }, + }, + { + key: 'genkit:state', + value: { stringValue: 'success' }, + }, + ], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + status: { code: 0 }, + links: [], + droppedLinksCount: 0, + }, + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: 'd05557675cb95b72', + name: 'geminiStream_submitQuery', + kind: 2, + startTimeUnixNano: '1760827334493000000', + endTimeUnixNano: '1760827336711390583', + attributes: [ + { + key: 'genkit:name', + value: { stringValue: 'geminiStream_submitQuery' }, + }, + { + key: 'genkit:isRoot', + value: { boolValue: true }, + }, + ], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + status: { code: 0 }, + links: [], + droppedLinksCount: 0, + }, + ], + }, + ], + }, + ], + }; + + const expectedTraceData = [ + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spans: { + '86dc3d35cc11e336': { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + parentSpanId: 'd05557675cb95b72', + startTime: 1760827335359, + endTime: 1760827336695, + displayName: 'generateContentStream', + attributes: { + 'genkit:name': 'generateContentStream', + 'genkit:path': + '/{geminiStream_submitQuery}/{generateContentStream}', + 'genkit:input': '.....', + 'genkit:state': 'success', + }, + instrumentationLibrary: { + name: 'genkit-tracer', + version: 'v1', + }, + spanKind: 'INTERNAL', + }, + d05557675cb95b72: { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: 'd05557675cb95b72', + parentSpanId: undefined, + startTime: 1760827334493, + endTime: 1760827336711, + displayName: 'geminiStream_submitQuery', + attributes: { + 'genkit:name': 'geminiStream_submitQuery', + 'genkit:isRoot': true, + }, + instrumentationLibrary: { + name: 'genkit-tracer', + version: 'v1', + }, + spanKind: 'SERVER', + }, + }, + }, + ]; + + const result = traceDataFromOtlp(otlpPayload as any); + assert.deepStrictEqual(result, expectedTraceData); + }); + + it('should transform OTLP payload with non-zero status to TraceData', () => { + const otlpPayload = { + resourceSpans: [ + { + resource: { + attributes: [], + droppedAttributesCount: 0, + }, + scopeSpans: [ + { + scope: { name: 'genkit-tracer', version: 'v1' }, + spans: [ + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + parentSpanId: 'd05557675cb95b72', + name: 'generateContentStream', + kind: 1, + startTimeUnixNano: '1760827335359000000', + endTimeUnixNano: '1760827336695073000', + attributes: [], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + status: { code: 2, message: 'An error occurred' }, + links: [], + droppedLinksCount: 0, + }, + ], + }, + ], + }, + ], + }; + + const expectedTraceData = [ + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spans: { + '86dc3d35cc11e336': { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + parentSpanId: 'd05557675cb95b72', + startTime: 1760827335359, + endTime: 1760827336695, + displayName: 'generateContentStream', + attributes: {}, + instrumentationLibrary: { + name: 'genkit-tracer', + version: 'v1', + }, + spanKind: 'INTERNAL', + status: { + code: 2, + message: 'An error occurred', + }, + }, + }, + }, + ]; + + const result = traceDataFromOtlp(otlpPayload as any); + assert.deepStrictEqual(result, expectedTraceData); + }); +}); diff --git a/genkit-tools/telemetry-server/tests/utils.ts b/genkit-tools/telemetry-server/tests/utils.ts index 2410919db8..6d9fff878c 100644 --- a/genkit-tools/telemetry-server/tests/utils.ts +++ b/genkit-tools/telemetry-server/tests/utils.ts @@ -20,9 +20,11 @@ export function span( traceId: string, id: string, inputLength: number | undefined, - outputLength: number | undefined + outputLength: number | undefined, + startTime?: number, + endTime?: number ): SpanData { - const attributes = { + const attributes: Record = { 'genkit:type': 'flow', }; if (inputLength) { @@ -35,8 +37,8 @@ export function span( traceId: traceId, spanId: id, displayName: `Span ${id}`, - startTime: 1, - endTime: 2, + startTime: startTime || 1, + endTime: endTime || 2, instrumentationLibrary: { name: 'genkit' }, spanKind: 'INTERNAL', attributes, @@ -52,6 +54,6 @@ function generateString(length: number) { return str.substring(0, length); } -export async function sleep(ms) { +export async function sleep(ms: number) { await new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/js/core/src/tracing.ts b/js/core/src/tracing.ts index 8db4f79754..48886f08e0 100644 --- a/js/core/src/tracing.ts +++ b/js/core/src/tracing.ts @@ -22,6 +22,7 @@ export * from './tracing/exporter.js'; export * from './tracing/instrumentation.js'; export * from './tracing/types.js'; +const oTelInitializationKey = '__GENKIT_DISABLE_GENKIT_OTEL_INITIALIZATION'; const instrumentationKey = '__GENKIT_TELEMETRY_INSTRUMENTED'; const telemetryProviderKey = '__GENKIT_TELEMETRY_PROVIDER'; @@ -96,6 +97,9 @@ export function setTelemetryProvider(provider: TelemetryProvider) { export async function enableTelemetry( telemetryConfig: TelemetryConfig | Promise ) { + if (isOTelInitializationDisabled()) { + return; + } global[instrumentationKey] = telemetryConfig instanceof Promise ? telemetryConfig : Promise.resolve(); return getTelemetryProvider().enableTelemetry(telemetryConfig); @@ -109,3 +113,21 @@ export async function enableTelemetry( export async function flushTracing() { return getTelemetryProvider().flushTracing(); } + +function isOTelInitializationDisabled(): boolean { + return global[oTelInitializationKey] === true; +} + +/** + * Disables Genkit's OTel initialization. This is useful when you want to + * control the OTel initialization yourself. + * + * This function attempts to control Genkit's internal OTel instrumentation behaviour, + * since internal implementation details are subject to change at any time consider + * this function "unstable" and subject to breaking changes as well. + * + * @hidden + */ +export function disableGenkitOTelInitialization() { + global[oTelInitializationKey] = true; +} diff --git a/js/genkit/src/tracing.ts b/js/genkit/src/tracing.ts index 58503dacf8..a616383389 100644 --- a/js/genkit/src/tracing.ts +++ b/js/genkit/src/tracing.ts @@ -25,6 +25,7 @@ export { TraceMetadataSchema, TraceServerExporter, appendSpan, + disableGenkitOTelInitialization, disableOTelRootSpanDetection, enableTelemetry, flushTracing,