Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions js/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
"author": "genkit",
"license": "Apache-2.0",
"dependencies": {
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/context-async-hooks": "~1.25.0",
"@opentelemetry/core": "~1.25.0",
"@opentelemetry/sdk-metrics": "~1.25.0",
"@opentelemetry/sdk-node": "^0.52.0",
"@opentelemetry/sdk-trace-base": "~1.25.0",
"@opentelemetry/api": "^1.9.1",
"@opentelemetry/context-async-hooks": "^2.6.1",
"@opentelemetry/core": "^2.6.1",
"@opentelemetry/sdk-metrics": "^2.6.1",
"@opentelemetry/sdk-node": "^0.214.0",
"@opentelemetry/sdk-trace-base": "^2.6.1",
"@types/json-schema": "^7.0.15",
"ajv": "^8.12.0",
"ajv-formats": "^3.0.1",
Expand Down
7 changes: 4 additions & 3 deletions js/core/src/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ export async function enableTelemetry(
if (isOTelInitializationDisabled()) {
return;
}
global[instrumentationKey] =
telemetryConfig instanceof Promise ? telemetryConfig : Promise.resolve();
return getTelemetryProvider().enableTelemetry(telemetryConfig);
const instrumentationPromise =
getTelemetryProvider().enableTelemetry(telemetryConfig);
global[instrumentationKey] = instrumentationPromise;
return instrumentationPromise;
}

/**
Expand Down
14 changes: 7 additions & 7 deletions js/core/src/tracing/exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class TraceServerExporter implements SpanExporter {
displayName: span.name,
links: span.links,
spanKind: SpanKind[span.kind],
parentSpanId: span.parentSpanId,
parentSpanId: span.parentSpanContext?.spanId,
sameProcessAsParentSpan: { value: !span.spanContext().isRemote },
status: span.status,
timeEvents: {
Expand All @@ -86,17 +86,17 @@ export class TraceServerExporter implements SpanExporter {
})),
},
};
if (span.instrumentationLibrary !== undefined) {
if (span.instrumentationScope !== undefined) {
spanData.instrumentationLibrary = {
name: span.instrumentationLibrary.name,
name: span.instrumentationScope.name,
};
if (span.instrumentationLibrary.schemaUrl !== undefined) {
if (span.instrumentationScope.schemaUrl !== undefined) {
spanData.instrumentationLibrary.schemaUrl =
span.instrumentationLibrary.schemaUrl;
span.instrumentationScope.schemaUrl;
}
if (span.instrumentationLibrary.version !== undefined) {
if (span.instrumentationScope.version !== undefined) {
spanData.instrumentationLibrary.version =
span.instrumentationLibrary.version;
span.instrumentationScope.version;
}
}
deleteUndefinedProps(spanData);
Expand Down
73 changes: 65 additions & 8 deletions js/core/src/tracing/node-telemetry-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* limitations under the License.
*/

import { Context } from '@opentelemetry/api';
import { NodeSDK } from '@opentelemetry/sdk-node';
import {
BatchSpanProcessor,
SimpleSpanProcessor,
type ReadableSpan,
type SpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import { logger } from '../logging.js';
Expand All @@ -29,6 +31,7 @@ import { RealtimeSpanProcessor } from './realtime-span-processor.js';

let telemetrySDK: NodeSDK | null = null;
let nodeOtelConfig: TelemetryConfig | null = null;
let isSigtermHandlerRegistered = false;

export function initNodeTelemetryProvider() {
setTelemetryProvider({
Expand All @@ -37,6 +40,35 @@ export function initNodeTelemetryProvider() {
});
}

/**
* MultiSpanProcessor is a multiplexer that allows Genkit to register multiple
* span processors reliably.
*
* It is used instead of providing a `spanProcessors` array to the OpenTelemetry NodeSDK
* because the SDK's internal logic for merging `traceExporter`, `spanProcessor` (singular),
* and `spanProcessors` (plural) varies across versions and can lead to exporters being
* silently overwritten or ignored.
*
* By wrapping all processors into this single delegate, we guarantee that Genkit's
* internal telemetry and any user-provided exporters both receive every start and
* end event, providing a robust compatibility layer during the OTel 2.0 upgrade.
*/
class MultiSpanProcessor implements SpanProcessor {
constructor(private processors: SpanProcessor[]) {}
onStart(span: any, parentContext: Context) {
this.processors.forEach((p) => p.onStart?.(span, parentContext));
}
onEnd(span: ReadableSpan) {
this.processors.forEach((p) => p.onEnd(span));
}
async forceFlush() {
await Promise.all(this.processors.map((p) => p.forceFlush()));
}
async shutdown() {
await Promise.all(this.processors.map((p) => p.shutdown()));
}
}

/**
* Enables tracing and metrics open telemetry configuration.
*/
Expand All @@ -52,23 +84,48 @@ async function enableTelemetry(
? await telemetryConfig
: telemetryConfig;

// If already initialized and new config is empty, skip to avoid unnecessary restarts
if (
telemetrySDK &&
(!telemetryConfig ||
(!telemetryConfig.spanProcessors &&
!telemetryConfig.spanProcessor &&
!telemetryConfig.traceExporter &&
!telemetryConfig.metricReader))
) {
return;
}

nodeOtelConfig = telemetryConfig || {};

const processors: SpanProcessor[] = [createTelemetryServerProcessor()];
if (nodeOtelConfig.traceExporter) {
throw new Error('Please specify spanProcessors instead.');
}
const processors: SpanProcessor[] = [];
if (nodeOtelConfig.spanProcessors) {
processors.push(...nodeOtelConfig.spanProcessors);
}
if (nodeOtelConfig.spanProcessor) {
processors.push(nodeOtelConfig.spanProcessor);
delete nodeOtelConfig.spanProcessor;
}
nodeOtelConfig.spanProcessors = processors;
processors.push(createTelemetryServerProcessor());

if (processors.length > 1) {
nodeOtelConfig.spanProcessor = new MultiSpanProcessor(processors);
} else {
nodeOtelConfig.spanProcessor = processors[0];
}
delete nodeOtelConfig.spanProcessors;

if (telemetrySDK) {
await cleanUpTracing();
}

telemetrySDK = new NodeSDK(nodeOtelConfig);
telemetrySDK.start();
process.on('SIGTERM', async () => await cleanUpTracing());

if (!isSigtermHandlerRegistered) {
process.on('SIGTERM', async () => await cleanUpTracing());
isSigtermHandlerRegistered = true;
}
}

async function cleanUpTracing(): Promise<void> {
Expand Down Expand Up @@ -113,7 +170,7 @@ function maybeFlushMetrics(): Promise<void> {
* Flushes all configured span processors.
*/
async function flushTracing() {
if (nodeOtelConfig?.spanProcessors) {
await Promise.all(nodeOtelConfig.spanProcessors.map((p) => p.forceFlush()));
if (nodeOtelConfig?.spanProcessor) {
await nodeOtelConfig.spanProcessor.forceFlush();
}
}
6 changes: 4 additions & 2 deletions js/core/tests/action_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { z } from 'zod';
import { action, defineAction } from '../src/action.js';
import { initNodeFeatures } from '../src/node.js';
import { Registry } from '../src/registry.js';
import { enableTelemetry } from '../src/tracing.js';
import { enableTelemetry, flushTracing } from '../src/tracing.js';
import { TestSpanExporter } from './utils.js';

initNodeFeatures();
Expand All @@ -33,8 +33,9 @@ enableTelemetry({

describe('action', () => {
var registry: Registry;
beforeEach(() => {
beforeEach(async () => {
registry = new Registry();
await flushTracing();
spanExporter.exportedSpans = [];
});

Expand Down Expand Up @@ -289,6 +290,7 @@ describe('action', () => {
act.__action.key = 'some-custom-key';

await act();
await flushTracing();

assert.strictEqual(spanExporter.exportedSpans.length, 1);
assert.strictEqual(
Expand Down
12 changes: 10 additions & 2 deletions js/core/tests/flow_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { defineFlow, run } from '../src/flow.js';
import { defineAction, getContext, z } from '../src/index.js';
import { initNodeFeatures } from '../src/node.js';
import { Registry } from '../src/registry.js';
import { enableTelemetry } from '../src/tracing.js';
import { enableTelemetry, flushTracing } from '../src/tracing.js';
import { TestSpanExporter } from './utils.js';

initNodeFeatures();
Expand All @@ -48,10 +48,12 @@ function createTestFlow(registry: Registry) {
describe('flow', () => {
let registry: Registry;

beforeEach(() => {
beforeEach(async () => {
// Skips starting reflection server.
delete process.env.GENKIT_ENV;
registry = new Registry();
await flushTracing();
spanExporter.exportedSpans = [];
});

describe('runFlow', () => {
Expand Down Expand Up @@ -370,6 +372,8 @@ describe('flow', () => {
telemetryLabels: { custom: 'label' },
});

await flushTracing();

assert.equal(result, 'bar foo');
assert.strictEqual(spanExporter.exportedSpans.length, 1);
assert.strictEqual(spanExporter.exportedSpans[0].displayName, 'testFlow');
Expand All @@ -394,6 +398,8 @@ describe('flow', () => {
});
const result = await output;

await flushTracing();

assert.equal(result, 'bar foo');
assert.strictEqual(spanExporter.exportedSpans.length, 1);
assert.strictEqual(spanExporter.exportedSpans[0].displayName, 'testFlow');
Expand Down Expand Up @@ -445,6 +451,8 @@ describe('flow', () => {
context: { user: 'pavel' },
});

await flushTracing();

assert.equal(result, 'foo bar');
assert.strictEqual(spanExporter.exportedSpans.length, 3);

Expand Down
2 changes: 1 addition & 1 deletion js/core/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class TestSpanExporter implements SpanExporter {
displayName: span.name,
links: span.links,
spanKind: SpanKind[span.kind],
parentSpanId: span.parentSpanId,
parentSpanId: span.parentSpanContext?.spanId,
sameProcessAsParentSpan: { value: !span.spanContext().isRemote },
status: span.status,
timeEvents: {
Expand Down
26 changes: 13 additions & 13 deletions js/plugins/google-cloud/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@
"dependencies": {
"@google-cloud/logging-winston": "^6.0.0",
"@google-cloud/modelarmor": "^0.4.1",
"@google-cloud/opentelemetry-cloud-monitoring-exporter": "^0.19.0",
"@google-cloud/opentelemetry-cloud-trace-exporter": "^2.4.1",
"@google-cloud/opentelemetry-resource-util": "^2.4.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/auto-instrumentations-node": "^0.49.1",
"@opentelemetry/core": "~1.25.0",
"@opentelemetry/instrumentation": "^0.52.0",
"@opentelemetry/instrumentation-pino": "^0.41.0",
"@opentelemetry/instrumentation-winston": "^0.39.0",
"@opentelemetry/resources": "~1.25.0",
"@opentelemetry/sdk-metrics": "~1.25.0",
"@opentelemetry/sdk-node": "^0.52.0",
"@opentelemetry/sdk-trace-base": "~1.25.0",
"@google-cloud/opentelemetry-cloud-monitoring-exporter": "^0.21.0",
"@google-cloud/opentelemetry-cloud-trace-exporter": "^3.0.0",
"@google-cloud/opentelemetry-resource-util": "^3.0.0",
"@opentelemetry/api": "^1.9.1",
"@opentelemetry/auto-instrumentations-node": "^0.72.0",
"@opentelemetry/core": "^2.6.1",
"@opentelemetry/instrumentation": "^0.214.0",
"@opentelemetry/instrumentation-pino": "^0.60.0",
"@opentelemetry/instrumentation-winston": "^0.58.0",
"@opentelemetry/resources": "^2.6.1",
"@opentelemetry/sdk-metrics": "^2.6.1",
"@opentelemetry/sdk-node": "^0.214.0",
"@opentelemetry/sdk-trace-base": "^2.6.1",
"google-auth-library": "^9.6.3",
"node-fetch": "^3.3.2",
"winston": "^3.12.0"
Expand Down
Loading
Loading