Skip to content

Commit 6b89dfc

Browse files
feat(@temporalio/interceptors-opentelemetry): implement all interceptors (#1835)
1 parent 47fb852 commit 6b89dfc

File tree

10 files changed

+1355
-23
lines changed

10 files changed

+1355
-23
lines changed

packages/interceptors-opentelemetry/src/client/index.ts

Lines changed: 178 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,31 @@
11
import * as otel from '@opentelemetry/api';
2-
import type { Next, WorkflowSignalInput, WorkflowStartInput, WorkflowClientInterceptor } from '@temporalio/client';
3-
import { instrument, headersWithContext, RUN_ID_ATTR_KEY } from '../instrumentation';
2+
import type {
3+
Next,
4+
WorkflowSignalInput,
5+
WorkflowSignalWithStartInput,
6+
WorkflowStartInput,
7+
WorkflowStartOutput,
8+
WorkflowStartUpdateInput,
9+
WorkflowStartUpdateOutput,
10+
WorkflowStartUpdateWithStartInput,
11+
WorkflowStartUpdateWithStartOutput,
12+
WorkflowQueryInput,
13+
WorkflowTerminateInput,
14+
WorkflowCancelInput,
15+
WorkflowDescribeInput,
16+
WorkflowClientInterceptor,
17+
TerminateWorkflowExecutionResponse,
18+
RequestCancelWorkflowExecutionResponse,
19+
DescribeWorkflowExecutionResponse,
20+
} from '@temporalio/client';
21+
import {
22+
instrument,
23+
headersWithContext,
24+
RUN_ID_ATTR_KEY,
25+
WORKFLOW_ID_ATTR_KEY,
26+
UPDATE_ID_ATTR_KEY,
27+
TERMINATE_REASON_ATTR_KEY,
28+
} from '../instrumentation';
429
import { SpanName, SPAN_DELIMITER } from '../workflow';
530

631
export interface InterceptorOptions {
@@ -24,7 +49,8 @@ export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInt
2449
tracer: this.tracer,
2550
spanName: `${SpanName.WORKFLOW_START}${SPAN_DELIMITER}${input.workflowType}`,
2651
fn: async (span) => {
27-
const headers = await headersWithContext(input.headers);
52+
const headers = headersWithContext(input.headers);
53+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.options.workflowId);
2854
const runId = await next({ ...input, headers });
2955
span.setAttribute(RUN_ID_ATTR_KEY, runId);
3056
return runId;
@@ -36,10 +62,157 @@ export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInt
3662
return await instrument({
3763
tracer: this.tracer,
3864
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
39-
fn: async () => {
40-
const headers = await headersWithContext(input.headers);
65+
fn: async (span) => {
66+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
67+
const headers = headersWithContext(input.headers);
4168
await next({ ...input, headers });
4269
},
4370
});
4471
}
72+
73+
async startWithDetails(
74+
input: WorkflowStartInput,
75+
next: Next<WorkflowClientInterceptor, 'startWithDetails'>
76+
): Promise<WorkflowStartOutput> {
77+
return await instrument({
78+
tracer: this.tracer,
79+
spanName: `${SpanName.WORKFLOW_START}${SPAN_DELIMITER}${input.workflowType}`,
80+
fn: async (span) => {
81+
const headers = headersWithContext(input.headers);
82+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.options.workflowId);
83+
const output = await next({ ...input, headers });
84+
span.setAttribute(RUN_ID_ATTR_KEY, output.runId);
85+
return output;
86+
},
87+
});
88+
}
89+
90+
async startUpdate(
91+
input: WorkflowStartUpdateInput,
92+
next: Next<WorkflowClientInterceptor, 'startUpdate'>
93+
): Promise<WorkflowStartUpdateOutput> {
94+
return await instrument({
95+
tracer: this.tracer,
96+
spanName: `${SpanName.WORKFLOW_START_UPDATE}${SPAN_DELIMITER}${input.updateName}`,
97+
fn: async (span) => {
98+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
99+
if (input.options.updateId) {
100+
span.setAttribute(UPDATE_ID_ATTR_KEY, input.options.updateId);
101+
}
102+
const headers = headersWithContext(input.headers);
103+
const output = await next({ ...input, headers });
104+
span.setAttribute(RUN_ID_ATTR_KEY, output.workflowRunId);
105+
return output;
106+
},
107+
});
108+
}
109+
110+
async startUpdateWithStart(
111+
input: WorkflowStartUpdateWithStartInput,
112+
next: Next<WorkflowClientInterceptor, 'startUpdateWithStart'>
113+
): Promise<WorkflowStartUpdateWithStartOutput> {
114+
return await instrument({
115+
tracer: this.tracer,
116+
spanName: `${SpanName.WORKFLOW_UPDATE_WITH_START}${SPAN_DELIMITER}${input.updateName}`,
117+
fn: async (span) => {
118+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowStartOptions.workflowId);
119+
if (input.updateOptions.updateId) {
120+
span.setAttribute(UPDATE_ID_ATTR_KEY, input.updateOptions.updateId);
121+
}
122+
const workflowStartHeaders = headersWithContext(input.workflowStartHeaders);
123+
const updateHeaders = headersWithContext(input.updateHeaders);
124+
const output = await next({ ...input, workflowStartHeaders, updateHeaders });
125+
if (output.workflowExecution.runId) {
126+
span.setAttribute(RUN_ID_ATTR_KEY, output.workflowExecution.runId);
127+
}
128+
return output;
129+
},
130+
});
131+
}
132+
133+
async signalWithStart(
134+
input: WorkflowSignalWithStartInput,
135+
next: Next<WorkflowClientInterceptor, 'signalWithStart'>
136+
): Promise<string> {
137+
return await instrument({
138+
tracer: this.tracer,
139+
spanName: `${SpanName.WORKFLOW_SIGNAL_WITH_START}${SPAN_DELIMITER}${input.workflowType}`,
140+
fn: async (span) => {
141+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.options.workflowId);
142+
const headers = headersWithContext(input.headers);
143+
const runId = await next({ ...input, headers });
144+
span.setAttribute(RUN_ID_ATTR_KEY, runId);
145+
return runId;
146+
},
147+
});
148+
}
149+
150+
async query(input: WorkflowQueryInput, next: Next<WorkflowClientInterceptor, 'query'>): Promise<unknown> {
151+
return await instrument({
152+
tracer: this.tracer,
153+
spanName: `${SpanName.WORKFLOW_QUERY}${SPAN_DELIMITER}${input.queryType}`,
154+
fn: async (span) => {
155+
const headers = headersWithContext(input.headers);
156+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
157+
if (input.workflowExecution.runId) {
158+
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
159+
}
160+
return await next({ ...input, headers });
161+
},
162+
});
163+
}
164+
165+
async terminate(
166+
input: WorkflowTerminateInput,
167+
next: Next<WorkflowClientInterceptor, 'terminate'>
168+
): Promise<TerminateWorkflowExecutionResponse> {
169+
return await instrument({
170+
tracer: this.tracer,
171+
spanName: SpanName.WORKFLOW_TERMINATE,
172+
fn: async (span) => {
173+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
174+
if (input.workflowExecution.runId) {
175+
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
176+
}
177+
if (input.reason) {
178+
span.setAttribute(TERMINATE_REASON_ATTR_KEY, input.reason);
179+
}
180+
return await next(input);
181+
},
182+
});
183+
}
184+
185+
async cancel(
186+
input: WorkflowCancelInput,
187+
next: Next<WorkflowClientInterceptor, 'cancel'>
188+
): Promise<RequestCancelWorkflowExecutionResponse> {
189+
return await instrument({
190+
tracer: this.tracer,
191+
spanName: SpanName.WORKFLOW_CANCEL,
192+
fn: async (span) => {
193+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
194+
if (input.workflowExecution.runId) {
195+
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
196+
}
197+
return await next(input);
198+
},
199+
});
200+
}
201+
202+
async describe(
203+
input: WorkflowDescribeInput,
204+
next: Next<WorkflowClientInterceptor, 'describe'>
205+
): Promise<DescribeWorkflowExecutionResponse> {
206+
return await instrument({
207+
tracer: this.tracer,
208+
spanName: SpanName.WORKFLOW_DESCRIBE,
209+
fn: async (span) => {
210+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
211+
if (input.workflowExecution.runId) {
212+
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
213+
}
214+
return await next(input);
215+
},
216+
});
217+
}
45218
}

packages/interceptors-opentelemetry/src/instrumentation.ts

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,20 @@ import {
1414
export const TRACE_HEADER = '_tracer-data';
1515
/** As in workflow run id */
1616
export const RUN_ID_ATTR_KEY = 'run_id';
17+
/** As in workflow id */
18+
export const WORKFLOW_ID_ATTR_KEY = 'temporalWorkflowId';
19+
/** As in activity id */
20+
export const ACTIVITY_ID_ATTR_KEY = 'temporalActivityId';
21+
/** As in update id */
22+
export const UPDATE_ID_ATTR_KEY = 'temporalUpdateId';
23+
/** As in termination reason */
24+
export const TERMINATE_REASON_ATTR_KEY = 'temporalTerminateReason';
25+
/** As in Nexus service */
26+
export const NEXUS_SERVICE_ATTR_KEY = 'temporalNexusService';
27+
/** As in Nexus operation */
28+
export const NEXUS_OPERATION_ATTR_KEY = 'temporalNexusOperation';
29+
/** As in Nexus endpoint */
30+
export const NEXUS_ENDPOINT_ATTR_KEY = 'temporalNexusEndpoint';
1731

1832
const payloadConverter = defaultPayloadConverter;
1933

@@ -48,20 +62,41 @@ async function wrapWithSpan<T>(
4862
span.setStatus({ code: otel.SpanStatusCode.OK });
4963
return ret;
5064
} catch (err: any) {
51-
const isBenignErr = err instanceof ApplicationFailure && err.category === ApplicationFailureCategory.BENIGN;
52-
if (acceptableErrors === undefined || !acceptableErrors(err)) {
53-
const statusCode = isBenignErr ? otel.SpanStatusCode.UNSET : otel.SpanStatusCode.ERROR;
54-
span.setStatus({ code: statusCode, message: (err as Error).message ?? String(err) });
55-
span.recordException(err);
56-
} else {
57-
span.setStatus({ code: otel.SpanStatusCode.OK });
58-
}
65+
maybeAddErrorToSpan(err, span, acceptableErrors);
5966
throw err;
6067
} finally {
6168
span.end();
6269
}
6370
}
6471

72+
function wrapWithSpanSync<T>(
73+
span: otel.Span,
74+
fn: (span: otel.Span) => T,
75+
acceptableErrors?: (err: unknown) => boolean
76+
): T {
77+
try {
78+
const ret = fn(span);
79+
span.setStatus({ code: otel.SpanStatusCode.OK });
80+
return ret;
81+
} catch (err: any) {
82+
maybeAddErrorToSpan(err, span, acceptableErrors);
83+
throw err;
84+
} finally {
85+
span.end();
86+
}
87+
}
88+
89+
function maybeAddErrorToSpan(err: any, span: otel.Span, acceptableErrors?: (err: unknown) => boolean): void {
90+
const isBenignErr = err instanceof ApplicationFailure && err.category === ApplicationFailureCategory.BENIGN;
91+
if (acceptableErrors === undefined || !acceptableErrors(err)) {
92+
const statusCode = isBenignErr ? otel.SpanStatusCode.UNSET : otel.SpanStatusCode.ERROR;
93+
span.setStatus({ code: statusCode, message: (err as Error).message ?? String(err) });
94+
span.recordException(err);
95+
} else {
96+
span.setStatus({ code: otel.SpanStatusCode.OK });
97+
}
98+
}
99+
65100
export interface InstrumentOptions<T> {
66101
tracer: otel.Tracer;
67102
spanName: string;
@@ -70,6 +105,8 @@ export interface InstrumentOptions<T> {
70105
acceptableErrors?: (err: unknown) => boolean;
71106
}
72107

108+
export type InstrumentOptionsSync<T> = Omit<InstrumentOptions<T>, 'fn'> & { fn: (span: otel.Span) => T };
109+
73110
/**
74111
* Wraps `fn` in a span which ends when function returns or throws
75112
*/
@@ -87,3 +124,12 @@ export async function instrument<T>({
87124
}
88125
return await tracer.startActiveSpan(spanName, async (span) => await wrapWithSpan(span, fn, acceptableErrors));
89126
}
127+
128+
export function instrumentSync<T>({ tracer, spanName, fn, context, acceptableErrors }: InstrumentOptionsSync<T>): T {
129+
if (context) {
130+
return otel.context.with(context, () => {
131+
return tracer.startActiveSpan(spanName, (span) => wrapWithSpanSync(span, fn, acceptableErrors));
132+
});
133+
}
134+
return tracer.startActiveSpan(spanName, (span) => wrapWithSpanSync(span, fn, acceptableErrors));
135+
}

packages/interceptors-opentelemetry/src/worker/index.ts

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,16 @@ import type {
88
ActivityOutboundCallsInterceptor,
99
InjectedSink,
1010
GetLogAttributesInput,
11+
GetMetricTagsInput,
1112
ActivityExecuteInput,
1213
} from '@temporalio/worker';
13-
import { instrument, extractContextFromHeaders } from '../instrumentation';
14+
import {
15+
instrument,
16+
extractContextFromHeaders,
17+
WORKFLOW_ID_ATTR_KEY,
18+
RUN_ID_ATTR_KEY,
19+
ACTIVITY_ID_ATTR_KEY,
20+
} from '../instrumentation';
1421
import { type OpenTelemetryWorkflowExporter, type SerializableSpan, SpanName, SPAN_DELIMITER } from '../workflow';
1522

1623
export interface InterceptorOptions {
@@ -36,14 +43,24 @@ export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundC
3643
async execute(input: ActivityExecuteInput, next: Next<ActivityInboundCallsInterceptor, 'execute'>): Promise<unknown> {
3744
const context = extractContextFromHeaders(input.headers);
3845
const spanName = `${SpanName.ACTIVITY_EXECUTE}${SPAN_DELIMITER}${this.ctx.info.activityType}`;
39-
return await instrument({ tracer: this.tracer, spanName, fn: () => next(input), context });
46+
return await instrument({
47+
tracer: this.tracer,
48+
spanName,
49+
fn: (span) => {
50+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, this.ctx.info.workflowExecution.workflowId);
51+
span.setAttribute(RUN_ID_ATTR_KEY, this.ctx.info.workflowExecution.runId);
52+
span.setAttribute(ACTIVITY_ID_ATTR_KEY, this.ctx.info.activityId);
53+
return next(input);
54+
},
55+
context,
56+
});
4057
}
4158
}
4259

4360
/**
44-
* Intercepts calls to emit logs from an Activity.
61+
* Intercepts calls to emit logs and metrics from an Activity.
4562
*
46-
* Attach OpenTelemetry context tracing attributes to emitted log messages, if appropriate.
63+
* Attach OpenTelemetry context tracing attributes to emitted log messages and metrics, if appropriate.
4764
*/
4865
export class OpenTelemetryActivityOutboundInterceptor implements ActivityOutboundCallsInterceptor {
4966
constructor(protected readonly ctx: ActivityContext) {}
@@ -65,6 +82,24 @@ export class OpenTelemetryActivityOutboundInterceptor implements ActivityOutboun
6582
return next(input);
6683
}
6784
}
85+
86+
public getMetricTags(
87+
input: GetMetricTagsInput,
88+
next: Next<ActivityOutboundCallsInterceptor, 'getMetricTags'>
89+
): GetMetricTagsInput {
90+
const span = otel.trace.getSpan(otel.context.active());
91+
const spanContext = span?.spanContext();
92+
if (spanContext && otel.isSpanContextValid(spanContext)) {
93+
return next({
94+
trace_id: spanContext.traceId,
95+
span_id: spanContext.spanId,
96+
trace_flags: `0${spanContext.traceFlags.toString(16)}`,
97+
...input,
98+
});
99+
} else {
100+
return next(input);
101+
}
102+
}
68103
}
69104

70105
/**

0 commit comments

Comments
 (0)