Skip to content
Open
183 changes: 178 additions & 5 deletions packages/interceptors-opentelemetry/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
import * as otel from '@opentelemetry/api';
import type { Next, WorkflowSignalInput, WorkflowStartInput, WorkflowClientInterceptor } from '@temporalio/client';
import { instrument, headersWithContext, RUN_ID_ATTR_KEY } from '../instrumentation';
import type {
Next,
WorkflowSignalInput,
WorkflowSignalWithStartInput,
WorkflowStartInput,
WorkflowStartOutput,
WorkflowStartUpdateInput,
WorkflowStartUpdateOutput,
WorkflowStartUpdateWithStartInput,
WorkflowStartUpdateWithStartOutput,
WorkflowQueryInput,
WorkflowTerminateInput,
WorkflowCancelInput,
WorkflowDescribeInput,
WorkflowClientInterceptor,
TerminateWorkflowExecutionResponse,
RequestCancelWorkflowExecutionResponse,
DescribeWorkflowExecutionResponse,
} from '@temporalio/client';
import {
instrument,
headersWithContext,
RUN_ID_ATTR_KEY,
WORKFLOW_ID_ATTR_KEY,
UPDATE_ID_ATTR_KEY,
TERMINATE_REASON_ATTR_KEY,
} from '../instrumentation';
import { SpanName, SPAN_DELIMITER } from '../workflow';

export interface InterceptorOptions {
Expand All @@ -24,7 +49,8 @@ export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInt
tracer: this.tracer,
spanName: `${SpanName.WORKFLOW_START}${SPAN_DELIMITER}${input.workflowType}`,
fn: async (span) => {
const headers = await headersWithContext(input.headers);
const headers = headersWithContext(input.headers);
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.options.workflowId);
const runId = await next({ ...input, headers });
span.setAttribute(RUN_ID_ATTR_KEY, runId);
return runId;
Expand All @@ -36,10 +62,157 @@ export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInt
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
fn: async () => {
const headers = await headersWithContext(input.headers);
fn: async (span) => {
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
const headers = headersWithContext(input.headers);
await next({ ...input, headers });
},
});
}

async startWithDetails(
input: WorkflowStartInput,
next: Next<WorkflowClientInterceptor, 'startWithDetails'>
): Promise<WorkflowStartOutput> {
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.WORKFLOW_START}${SPAN_DELIMITER}${input.workflowType}`,
fn: async (span) => {
const headers = headersWithContext(input.headers);
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.options.workflowId);
const output = await next({ ...input, headers });
span.setAttribute(RUN_ID_ATTR_KEY, output.runId);
return output;
},
});
}

async startUpdate(
input: WorkflowStartUpdateInput,
next: Next<WorkflowClientInterceptor, 'startUpdate'>
): Promise<WorkflowStartUpdateOutput> {
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.WORKFLOW_START_UPDATE}${SPAN_DELIMITER}${input.updateName}`,
fn: async (span) => {
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
if (input.options.updateId) {
span.setAttribute(UPDATE_ID_ATTR_KEY, input.options.updateId);
}
const headers = headersWithContext(input.headers);
const output = await next({ ...input, headers });
span.setAttribute(RUN_ID_ATTR_KEY, output.workflowRunId);
return output;
},
});
}

async startUpdateWithStart(
input: WorkflowStartUpdateWithStartInput,
next: Next<WorkflowClientInterceptor, 'startUpdateWithStart'>
): Promise<WorkflowStartUpdateWithStartOutput> {
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.WORKFLOW_UPDATE_WITH_START}${SPAN_DELIMITER}${input.updateName}`,
fn: async (span) => {
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowStartOptions.workflowId);
if (input.updateOptions.updateId) {
span.setAttribute(UPDATE_ID_ATTR_KEY, input.updateOptions.updateId);
}
const workflowStartHeaders = headersWithContext(input.workflowStartHeaders);
const updateHeaders = headersWithContext(input.updateHeaders);
const output = await next({ ...input, workflowStartHeaders, updateHeaders });
if (output.workflowExecution.runId) {
span.setAttribute(RUN_ID_ATTR_KEY, output.workflowExecution.runId);
}
return output;
},
});
}

async signalWithStart(
input: WorkflowSignalWithStartInput,
next: Next<WorkflowClientInterceptor, 'signalWithStart'>
): Promise<string> {
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.WORKFLOW_SIGNAL_WITH_START}${SPAN_DELIMITER}${input.workflowType}`,
fn: async (span) => {
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.options.workflowId);
const headers = headersWithContext(input.headers);
const runId = await next({ ...input, headers });
span.setAttribute(RUN_ID_ATTR_KEY, runId);
return runId;
},
});
}

async query(input: WorkflowQueryInput, next: Next<WorkflowClientInterceptor, 'query'>): Promise<unknown> {
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.WORKFLOW_QUERY}${SPAN_DELIMITER}${input.queryType}`,
fn: async (span) => {
const headers = headersWithContext(input.headers);
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
if (input.workflowExecution.runId) {
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
}
return await next({ ...input, headers });
},
});
}

async terminate(
input: WorkflowTerminateInput,
next: Next<WorkflowClientInterceptor, 'terminate'>
): Promise<TerminateWorkflowExecutionResponse> {
return await instrument({
tracer: this.tracer,
spanName: SpanName.WORKFLOW_TERMINATE,
fn: async (span) => {
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
if (input.workflowExecution.runId) {
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
}
if (input.reason) {
span.setAttribute(TERMINATE_REASON_ATTR_KEY, input.reason);
}
return await next(input);
},
});
}

async cancel(
input: WorkflowCancelInput,
next: Next<WorkflowClientInterceptor, 'cancel'>
): Promise<RequestCancelWorkflowExecutionResponse> {
return await instrument({
tracer: this.tracer,
spanName: SpanName.WORKFLOW_CANCEL,
fn: async (span) => {
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
if (input.workflowExecution.runId) {
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
}
return await next(input);
},
});
}

async describe(
input: WorkflowDescribeInput,
next: Next<WorkflowClientInterceptor, 'describe'>
): Promise<DescribeWorkflowExecutionResponse> {
return await instrument({
tracer: this.tracer,
spanName: SpanName.WORKFLOW_DESCRIBE,
fn: async (span) => {
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
if (input.workflowExecution.runId) {
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
}
return await next(input);
},
});
}
}
62 changes: 54 additions & 8 deletions packages/interceptors-opentelemetry/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ import {
export const TRACE_HEADER = '_tracer-data';
/** As in workflow run id */
export const RUN_ID_ATTR_KEY = 'run_id';
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ruby OTEL uses temporalRunId

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh? Have you compared with other SDKs, beside Ruby? We'd ideally want cross-SDKs compatibility of OTel tracing, as a customer could be operating different languages within a single Temporal application.

Not saying we'll prioritize, of course, but at the very least we should settle on what names we want to normalize to across the board, so that we eventually converge to something consistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Java: runId
  • Python temporalRunId
  • Go: Doesn't have these
  • PHP: N/A
  • .NET: temporalRunId

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept run_id as I wasn't sure if moving to temporalRunId could be considered a breaking change. I could see the case that users have dashboard/queries using run_id that would break if we changed this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. We may however consider changing these while transitioning to OTel 2. I would mind being breaking at that point.

/** As in workflow id */
export const WORKFLOW_ID_ATTR_KEY = 'temporalWorkflowId';
/** As in activity id */
export const ACTIVITY_ID_ATTR_KEY = 'temporalActivityId';
/** As in update id */
export const UPDATE_ID_ATTR_KEY = 'temporalUpdateId';
/** As in termination reason */
export const TERMINATE_REASON_ATTR_KEY = 'temporalTerminateReason';
/** As in Nexus service */
export const NEXUS_SERVICE_ATTR_KEY = 'temporalNexusService';
/** As in Nexus operation */
export const NEXUS_OPERATION_ATTR_KEY = 'temporalNexusOperation';
/** As in Nexus endpoint */
export const NEXUS_ENDPOINT_ATTR_KEY = 'temporalNexusEndpoint';

const payloadConverter = defaultPayloadConverter;

Expand Down Expand Up @@ -48,20 +62,41 @@ async function wrapWithSpan<T>(
span.setStatus({ code: otel.SpanStatusCode.OK });
return ret;
} catch (err: any) {
const isBenignErr = err instanceof ApplicationFailure && err.category === ApplicationFailureCategory.BENIGN;
if (acceptableErrors === undefined || !acceptableErrors(err)) {
const statusCode = isBenignErr ? otel.SpanStatusCode.UNSET : otel.SpanStatusCode.ERROR;
span.setStatus({ code: statusCode, message: (err as Error).message ?? String(err) });
span.recordException(err);
} else {
span.setStatus({ code: otel.SpanStatusCode.OK });
}
maybeAddErrorToSpan(err, span, acceptableErrors);
throw err;
} finally {
span.end();
}
}

function wrapWithSpanSync<T>(
span: otel.Span,
fn: (span: otel.Span) => T,
acceptableErrors?: (err: unknown) => boolean
): T {
try {
const ret = fn(span);
span.setStatus({ code: otel.SpanStatusCode.OK });
return ret;
} catch (err: any) {
maybeAddErrorToSpan(err, span, acceptableErrors);
throw err;
} finally {
span.end();
}
}

function maybeAddErrorToSpan(err: any, span: otel.Span, acceptableErrors?: (err: unknown) => boolean): void {
const isBenignErr = err instanceof ApplicationFailure && err.category === ApplicationFailureCategory.BENIGN;
if (acceptableErrors === undefined || !acceptableErrors(err)) {
const statusCode = isBenignErr ? otel.SpanStatusCode.UNSET : otel.SpanStatusCode.ERROR;
span.setStatus({ code: statusCode, message: (err as Error).message ?? String(err) });
span.recordException(err);
} else {
span.setStatus({ code: otel.SpanStatusCode.OK });
}
}

export interface InstrumentOptions<T> {
tracer: otel.Tracer;
spanName: string;
Expand All @@ -70,6 +105,8 @@ export interface InstrumentOptions<T> {
acceptableErrors?: (err: unknown) => boolean;
}

export type InstrumentOptionsSync<T> = Omit<InstrumentOptions<T>, 'fn'> & { fn: (span: otel.Span) => T };

/**
* Wraps `fn` in a span which ends when function returns or throws
*/
Expand All @@ -87,3 +124,12 @@ export async function instrument<T>({
}
return await tracer.startActiveSpan(spanName, async (span) => await wrapWithSpan(span, fn, acceptableErrors));
}

export function instrumentSync<T>({ tracer, spanName, fn, context, acceptableErrors }: InstrumentOptionsSync<T>): T {
if (context) {
return otel.context.with(context, () => {
return tracer.startActiveSpan(spanName, (span) => wrapWithSpanSync(span, fn, acceptableErrors));
});
}
return tracer.startActiveSpan(spanName, (span) => wrapWithSpanSync(span, fn, acceptableErrors));
}
43 changes: 39 additions & 4 deletions packages/interceptors-opentelemetry/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ import type {
ActivityOutboundCallsInterceptor,
InjectedSink,
GetLogAttributesInput,
GetMetricTagsInput,
ActivityExecuteInput,
} from '@temporalio/worker';
import { instrument, extractContextFromHeaders } from '../instrumentation';
import {
instrument,
extractContextFromHeaders,
WORKFLOW_ID_ATTR_KEY,
RUN_ID_ATTR_KEY,
ACTIVITY_ID_ATTR_KEY,
} from '../instrumentation';
import { type OpenTelemetryWorkflowExporter, type SerializableSpan, SpanName, SPAN_DELIMITER } from '../workflow';

export interface InterceptorOptions {
Expand All @@ -36,14 +43,24 @@ export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundC
async execute(input: ActivityExecuteInput, next: Next<ActivityInboundCallsInterceptor, 'execute'>): Promise<unknown> {
const context = extractContextFromHeaders(input.headers);
const spanName = `${SpanName.ACTIVITY_EXECUTE}${SPAN_DELIMITER}${this.ctx.info.activityType}`;
return await instrument({ tracer: this.tracer, spanName, fn: () => next(input), context });
return await instrument({
tracer: this.tracer,
spanName,
fn: (span) => {
span.setAttribute(WORKFLOW_ID_ATTR_KEY, this.ctx.info.workflowExecution.workflowId);
span.setAttribute(RUN_ID_ATTR_KEY, this.ctx.info.workflowExecution.runId);
span.setAttribute(ACTIVITY_ID_ATTR_KEY, this.ctx.info.activityId);
return next(input);
},
context,
});
}
}

/**
* Intercepts calls to emit logs from an Activity.
* Intercepts calls to emit logs and metrics from an Activity.
*
* Attach OpenTelemetry context tracing attributes to emitted log messages, if appropriate.
* Attach OpenTelemetry context tracing attributes to emitted log messages and metrics, if appropriate.
*/
export class OpenTelemetryActivityOutboundInterceptor implements ActivityOutboundCallsInterceptor {
constructor(protected readonly ctx: ActivityContext) {}
Expand All @@ -65,6 +82,24 @@ export class OpenTelemetryActivityOutboundInterceptor implements ActivityOutboun
return next(input);
}
}

public getMetricTags(
input: GetMetricTagsInput,
next: Next<ActivityOutboundCallsInterceptor, 'getMetricTags'>
): GetMetricTagsInput {
const span = otel.trace.getSpan(otel.context.active());
const spanContext = span?.spanContext();
if (spanContext && otel.isSpanContextValid(spanContext)) {
return next({
trace_id: spanContext.traceId,
span_id: spanContext.spanId,
trace_flags: `0${spanContext.traceFlags.toString(16)}`,
...input,
});
} else {
return next(input);
}
}
}

/**
Expand Down
Loading