From b3cef72584b59ecab64382b721fd83aa48696baf Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Sat, 18 Oct 2025 19:23:33 -0400 Subject: [PATCH 1/6] feat(telemetry-server): implemented OTLP receiver (experimental) --- genkit-tools/telemetry-server/src/index.ts | 23 +++ .../telemetry-server/src/utils/otlp.ts | 138 ++++++++++++++++ .../telemetry-server/src/utils/trace.ts | 156 ++++++++++++++++++ .../telemetry-server/tests/file_store_test.ts | 10 +- .../telemetry-server/tests/otlp_test.ts | 151 +++++++++++++++++ 5 files changed, 473 insertions(+), 5 deletions(-) create mode 100644 genkit-tools/telemetry-server/src/utils/otlp.ts create mode 100644 genkit-tools/telemetry-server/src/utils/trace.ts create mode 100644 genkit-tools/telemetry-server/tests/otlp_test.ts diff --git a/genkit-tools/telemetry-server/src/index.ts b/genkit-tools/telemetry-server/src/index.ts index d934fb9968..39ce8222e6 100644 --- a/genkit-tools/telemetry-server/src/index.ts +++ b/genkit-tools/telemetry-server/src/index.ts @@ -22,6 +22,7 @@ import { logger } from '@genkit-ai/tools-common/utils'; import express from 'express'; import type * as http from 'http'; import type { TraceStore } from './types'; +import { tracesDataFromOtlp } from './utils/otlp'; export { LocalFileTraceStore } from './file-trace-store.js'; export { TraceQuerySchema, type TraceQuery, type TraceStore } from './types'; @@ -90,6 +91,28 @@ export async function startTelemetryServer(params: { } }); + api.post('/api/otlp', async (request, response) => { + try { + if (!request.body.resourceSpans?.length) { + // Acknowledge and ignore empty payloads. + response.status(200).json({}); + return; + } + const traces = tracesDataFromOtlp(request.body); + for (const trace of traces) { + const traceData = TraceDataSchema.parse(trace); + await params.traceStore.save(traceData.traceId, traceData); + } + response.status(200).json({}); + } catch (err) { + logger.error(`Error processing OTLP payload: ${err}`); + response.status(500).json({ + code: 13, // INTERNAL + message: 'An internal error occurred while processing the OTLP payload.', + }); + } + }); + api.use((err: any, req: any, res: any, next: any) => { logger.error(err.stack); const error = err as Error; diff --git a/genkit-tools/telemetry-server/src/utils/otlp.ts b/genkit-tools/telemetry-server/src/utils/otlp.ts new file mode 100644 index 0000000000..d8b6573b22 --- /dev/null +++ b/genkit-tools/telemetry-server/src/utils/otlp.ts @@ -0,0 +1,138 @@ +import { SpanData, TraceData } from './trace'; + +// These interfaces are based on the OTLP JSON format. +// A full definition can be found at: +// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto + +interface OtlpValue { + stringValue?: string; + intValue?: number; + boolValue?: boolean; + arrayValue?: { + values: OtlpValue[]; + }; +} + +interface OtlpAttribute { + key: string; + value: OtlpValue; +} + +interface OtlpSpan { + traceId: string; + spanId: string; + parentSpanId?: string; + name: string; + kind: number; + startTimeUnixNano: string; + endTimeUnixNano: string; + attributes: OtlpAttribute[]; + droppedAttributesCount: number; + events: any[]; + droppedEventsCount: number; + status: { + code: number; + }; + links: any[]; + droppedLinksCount: number; +} + +interface OtlpScopeSpan { + scope: { + name: string; + version: string; + }; + spans: OtlpSpan[]; +} + +interface OtlpResourceSpan { + resource: { + attributes: OtlpAttribute[]; + droppedAttributesCount: number; + }; + scopeSpans: OtlpScopeSpan[]; +} + +interface OtlpPayload { + resourceSpans: OtlpResourceSpan[]; +} + +function toMillis(nano: string): number { + return Math.round(parseInt(nano) / 1_000_000); +} + +function toSpanData( + span: OtlpSpan, + scope: OtlpScopeSpan['scope'] +): SpanData { + const attributes: Record = {}; + span.attributes.forEach((attr) => { + if (attr.value.stringValue) { + attributes[attr.key] = attr.value.stringValue; + } else if (attr.value.intValue) { + attributes[attr.key] = attr.value.intValue; + } else if (attr.value.boolValue) { + attributes[attr.key] = attr.value.boolValue; + } + }); + + let spanKind: string; + switch (span.kind) { + case 1: + spanKind = 'INTERNAL'; + break; + case 2: + spanKind = 'SERVER'; + break; + case 3: + spanKind = 'CLIENT'; + break; + case 4: + spanKind = 'PRODUCER'; + break; + case 5: + spanKind = 'CONSUMER'; + break; + default: + spanKind = 'UNSPECIFIED'; + break; + } + + return { + traceId: span.traceId, + spanId: span.spanId, + parentSpanId: span.parentSpanId, + startTime: toMillis(span.startTimeUnixNano), + endTime: toMillis(span.endTimeUnixNano), + displayName: span.name, + attributes, + instrumentationLibrary: { + name: scope.name, + version: scope.version, + }, + spanKind, + }; +} + +export function tracesDataFromOtlp(otlpData: OtlpPayload): TraceData[] { + const traces: Record = {}; + + otlpData.resourceSpans.forEach((resourceSpan) => { + resourceSpan.scopeSpans.forEach((scopeSpan) => { + scopeSpan.spans.forEach((span) => { + if (!traces[span.traceId]) { + traces[span.traceId] = { + traceId: span.traceId, + spans: {}, + }; + } + traces[span.traceId].spans[span.spanId] = toSpanData( + span, + scopeSpan.scope + ); + }); + }); + }); + + return Object.values(traces); +} diff --git a/genkit-tools/telemetry-server/src/utils/trace.ts b/genkit-tools/telemetry-server/src/utils/trace.ts new file mode 100644 index 0000000000..61e3537f9c --- /dev/null +++ b/genkit-tools/telemetry-server/src/utils/trace.ts @@ -0,0 +1,156 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as z from 'zod'; + +// NOTE: Keep this file in sync with js/core/src/tracing/types.ts! +// Eventually tools will be source of truth for these types (by generating a +// JSON schema) but until then this file must be manually kept in sync + +/** + * Zod schema for Path metadata. + */ +export const PathMetadataSchema = z.object({ + path: z.string(), + status: z.string(), + error: z.string().optional(), + latency: z.number(), +}); +export type PathMetadata = z.infer; + +/** + * Zod schema for Trace metadata. + */ +export const TraceMetadataSchema = z.object({ + featureName: z.string().optional(), + paths: z.set(PathMetadataSchema).optional(), + timestamp: z.number(), +}); +export type TraceMetadata = z.infer; + +/** + * Zod schema for span metadata. + */ +export const SpanMetadataSchema = z.object({ + name: z.string(), + state: z.enum(['success', 'error']).optional(), + input: z.any().optional(), + output: z.any().optional(), + isRoot: z.boolean().optional(), + metadata: z.record(z.string(), z.string()).optional(), + path: z.string().optional(), +}); +export type SpanMetadata = z.infer; + +/** + * Zod schema for span status. + */ +export const SpanStatusSchema = z.object({ + code: z.number(), + message: z.string().optional(), +}); + +/** + * Zod schema for time event. + */ +export const TimeEventSchema = z.object({ + time: z.number(), + annotation: z.object({ + attributes: z.record(z.string(), z.unknown()), + description: z.string(), + }), +}); + +/** + * Zod schema for span context. + */ +export const SpanContextSchema = z.object({ + traceId: z.string(), + spanId: z.string(), + isRemote: z.boolean().optional(), + traceFlags: z.number(), +}); + +/** + * Zod schema for Link. + */ +export const LinkSchema = z.object({ + context: SpanContextSchema.optional(), + attributes: z.record(z.string(), z.unknown()).optional(), + droppedAttributesCount: z.number().optional(), +}); + +/** + * Zod schema for instrumentation library. + */ +export const InstrumentationLibrarySchema = z.object({ + name: z.string().readonly(), + version: z.string().optional().readonly(), + schemaUrl: z.string().optional().readonly(), +}); + +/** + * Zod schema for span data. + */ +export const SpanDataSchema = z + .object({ + spanId: z.string(), + traceId: z.string(), + parentSpanId: z.string().optional(), + startTime: z.number(), + endTime: z.number(), + attributes: z.record(z.string(), z.unknown()), + displayName: z.string(), + links: z.array(LinkSchema).optional(), + instrumentationLibrary: InstrumentationLibrarySchema, + spanKind: z.string(), + sameProcessAsParentSpan: z.object({ value: z.boolean() }).optional(), + status: SpanStatusSchema.optional(), + timeEvents: z + .object({ + timeEvent: z.array(TimeEventSchema).optional(), + }) + .optional(), + truncated: z.boolean().optional(), + }); +export type SpanData = z.infer; + +/** + * Zod schema for trace metadata. + */ +export const TraceDataSchema = z + .object({ + traceId: z.string(), + displayName: z.string().optional(), + startTime: z + .number() + .optional() + .describe('trace start time in milliseconds since the epoch'), + endTime: z + .number() + .optional() + .describe('end time in milliseconds since the epoch'), + spans: z.record(z.string(), SpanDataSchema), + }); +export type TraceData = z.infer; + +export const NestedSpanDataSchema = SpanDataSchema.extend({ + spans: z.lazy(() => z.array(SpanDataSchema)), +}); + +export type NestedSpanData = z.infer & { + spans?: SpanData[]; +}; diff --git a/genkit-tools/telemetry-server/tests/file_store_test.ts b/genkit-tools/telemetry-server/tests/file_store_test.ts index 4a422538ca..6cce34196b 100644 --- a/genkit-tools/telemetry-server/tests/file_store_test.ts +++ b/genkit-tools/telemetry-server/tests/file_store_test.ts @@ -25,7 +25,7 @@ import { startTelemetryServer, stopTelemetryApi, } from '../src/index'; -import { Index } from '../src/localFileTraceStore'; +import { Index } from '../src/file-trace-store'; import { sleep, span } from './utils'; const TRACE_ID = '1234'; @@ -37,10 +37,10 @@ const SPAN_B = 'bcd'; const SPAN_C = 'cde'; describe('local-file-store', () => { - let port; - let storeRoot; - let indexRoot; - let url; + let port: number; + let storeRoot: string; + let indexRoot: string; + let url: string; beforeEach(async () => { port = await getPort(); diff --git a/genkit-tools/telemetry-server/tests/otlp_test.ts b/genkit-tools/telemetry-server/tests/otlp_test.ts new file mode 100644 index 0000000000..e9525c4ad4 --- /dev/null +++ b/genkit-tools/telemetry-server/tests/otlp_test.ts @@ -0,0 +1,151 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { describe, it } from 'node:test'; +import { tracesDataFromOtlp } from '../src/utils/otlp'; + +describe('otlp-traces', () => { + it('should transform OTLP payload to TraceData', () => { + const otlpPayload = { + resourceSpans: [ + { + resource: { + attributes: [], + droppedAttributesCount: 0, + }, + scopeSpans: [ + { + scope: { name: 'genkit-tracer', version: 'v1' }, + spans: [ + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + parentSpanId: 'd05557675cb95b72', + name: 'generateContentStream', + kind: 1, + startTimeUnixNano: '1760827335359000000', + endTimeUnixNano: '1760827336695073000', + attributes: [ + { + key: 'genkit:name', + value: { stringValue: 'generateContentStream' }, + }, + { + key: 'genkit:path', + value: { + stringValue: + '/{geminiStream_submitQuery}/{generateContentStream}', + }, + }, + { + key: 'genkit:input', + value: { + stringValue: '.....', + }, + }, + { + key: 'genkit:state', + value: { stringValue: 'success' }, + }, + ], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + status: { code: 0 }, + links: [], + droppedLinksCount: 0, + }, + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: 'd05557675cb95b72', + name: 'geminiStream_submitQuery', + kind: 2, + startTimeUnixNano: '1760827334493000000', + endTimeUnixNano: '1760827336711390583', + attributes: [ + { + key: 'genkit:name', + value: { stringValue: 'geminiStream_submitQuery' }, + }, + { + key: 'genkit:isRoot', + value: { boolValue: true }, + }, + ], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + status: { code: 0 }, + links: [], + droppedLinksCount: 0, + }, + ], + }, + ], + }, + ], + }; + + const expectedTraceData = [ + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spans: { + '86dc3d35cc11e336': { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + parentSpanId: 'd05557675cb95b72', + startTime: 1760827335359, + endTime: 1760827336695, + displayName: 'generateContentStream', + attributes: { + 'genkit:name': 'generateContentStream', + 'genkit:path': + '/{geminiStream_submitQuery}/{generateContentStream}', + 'genkit:input': '.....', + 'genkit:state': 'success', + }, + instrumentationLibrary: { + name: 'genkit-tracer', + version: 'v1', + }, + spanKind: 'INTERNAL', + }, + d05557675cb95b72: { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: 'd05557675cb95b72', + parentSpanId: undefined, + startTime: 1760827334493, + endTime: 1760827336711, + displayName: 'geminiStream_submitQuery', + attributes: { + 'genkit:name': 'geminiStream_submitQuery', + 'genkit:isRoot': true, + }, + instrumentationLibrary: { + name: 'genkit-tracer', + version: 'v1', + }, + spanKind: 'SERVER', + }, + }, + }, + ]; + + const result = tracesDataFromOtlp(otlpPayload as any); + assert.deepStrictEqual(result, expectedTraceData); + }); +}); From 79e1b6302236e71adb7bd55ea566e7962d312473 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Sat, 18 Oct 2025 19:26:34 -0400 Subject: [PATCH 2/6] fmt --- genkit-tools/telemetry-server/src/index.ts | 3 +- .../telemetry-server/src/utils/otlp.ts | 23 ++- .../telemetry-server/src/utils/trace.ts | 156 ------------------ .../telemetry-server/tests/file_store_test.ts | 2 +- 4 files changed, 21 insertions(+), 163 deletions(-) delete mode 100644 genkit-tools/telemetry-server/src/utils/trace.ts diff --git a/genkit-tools/telemetry-server/src/index.ts b/genkit-tools/telemetry-server/src/index.ts index 39ce8222e6..d04b108343 100644 --- a/genkit-tools/telemetry-server/src/index.ts +++ b/genkit-tools/telemetry-server/src/index.ts @@ -108,7 +108,8 @@ export async function startTelemetryServer(params: { logger.error(`Error processing OTLP payload: ${err}`); response.status(500).json({ code: 13, // INTERNAL - message: 'An internal error occurred while processing the OTLP payload.', + message: + 'An internal error occurred while processing the OTLP payload.', }); } }); diff --git a/genkit-tools/telemetry-server/src/utils/otlp.ts b/genkit-tools/telemetry-server/src/utils/otlp.ts index d8b6573b22..9a3c74d02b 100644 --- a/genkit-tools/telemetry-server/src/utils/otlp.ts +++ b/genkit-tools/telemetry-server/src/utils/otlp.ts @@ -1,4 +1,20 @@ -import { SpanData, TraceData } from './trace'; +/** + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { SpanData, TraceData } from '@genkit-ai/tools-common'; // These interfaces are based on the OTLP JSON format. // A full definition can be found at: @@ -61,10 +77,7 @@ function toMillis(nano: string): number { return Math.round(parseInt(nano) / 1_000_000); } -function toSpanData( - span: OtlpSpan, - scope: OtlpScopeSpan['scope'] -): SpanData { +function toSpanData(span: OtlpSpan, scope: OtlpScopeSpan['scope']): SpanData { const attributes: Record = {}; span.attributes.forEach((attr) => { if (attr.value.stringValue) { diff --git a/genkit-tools/telemetry-server/src/utils/trace.ts b/genkit-tools/telemetry-server/src/utils/trace.ts deleted file mode 100644 index 61e3537f9c..0000000000 --- a/genkit-tools/telemetry-server/src/utils/trace.ts +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import * as z from 'zod'; - -// NOTE: Keep this file in sync with js/core/src/tracing/types.ts! -// Eventually tools will be source of truth for these types (by generating a -// JSON schema) but until then this file must be manually kept in sync - -/** - * Zod schema for Path metadata. - */ -export const PathMetadataSchema = z.object({ - path: z.string(), - status: z.string(), - error: z.string().optional(), - latency: z.number(), -}); -export type PathMetadata = z.infer; - -/** - * Zod schema for Trace metadata. - */ -export const TraceMetadataSchema = z.object({ - featureName: z.string().optional(), - paths: z.set(PathMetadataSchema).optional(), - timestamp: z.number(), -}); -export type TraceMetadata = z.infer; - -/** - * Zod schema for span metadata. - */ -export const SpanMetadataSchema = z.object({ - name: z.string(), - state: z.enum(['success', 'error']).optional(), - input: z.any().optional(), - output: z.any().optional(), - isRoot: z.boolean().optional(), - metadata: z.record(z.string(), z.string()).optional(), - path: z.string().optional(), -}); -export type SpanMetadata = z.infer; - -/** - * Zod schema for span status. - */ -export const SpanStatusSchema = z.object({ - code: z.number(), - message: z.string().optional(), -}); - -/** - * Zod schema for time event. - */ -export const TimeEventSchema = z.object({ - time: z.number(), - annotation: z.object({ - attributes: z.record(z.string(), z.unknown()), - description: z.string(), - }), -}); - -/** - * Zod schema for span context. - */ -export const SpanContextSchema = z.object({ - traceId: z.string(), - spanId: z.string(), - isRemote: z.boolean().optional(), - traceFlags: z.number(), -}); - -/** - * Zod schema for Link. - */ -export const LinkSchema = z.object({ - context: SpanContextSchema.optional(), - attributes: z.record(z.string(), z.unknown()).optional(), - droppedAttributesCount: z.number().optional(), -}); - -/** - * Zod schema for instrumentation library. - */ -export const InstrumentationLibrarySchema = z.object({ - name: z.string().readonly(), - version: z.string().optional().readonly(), - schemaUrl: z.string().optional().readonly(), -}); - -/** - * Zod schema for span data. - */ -export const SpanDataSchema = z - .object({ - spanId: z.string(), - traceId: z.string(), - parentSpanId: z.string().optional(), - startTime: z.number(), - endTime: z.number(), - attributes: z.record(z.string(), z.unknown()), - displayName: z.string(), - links: z.array(LinkSchema).optional(), - instrumentationLibrary: InstrumentationLibrarySchema, - spanKind: z.string(), - sameProcessAsParentSpan: z.object({ value: z.boolean() }).optional(), - status: SpanStatusSchema.optional(), - timeEvents: z - .object({ - timeEvent: z.array(TimeEventSchema).optional(), - }) - .optional(), - truncated: z.boolean().optional(), - }); -export type SpanData = z.infer; - -/** - * Zod schema for trace metadata. - */ -export const TraceDataSchema = z - .object({ - traceId: z.string(), - displayName: z.string().optional(), - startTime: z - .number() - .optional() - .describe('trace start time in milliseconds since the epoch'), - endTime: z - .number() - .optional() - .describe('end time in milliseconds since the epoch'), - spans: z.record(z.string(), SpanDataSchema), - }); -export type TraceData = z.infer; - -export const NestedSpanDataSchema = SpanDataSchema.extend({ - spans: z.lazy(() => z.array(SpanDataSchema)), -}); - -export type NestedSpanData = z.infer & { - spans?: SpanData[]; -}; diff --git a/genkit-tools/telemetry-server/tests/file_store_test.ts b/genkit-tools/telemetry-server/tests/file_store_test.ts index 6cce34196b..48a26bafe0 100644 --- a/genkit-tools/telemetry-server/tests/file_store_test.ts +++ b/genkit-tools/telemetry-server/tests/file_store_test.ts @@ -20,12 +20,12 @@ import getPort from 'get-port'; import { afterEach, beforeEach, describe, it } from 'node:test'; import os from 'os'; import path from 'path'; +import { Index } from '../src/file-trace-store'; import { LocalFileTraceStore, startTelemetryServer, stopTelemetryApi, } from '../src/index'; -import { Index } from '../src/file-trace-store'; import { sleep, span } from './utils'; const TRACE_ID = '1234'; From 38fb830a5a20c9d2d7b6ea86dd08bfd54a214297 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Sat, 18 Oct 2025 19:35:12 -0400 Subject: [PATCH 3/6] test fixes --- .../telemetry-server/tests/file_store_test.ts | 9 +++++++-- genkit-tools/telemetry-server/tests/utils.ts | 12 +++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/genkit-tools/telemetry-server/tests/file_store_test.ts b/genkit-tools/telemetry-server/tests/file_store_test.ts index 48a26bafe0..d6e47f5997 100644 --- a/genkit-tools/telemetry-server/tests/file_store_test.ts +++ b/genkit-tools/telemetry-server/tests/file_store_test.ts @@ -16,6 +16,7 @@ import type { TraceData, TraceQueryFilter } from '@genkit-ai/tools-common'; import * as assert from 'assert'; +import fs from 'fs'; import getPort from 'get-port'; import { afterEach, beforeEach, describe, it } from 'node:test'; import os from 'os'; @@ -276,18 +277,22 @@ describe('local-file-store', () => { }); describe('index', () => { - let indexRoot; + let indexRoot: string; let index: Index; beforeEach(async () => { indexRoot = path.resolve( os.tmpdir(), - `./telemetry-server-api-test-${Date.now()}/traces_idx` + `./telemetry-server-api-test-${Date.now()}-${Math.floor(Math.random() * 1000)}/traces_idx` ); index = new Index(indexRoot); }); + afterEach(() => { + fs.rmSync(indexRoot, { recursive: true, force: true }); + }); + it('should index and search spans', () => { const spanA = span(TRACE_ID_1, SPAN_A, 100, 100); spanA.displayName = 'spanA'; diff --git a/genkit-tools/telemetry-server/tests/utils.ts b/genkit-tools/telemetry-server/tests/utils.ts index 2410919db8..6d9fff878c 100644 --- a/genkit-tools/telemetry-server/tests/utils.ts +++ b/genkit-tools/telemetry-server/tests/utils.ts @@ -20,9 +20,11 @@ export function span( traceId: string, id: string, inputLength: number | undefined, - outputLength: number | undefined + outputLength: number | undefined, + startTime?: number, + endTime?: number ): SpanData { - const attributes = { + const attributes: Record = { 'genkit:type': 'flow', }; if (inputLength) { @@ -35,8 +37,8 @@ export function span( traceId: traceId, spanId: id, displayName: `Span ${id}`, - startTime: 1, - endTime: 2, + startTime: startTime || 1, + endTime: endTime || 2, instrumentationLibrary: { name: 'genkit' }, spanKind: 'INTERNAL', attributes, @@ -52,6 +54,6 @@ function generateString(length: number) { return str.substring(0, length); } -export async function sleep(ms) { +export async function sleep(ms: number) { await new Promise((resolve) => setTimeout(resolve, ms)); } From afe1822c218ad3945b34cfc303287a74541c5e06 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 23 Oct 2025 09:54:36 -0400 Subject: [PATCH 4/6] added disableGenkitOTelInitialization --- .../telemetry-server/src/utils/otlp.ts | 15 ++++- .../telemetry-server/tests/otlp_test.ts | 65 +++++++++++++++++++ js/core/src/tracing.ts | 22 +++++++ js/genkit/src/tracing.ts | 1 + 4 files changed, 101 insertions(+), 2 deletions(-) diff --git a/genkit-tools/telemetry-server/src/utils/otlp.ts b/genkit-tools/telemetry-server/src/utils/otlp.ts index 9a3c74d02b..79b445fb03 100644 --- a/genkit-tools/telemetry-server/src/utils/otlp.ts +++ b/genkit-tools/telemetry-server/src/utils/otlp.ts @@ -46,8 +46,9 @@ interface OtlpSpan { droppedAttributesCount: number; events: any[]; droppedEventsCount: number; - status: { + status?: { code: number; + message?: string; }; links: any[]; droppedLinksCount: number; @@ -111,7 +112,7 @@ function toSpanData(span: OtlpSpan, scope: OtlpScopeSpan['scope']): SpanData { break; } - return { + const spanData: SpanData = { traceId: span.traceId, spanId: span.spanId, parentSpanId: span.parentSpanId, @@ -125,6 +126,16 @@ function toSpanData(span: OtlpSpan, scope: OtlpScopeSpan['scope']): SpanData { }, spanKind, }; + if (span.status && span.status.code !== 0) { + const status: { code: number; message?: string } = { + code: span.status.code, + }; + if (span.status.message) { + status.message = span.status.message; + } + spanData.status = status; + } + return spanData; } export function tracesDataFromOtlp(otlpData: OtlpPayload): TraceData[] { diff --git a/genkit-tools/telemetry-server/tests/otlp_test.ts b/genkit-tools/telemetry-server/tests/otlp_test.ts index e9525c4ad4..565ea721a4 100644 --- a/genkit-tools/telemetry-server/tests/otlp_test.ts +++ b/genkit-tools/telemetry-server/tests/otlp_test.ts @@ -148,4 +148,69 @@ describe('otlp-traces', () => { const result = tracesDataFromOtlp(otlpPayload as any); assert.deepStrictEqual(result, expectedTraceData); }); + + it('should transform OTLP payload with non-zero status to TraceData', () => { + const otlpPayload = { + resourceSpans: [ + { + resource: { + attributes: [], + droppedAttributesCount: 0, + }, + scopeSpans: [ + { + scope: { name: 'genkit-tracer', version: 'v1' }, + spans: [ + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + parentSpanId: 'd05557675cb95b72', + name: 'generateContentStream', + kind: 1, + startTimeUnixNano: '1760827335359000000', + endTimeUnixNano: '1760827336695073000', + attributes: [], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + status: { code: 2, message: 'An error occurred' }, + links: [], + droppedLinksCount: 0, + }, + ], + }, + ], + }, + ], + }; + + const expectedTraceData = [ + { + traceId: 'c5892692eb25cce482eb13587b73c425', + spans: { + '86dc3d35cc11e336': { + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + parentSpanId: 'd05557675cb95b72', + startTime: 1760827335359, + endTime: 1760827336695, + displayName: 'generateContentStream', + attributes: {}, + instrumentationLibrary: { + name: 'genkit-tracer', + version: 'v1', + }, + spanKind: 'INTERNAL', + status: { + code: 2, + message: 'An error occurred', + }, + }, + }, + }, + ]; + + const result = tracesDataFromOtlp(otlpPayload as any); + assert.deepStrictEqual(result, expectedTraceData); + }); }); diff --git a/js/core/src/tracing.ts b/js/core/src/tracing.ts index 8db4f79754..48886f08e0 100644 --- a/js/core/src/tracing.ts +++ b/js/core/src/tracing.ts @@ -22,6 +22,7 @@ export * from './tracing/exporter.js'; export * from './tracing/instrumentation.js'; export * from './tracing/types.js'; +const oTelInitializationKey = '__GENKIT_DISABLE_GENKIT_OTEL_INITIALIZATION'; const instrumentationKey = '__GENKIT_TELEMETRY_INSTRUMENTED'; const telemetryProviderKey = '__GENKIT_TELEMETRY_PROVIDER'; @@ -96,6 +97,9 @@ export function setTelemetryProvider(provider: TelemetryProvider) { export async function enableTelemetry( telemetryConfig: TelemetryConfig | Promise ) { + if (isOTelInitializationDisabled()) { + return; + } global[instrumentationKey] = telemetryConfig instanceof Promise ? telemetryConfig : Promise.resolve(); return getTelemetryProvider().enableTelemetry(telemetryConfig); @@ -109,3 +113,21 @@ export async function enableTelemetry( export async function flushTracing() { return getTelemetryProvider().flushTracing(); } + +function isOTelInitializationDisabled(): boolean { + return global[oTelInitializationKey] === true; +} + +/** + * Disables Genkit's OTel initialization. This is useful when you want to + * control the OTel initialization yourself. + * + * This function attempts to control Genkit's internal OTel instrumentation behaviour, + * since internal implementation details are subject to change at any time consider + * this function "unstable" and subject to breaking changes as well. + * + * @hidden + */ +export function disableGenkitOTelInitialization() { + global[oTelInitializationKey] = true; +} diff --git a/js/genkit/src/tracing.ts b/js/genkit/src/tracing.ts index 58503dacf8..2539f7fe07 100644 --- a/js/genkit/src/tracing.ts +++ b/js/genkit/src/tracing.ts @@ -26,6 +26,7 @@ export { TraceServerExporter, appendSpan, disableOTelRootSpanDetection, + disableGenkitOTelInitialization, enableTelemetry, flushTracing, runInNewSpan, From 6ea9c8c82e02da4616f469347459b86f880a1f34 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 23 Oct 2025 09:54:49 -0400 Subject: [PATCH 5/6] fmt --- js/genkit/src/tracing.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/genkit/src/tracing.ts b/js/genkit/src/tracing.ts index 2539f7fe07..a616383389 100644 --- a/js/genkit/src/tracing.ts +++ b/js/genkit/src/tracing.ts @@ -25,8 +25,8 @@ export { TraceMetadataSchema, TraceServerExporter, appendSpan, - disableOTelRootSpanDetection, disableGenkitOTelInitialization, + disableOTelRootSpanDetection, enableTelemetry, flushTracing, runInNewSpan, From 48433f4db82672927861ecfbf734bda10039aa83 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 23 Oct 2025 09:57:09 -0400 Subject: [PATCH 6/6] rename traceDataFromOtlp --- genkit-tools/telemetry-server/src/index.ts | 4 ++-- genkit-tools/telemetry-server/src/utils/otlp.ts | 2 +- genkit-tools/telemetry-server/tests/otlp_test.ts | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/genkit-tools/telemetry-server/src/index.ts b/genkit-tools/telemetry-server/src/index.ts index d04b108343..26fba05238 100644 --- a/genkit-tools/telemetry-server/src/index.ts +++ b/genkit-tools/telemetry-server/src/index.ts @@ -22,7 +22,7 @@ import { logger } from '@genkit-ai/tools-common/utils'; import express from 'express'; import type * as http from 'http'; import type { TraceStore } from './types'; -import { tracesDataFromOtlp } from './utils/otlp'; +import { traceDataFromOtlp } from './utils/otlp'; export { LocalFileTraceStore } from './file-trace-store.js'; export { TraceQuerySchema, type TraceQuery, type TraceStore } from './types'; @@ -98,7 +98,7 @@ export async function startTelemetryServer(params: { response.status(200).json({}); return; } - const traces = tracesDataFromOtlp(request.body); + const traces = traceDataFromOtlp(request.body); for (const trace of traces) { const traceData = TraceDataSchema.parse(trace); await params.traceStore.save(traceData.traceId, traceData); diff --git a/genkit-tools/telemetry-server/src/utils/otlp.ts b/genkit-tools/telemetry-server/src/utils/otlp.ts index 79b445fb03..af98cb68d1 100644 --- a/genkit-tools/telemetry-server/src/utils/otlp.ts +++ b/genkit-tools/telemetry-server/src/utils/otlp.ts @@ -138,7 +138,7 @@ function toSpanData(span: OtlpSpan, scope: OtlpScopeSpan['scope']): SpanData { return spanData; } -export function tracesDataFromOtlp(otlpData: OtlpPayload): TraceData[] { +export function traceDataFromOtlp(otlpData: OtlpPayload): TraceData[] { const traces: Record = {}; otlpData.resourceSpans.forEach((resourceSpan) => { diff --git a/genkit-tools/telemetry-server/tests/otlp_test.ts b/genkit-tools/telemetry-server/tests/otlp_test.ts index 565ea721a4..d1af719c4c 100644 --- a/genkit-tools/telemetry-server/tests/otlp_test.ts +++ b/genkit-tools/telemetry-server/tests/otlp_test.ts @@ -16,7 +16,7 @@ import * as assert from 'assert'; import { describe, it } from 'node:test'; -import { tracesDataFromOtlp } from '../src/utils/otlp'; +import { traceDataFromOtlp } from '../src/utils/otlp'; describe('otlp-traces', () => { it('should transform OTLP payload to TraceData', () => { @@ -145,7 +145,7 @@ describe('otlp-traces', () => { }, ]; - const result = tracesDataFromOtlp(otlpPayload as any); + const result = traceDataFromOtlp(otlpPayload as any); assert.deepStrictEqual(result, expectedTraceData); }); @@ -210,7 +210,7 @@ describe('otlp-traces', () => { }, ]; - const result = tracesDataFromOtlp(otlpPayload as any); + const result = traceDataFromOtlp(otlpPayload as any); assert.deepStrictEqual(result, expectedTraceData); }); });