Skip to content
Merged
166 changes: 163 additions & 3 deletions packages/interceptors-opentelemetry/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
import * as otel from '@opentelemetry/api';
import type { Next, WorkflowSignalInput, WorkflowStartInput, WorkflowClientInterceptor } from '@temporalio/client';
import { instrument, headersWithContext, RUN_ID_ATTR_KEY } from '../instrumentation';
import type {
Next,
WorkflowSignalInput,
WorkflowSignalWithStartInput,
WorkflowStartInput,
WorkflowStartOutput,
WorkflowStartUpdateInput,
WorkflowStartUpdateOutput,
WorkflowStartUpdateWithStartInput,
WorkflowStartUpdateWithStartOutput,
WorkflowQueryInput,
WorkflowTerminateInput,
WorkflowCancelInput,
WorkflowDescribeInput,
WorkflowClientInterceptor,
TerminateWorkflowExecutionResponse,
RequestCancelWorkflowExecutionResponse,
DescribeWorkflowExecutionResponse,
} from '@temporalio/client';
import {
instrument,
headersWithContext,
RUN_ID_ATTR_KEY,
WORKFLOW_ID_ATTR_KEY,
TERMINATE_REASON_ATTR_KEY,
} from '../instrumentation';
import { SpanName, SPAN_DELIMITER } from '../workflow';

export interface InterceptorOptions {
Expand All @@ -12,7 +36,7 @@ export interface InterceptorOptions {
*
* Wraps the operation in an opentelemetry Span and passes it to the Workflow via headers.
*/
export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInterceptor {
export class OpenTelemetryWorkflowClientInterceptor implements Required<WorkflowClientInterceptor> {
protected readonly tracer: otel.Tracer;

constructor(options?: InterceptorOptions) {
Expand Down Expand Up @@ -42,4 +66,140 @@ export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInt
},
});
}

/**
 * Starts a Workflow inside a client-side span and returns the full start output.
 *
 * The span name is `SpanName.WORKFLOW_START`, `SPAN_DELIMITER` and the Workflow type, in that order.
 * The headers forwarded to `next` are those returned by `headersWithContext(input.headers)`.
 * Once `next` resolves, the run id from its output is recorded on the span.
 */
async startWithDetails(
  input: WorkflowStartInput,
  next: Next<WorkflowClientInterceptor, 'startWithDetails'>
): Promise<WorkflowStartOutput> {
  const spanName = SpanName.WORKFLOW_START + SPAN_DELIMITER + input.workflowType;
  return await instrument({
    tracer: this.tracer,
    spanName,
    fn: async (span) => {
      const started = await next({ ...input, headers: headersWithContext(input.headers) });
      span.setAttribute(RUN_ID_ATTR_KEY, started.runId);
      return started;
    },
  });
}

/**
 * Starts a Workflow Update inside a client-side span.
 *
 * The span name is `SpanName.WORKFLOW_UPDATE`, `SPAN_DELIMITER` and the update name, in that order.
 * The headers forwarded to `next` are those returned by `headersWithContext(input.headers)`.
 * Once `next` resolves, `workflowRunId` from its output is recorded on the span.
 */
async startUpdate(
  input: WorkflowStartUpdateInput,
  next: Next<WorkflowClientInterceptor, 'startUpdate'>
): Promise<WorkflowStartUpdateOutput> {
  const spanName = SpanName.WORKFLOW_UPDATE + SPAN_DELIMITER + input.updateName;
  return await instrument({
    tracer: this.tracer,
    spanName,
    fn: async (span) => {
      const updateStarted = await next({ ...input, headers: headersWithContext(input.headers) });
      span.setAttribute(RUN_ID_ATTR_KEY, updateStarted.workflowRunId);
      return updateStarted;
    },
  });
}

/**
 * Runs an Update-with-Start inside a client-side span.
 *
 * The span name is `SpanName.WORKFLOW_UPDATE_WITH_START`, the Workflow type and the update name,
 * joined by `SPAN_DELIMITER`. Both header sets (`workflowStartHeaders` and `updateHeaders`) are
 * passed through `headersWithContext` before being forwarded to `next`.
 * The run id is recorded on the span only when the output carries a truthy one.
 */
async startUpdateWithStart(
  input: WorkflowStartUpdateWithStartInput,
  next: Next<WorkflowClientInterceptor, 'startUpdateWithStart'>
): Promise<WorkflowStartUpdateWithStartOutput> {
  const spanName =
    SpanName.WORKFLOW_UPDATE_WITH_START + SPAN_DELIMITER + input.workflowType + SPAN_DELIMITER + input.updateName;
  return await instrument({
    tracer: this.tracer,
    spanName,
    fn: async (span) => {
      const result = await next({
        ...input,
        workflowStartHeaders: headersWithContext(input.workflowStartHeaders),
        updateHeaders: headersWithContext(input.updateHeaders),
      });
      const { runId } = result.workflowExecution;
      if (runId) {
        span.setAttribute(RUN_ID_ATTR_KEY, runId);
      }
      return result;
    },
  });
}

/**
 * Runs a Signal-with-Start inside a client-side span and resolves with the value returned by `next`.
 *
 * The span name is `SpanName.WORKFLOW_SIGNAL_WITH_START`, the Workflow type and the signal name,
 * joined by `SPAN_DELIMITER`. The headers forwarded to `next` are those returned by
 * `headersWithContext(input.headers)`; the string returned by `next` is recorded on the span as the run id.
 */
async signalWithStart(
  input: WorkflowSignalWithStartInput,
  next: Next<WorkflowClientInterceptor, 'signalWithStart'>
): Promise<string> {
  const spanName =
    SpanName.WORKFLOW_SIGNAL_WITH_START + SPAN_DELIMITER + input.workflowType + SPAN_DELIMITER + input.signalName;
  return await instrument({
    tracer: this.tracer,
    spanName,
    fn: async (span) => {
      const startedRunId = await next({ ...input, headers: headersWithContext(input.headers) });
      span.setAttribute(RUN_ID_ATTR_KEY, startedRunId);
      return startedRunId;
    },
  });
}

/**
 * Runs a Workflow Query inside a client-side span.
 *
 * The span name is `SpanName.WORKFLOW_QUERY`, `SPAN_DELIMITER` and the query type, in that order.
 * The target Workflow id is always recorded on the span; the run id is recorded only when it is truthy.
 * The headers forwarded to `next` are those returned by `headersWithContext(input.headers)`.
 */
async query(input: WorkflowQueryInput, next: Next<WorkflowClientInterceptor, 'query'>): Promise<unknown> {
  const spanName = SpanName.WORKFLOW_QUERY + SPAN_DELIMITER + input.queryType;
  return await instrument({
    tracer: this.tracer,
    spanName,
    fn: async (span) => {
      const tracedHeaders = headersWithContext(input.headers);
      const { workflowId, runId } = input.workflowExecution;
      span.setAttribute(WORKFLOW_ID_ATTR_KEY, workflowId);
      if (runId) {
        span.setAttribute(RUN_ID_ATTR_KEY, runId);
      }
      return await next({ ...input, headers: tracedHeaders });
    },
  });
}

/**
 * Runs a Workflow termination request inside a span named `SpanName.WORKFLOW_TERMINATE`.
 *
 * The Workflow id is always recorded on the span. The run id and the termination reason are
 * recorded only when truthy, so an empty-string reason is not recorded. `input` is forwarded
 * to `next` unchanged.
 */
async terminate(
  input: WorkflowTerminateInput,
  next: Next<WorkflowClientInterceptor, 'terminate'>
): Promise<TerminateWorkflowExecutionResponse> {
  return await instrument({
    tracer: this.tracer,
    spanName: SpanName.WORKFLOW_TERMINATE,
    fn: async (span) => {
      const { workflowId, runId } = input.workflowExecution;
      span.setAttribute(WORKFLOW_ID_ATTR_KEY, workflowId);
      if (runId) {
        span.setAttribute(RUN_ID_ATTR_KEY, runId);
      }
      const { reason } = input;
      if (reason) {
        span.setAttribute(TERMINATE_REASON_ATTR_KEY, reason);
      }
      return await next(input);
    },
  });
}

/**
 * Runs a Workflow cancellation request inside a span named `SpanName.WORKFLOW_CANCEL`.
 *
 * The Workflow id is always recorded on the span; the run id is recorded only when it is truthy.
 * `input` is forwarded to `next` unchanged.
 */
async cancel(
  input: WorkflowCancelInput,
  next: Next<WorkflowClientInterceptor, 'cancel'>
): Promise<RequestCancelWorkflowExecutionResponse> {
  return await instrument({
    tracer: this.tracer,
    spanName: SpanName.WORKFLOW_CANCEL,
    fn: async (span) => {
      const { workflowId, runId } = input.workflowExecution;
      span.setAttribute(WORKFLOW_ID_ATTR_KEY, workflowId);
      if (runId) {
        span.setAttribute(RUN_ID_ATTR_KEY, runId);
      }
      return await next(input);
    },
  });
}

/**
 * Runs a Workflow describe request inside a span named `SpanName.WORKFLOW_DESCRIBE`.
 *
 * The Workflow id is always recorded on the span; the run id is recorded only when it is truthy.
 * `input` is forwarded to `next` unchanged.
 */
async describe(
  input: WorkflowDescribeInput,
  next: Next<WorkflowClientInterceptor, 'describe'>
): Promise<DescribeWorkflowExecutionResponse> {
  return await instrument({
    tracer: this.tracer,
    spanName: SpanName.WORKFLOW_DESCRIBE,
    fn: async (span) => {
      const { workflowId, runId } = input.workflowExecution;
      span.setAttribute(WORKFLOW_ID_ATTR_KEY, workflowId);
      if (runId) {
        span.setAttribute(RUN_ID_ATTR_KEY, runId);
      }
      return await next(input);
    },
  });
}
}
60 changes: 52 additions & 8 deletions packages/interceptors-opentelemetry/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ import {
export const TRACE_HEADER = '_tracer-data';
/** As in workflow run id */
export const RUN_ID_ATTR_KEY = 'run_id';
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ruby OTEL uses temporalRunId

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh? Have you compared with other SDKs, beside Ruby? We'd ideally want cross-SDKs compatibility of OTel tracing, as a customer could be operating different languages within a single Temporal application.

Not saying we'll prioritize, of course, but at the very least we should settle on what names we want to normalize to across the board, so that we eventually converge to something consistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Java: runId
  • Python: temporalRunId
  • Go: Doesn't have these
  • PHP: N/A
  • .NET: temporalRunId

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept run_id as I wasn't sure if moving to temporalRunId could be considered a breaking change. I could see the case that users have dashboard/queries using run_id that would break if we changed this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. We may however consider changing these while transitioning to OTel 2. I wouldn't mind being breaking at that point.

/** Span attribute key under which a Workflow id is recorded */
export const WORKFLOW_ID_ATTR_KEY = 'workflow_id';
/** Span attribute key under which the reason given for terminating a Workflow is recorded */
export const TERMINATE_REASON_ATTR_KEY = 'terminate_reason';
/** Attribute key for a timer duration (presumably in milliseconds, per the `_ms` suffix; confirm at the call site) */
export const TIMER_DURATION_MS_ATTR_KEY = 'timer_duration_ms';
/** Attribute key for a Nexus service */
export const NEXUS_SERVICE_ATTR_KEY = 'nexus_service';
/** Attribute key for a Nexus operation */
export const NEXUS_OPERATION_ATTR_KEY = 'nexus_operation';
/** Attribute key for a Nexus endpoint */
export const NEXUS_ENDPOINT_ATTR_KEY = 'nexus_endpoint';

const payloadConverter = defaultPayloadConverter;

Expand Down Expand Up @@ -48,20 +60,41 @@ async function wrapWithSpan<T>(
span.setStatus({ code: otel.SpanStatusCode.OK });
return ret;
} catch (err: any) {
const isBenignErr = err instanceof ApplicationFailure && err.category === ApplicationFailureCategory.BENIGN;
if (acceptableErrors === undefined || !acceptableErrors(err)) {
const statusCode = isBenignErr ? otel.SpanStatusCode.UNSET : otel.SpanStatusCode.ERROR;
span.setStatus({ code: statusCode, message: (err as Error).message ?? String(err) });
span.recordException(err);
} else {
span.setStatus({ code: otel.SpanStatusCode.OK });
}
handleError(err, span, acceptableErrors);
throw err;
} finally {
span.end();
}
}

/**
 * Synchronously runs `fn` with `span` and finalizes the span afterwards.
 *
 * - On normal return the span status is set to OK and the value of `fn` is returned.
 * - On a throw the error is handed to `handleError` (together with `acceptableErrors`) and then rethrown.
 * - The span is ended in every case.
 */
function wrapWithSpanSync<T>(
  span: otel.Span,
  fn: (span: otel.Span) => T,
  acceptableErrors?: (err: unknown) => boolean
): T {
  let result: T;
  try {
    result = fn(span);
    span.setStatus({ code: otel.SpanStatusCode.OK });
  } catch (thrown: any) {
    handleError(thrown, span, acceptableErrors);
    throw thrown;
  } finally {
    span.end();
  }
  return result;
}

/**
 * Records the outcome of a failed operation on `span`. Does not rethrow; callers do that.
 *
 * - If `acceptableErrors` is provided and returns a truthy value for `err`, the span status is set to OK
 *   and nothing else is recorded.
 * - Otherwise the exception is recorded on the span and the status is set to ERROR, or to UNSET when
 *   `err` is an `ApplicationFailure` whose category is `BENIGN`.
 *
 * @param err the thrown value; may be anything, including `null` or `undefined`
 * @param span the span to update
 * @param acceptableErrors optional predicate selecting errors that should not mark the span as failed
 */
function handleError(err: any, span: otel.Span, acceptableErrors?: (err: unknown) => boolean): void {
  if (acceptableErrors?.(err)) {
    span.setStatus({ code: otel.SpanStatusCode.OK });
    return;
  }

  const isBenignErr = err instanceof ApplicationFailure && err.category === ApplicationFailureCategory.BENIGN;
  const statusCode = isBenignErr ? otel.SpanStatusCode.UNSET : otel.SpanStatusCode.ERROR;
  // Optional chaining guards against `throw null` / `throw undefined`: reading `.message` directly would
  // raise a TypeError here and mask the value that the caller is about to rethrow.
  span.setStatus({ code: statusCode, message: err?.message ?? String(err) });
  span.recordException(err);
}

export interface InstrumentOptions<T> {
tracer: otel.Tracer;
spanName: string;
Expand All @@ -70,6 +103,8 @@ export interface InstrumentOptions<T> {
acceptableErrors?: (err: unknown) => boolean;
}

/**
 * Options accepted by `instrumentSync`: the same fields as `InstrumentOptions`, except that `fn` is
 * replaced by a callback typed as returning `T`.
 */
export type InstrumentOptionsSync<T> = Omit<InstrumentOptions<T>, 'fn'> & { fn: (span: otel.Span) => T };

/**
* Wraps `fn` in a span which ends when function returns or throws
*/
Expand All @@ -87,3 +122,12 @@ export async function instrument<T>({
}
return await tracer.startActiveSpan(spanName, async (span) => await wrapWithSpan(span, fn, acceptableErrors));
}

/**
 * Synchronous counterpart of `instrument`: wraps `fn` in a span which ends when the function returns or throws.
 *
 * When `context` is provided, the span is started inside `otel.context.with(context, ...)`;
 * otherwise it is started directly via `tracer.startActiveSpan`.
 */
export function instrumentSync<T>(options: InstrumentOptionsSync<T>): T {
  const { tracer, spanName, fn, context, acceptableErrors } = options;
  const runInSpan = (): T =>
    tracer.startActiveSpan(spanName, (span) => wrapWithSpanSync(span, fn, acceptableErrors));
  return context ? otel.context.with(context, runInSpan) : runInSpan();
}
27 changes: 23 additions & 4 deletions packages/interceptors-opentelemetry/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
ActivityOutboundCallsInterceptor,
InjectedSink,
GetLogAttributesInput,
GetMetricTagsInput,
ActivityExecuteInput,
} from '@temporalio/worker';
import { instrument, extractContextFromHeaders } from '../instrumentation';
Expand All @@ -23,7 +24,7 @@ export interface InterceptorOptions {
* Wraps the operation in an opentelemetry Span and links it to a parent Span context if one is
* provided in the Activity input headers.
*/
export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundCallsInterceptor {
export class OpenTelemetryActivityInboundInterceptor implements Required<ActivityInboundCallsInterceptor> {
protected readonly tracer: otel.Tracer;

constructor(
Expand All @@ -41,11 +42,11 @@ export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundC
}

/**
* Intercepts calls to emit logs from an Activity.
* Intercepts calls to emit logs and metrics from an Activity.
*
* Attach OpenTelemetry context tracing attributes to emitted log messages, if appropriate.
* Attach OpenTelemetry context tracing attributes to emitted log messages and metrics, if appropriate.
*/
export class OpenTelemetryActivityOutboundInterceptor implements ActivityOutboundCallsInterceptor {
export class OpenTelemetryActivityOutboundInterceptor implements Required<ActivityOutboundCallsInterceptor> {
constructor(protected readonly ctx: ActivityContext) {}

public getLogAttributes(
Expand All @@ -65,6 +66,24 @@ export class OpenTelemetryActivityOutboundInterceptor implements ActivityOutboun
return next(input);
}
}

/**
 * Adds tracing tags to the metric tags emitted from an Activity.
 *
 * When the active OpenTelemetry context holds a span with a valid span context, `trace_id`, `span_id`
 * and `trace_flags` are added before calling `next`. Tags already present in `input` take precedence
 * over the added ones because `input` is spread last. Otherwise `input` is forwarded unchanged.
 *
 * @param input metric tags collected so far
 * @param next the next interceptor in the chain
 * @returns the tags returned by `next`
 */
public getMetricTags(
  input: GetMetricTagsInput,
  next: Next<ActivityOutboundCallsInterceptor, 'getMetricTags'>
): GetMetricTagsInput {
  const spanContext = otel.trace.getSpan(otel.context.active())?.spanContext();
  if (!spanContext || !otel.isSpanContextValid(spanContext)) {
    return next(input);
  }
  return next({
    trace_id: spanContext.traceId,
    span_id: spanContext.spanId,
    // Trace flags are an 8-bit field rendered as exactly two hex digits. Padding (rather than
    // unconditionally prefixing '0') keeps the width correct for values >= 0x10 as well.
    trace_flags: spanContext.traceFlags.toString(16).padStart(2, '0'),
    ...input,
  });
}
}

/**
Expand Down
38 changes: 38 additions & 0 deletions packages/interceptors-opentelemetry/src/workflow/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,36 @@ export enum SpanName {
*/
WORKFLOW_SIGNAL_WITH_START = 'SignalWithStartWorkflow',

/**
* Workflow is queried
*/
WORKFLOW_QUERY = 'QueryWorkflow',

/**
* Workflow is updated
*/
WORKFLOW_UPDATE = 'UpdateWorkflow',

/**
* Workflow is started with an update
*/
WORKFLOW_UPDATE_WITH_START = 'UpdateWithStartWorkflow',

/**
* Workflow is terminated
*/
WORKFLOW_TERMINATE = 'TerminateWorkflow',

/**
* Workflow is cancelled
*/
WORKFLOW_CANCEL = 'CancelWorkflow',

/**
* Workflow is described
*/
WORKFLOW_DESCRIBE = 'DescribeWorkflow',

/**
* Workflow run is executing
*/
Expand All @@ -74,6 +104,14 @@ export enum SpanName {
* Workflow is continuing as new
*/
CONTINUE_AS_NEW = 'ContinueAsNew',
/**
* Workflow timer is started
*/
WORKFLOW_TIMER = 'StartTimer',
/**
* Nexus operation is started
*/
NEXUS_OPERATION_START = 'StartNexusOperation',
}

/** Separator placed between the segments of a composed span name, e.g. `QueryWorkflow:<queryType>` */
export const SPAN_DELIMITER = ':';
Loading