Skip to content

Commit dad213a

Browse files
feat: implement remaining interceptors with OTEL
1 parent 34ff54b commit dad213a

File tree

6 files changed

+397
-18
lines changed

6 files changed

+397
-18
lines changed

packages/interceptors-opentelemetry/src/client/index.ts

Lines changed: 161 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,30 @@
11
import * as otel from '@opentelemetry/api';
2-
import type { Next, WorkflowSignalInput, WorkflowStartInput, WorkflowClientInterceptor } from '@temporalio/client';
3-
import { instrument, headersWithContext, RUN_ID_ATTR_KEY } from '../instrumentation';
2+
import type {
3+
Next,
4+
WorkflowSignalInput,
5+
WorkflowSignalWithStartInput,
6+
WorkflowStartInput,
7+
WorkflowStartOutput,
8+
WorkflowStartUpdateInput,
9+
WorkflowStartUpdateOutput,
10+
WorkflowStartUpdateWithStartInput,
11+
WorkflowStartUpdateWithStartOutput,
12+
WorkflowQueryInput,
13+
WorkflowTerminateInput,
14+
WorkflowCancelInput,
15+
WorkflowDescribeInput,
16+
WorkflowClientInterceptor,
17+
TerminateWorkflowExecutionResponse,
18+
RequestCancelWorkflowExecutionResponse,
19+
DescribeWorkflowExecutionResponse,
20+
} from '@temporalio/client';
21+
import {
22+
instrument,
23+
headersWithContext,
24+
RUN_ID_ATTR_KEY,
25+
WORKFLOW_ID_ATTR_KEY,
26+
TERMINATE_REASON_ATTR_KEY,
27+
} from '../instrumentation';
428
import { SpanName, SPAN_DELIMITER } from '../workflow';
529

630
export interface InterceptorOptions {
@@ -12,7 +36,7 @@ export interface InterceptorOptions {
1236
*
1337
* Wraps the operation in an opentelemetry Span and passes it to the Workflow via headers.
1438
*/
15-
export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInterceptor {
39+
export class OpenTelemetryWorkflowClientInterceptor implements Required<WorkflowClientInterceptor> {
1640
protected readonly tracer: otel.Tracer;
1741

1842
constructor(options?: InterceptorOptions) {
@@ -42,4 +66,138 @@ export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInt
4266
},
4367
});
4468
}
69+
70+
async startWithDetails(
71+
input: WorkflowStartInput,
72+
next: Next<WorkflowClientInterceptor, 'startWithDetails'>
73+
): Promise<WorkflowStartOutput> {
74+
return await instrument({
75+
tracer: this.tracer,
76+
spanName: `${SpanName.WORKFLOW_START}${SPAN_DELIMITER}${input.workflowType}`,
77+
fn: async (span) => {
78+
const headers = headersWithContext(input.headers);
79+
const output = await next({ ...input, headers });
80+
span.setAttribute(RUN_ID_ATTR_KEY, output.runId);
81+
return output;
82+
},
83+
});
84+
}
85+
86+
async startUpdate(
87+
input: WorkflowStartUpdateInput,
88+
next: Next<WorkflowClientInterceptor, 'startUpdate'>
89+
): Promise<WorkflowStartUpdateOutput> {
90+
return await instrument({
91+
tracer: this.tracer,
92+
spanName: `${SpanName.WORKFLOW_UPDATE}${SPAN_DELIMITER}${input.updateName}`,
93+
fn: async (span) => {
94+
const headers = headersWithContext(input.headers);
95+
const output = await next({ ...input, headers });
96+
span.setAttribute(RUN_ID_ATTR_KEY, output.workflowRunId);
97+
return output;
98+
},
99+
});
100+
}
101+
102+
async startUpdateWithStart(
103+
input: WorkflowStartUpdateWithStartInput,
104+
next: Next<WorkflowClientInterceptor, 'startUpdateWithStart'>
105+
): Promise<WorkflowStartUpdateWithStartOutput> {
106+
return await instrument({
107+
tracer: this.tracer,
108+
spanName: `${SpanName.WORKFLOW_UPDATE_WITH_START}${SPAN_DELIMITER}${input.workflowType}${SPAN_DELIMITER}${input.updateName}`,
109+
fn: async (span) => {
110+
const workflowStartHeaders = headersWithContext(input.workflowStartHeaders);
111+
const updateHeaders = headersWithContext(input.updateHeaders);
112+
const output = await next({ ...input, workflowStartHeaders, updateHeaders });
113+
span.setAttribute(RUN_ID_ATTR_KEY, output.workflowExecution.runId ?? '');
114+
return output;
115+
},
116+
});
117+
}
118+
119+
async signalWithStart(
120+
input: WorkflowSignalWithStartInput,
121+
next: Next<WorkflowClientInterceptor, 'signalWithStart'>
122+
): Promise<string> {
123+
return await instrument({
124+
tracer: this.tracer,
125+
spanName: `${SpanName.WORKFLOW_SIGNAL_WITH_START}${SPAN_DELIMITER}${input.workflowType}${SPAN_DELIMITER}${input.signalName}`,
126+
fn: async (span) => {
127+
const headers = headersWithContext(input.headers);
128+
const runId = await next({ ...input, headers });
129+
span.setAttribute(RUN_ID_ATTR_KEY, runId);
130+
return runId;
131+
},
132+
});
133+
}
134+
135+
async query(input: WorkflowQueryInput, next: Next<WorkflowClientInterceptor, 'query'>): Promise<unknown> {
136+
return await instrument({
137+
tracer: this.tracer,
138+
spanName: `${SpanName.WORKFLOW_QUERY}${SPAN_DELIMITER}${input.queryType}`,
139+
fn: async (span) => {
140+
const headers = headersWithContext(input.headers);
141+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
142+
if (input.workflowExecution.runId) {
143+
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
144+
}
145+
return await next({ ...input, headers });
146+
},
147+
});
148+
}
149+
150+
async terminate(
151+
input: WorkflowTerminateInput,
152+
next: Next<WorkflowClientInterceptor, 'terminate'>
153+
): Promise<TerminateWorkflowExecutionResponse> {
154+
return await instrument({
155+
tracer: this.tracer,
156+
spanName: SpanName.WORKFLOW_TERMINATE,
157+
fn: async (span) => {
158+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
159+
if (input.workflowExecution.runId) {
160+
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
161+
}
162+
if (input.reason) {
163+
span.setAttribute(TERMINATE_REASON_ATTR_KEY, input.reason);
164+
}
165+
return await next(input);
166+
},
167+
});
168+
}
169+
170+
async cancel(
171+
input: WorkflowCancelInput,
172+
next: Next<WorkflowClientInterceptor, 'cancel'>
173+
): Promise<RequestCancelWorkflowExecutionResponse> {
174+
return await instrument({
175+
tracer: this.tracer,
176+
spanName: SpanName.WORKFLOW_CANCEL,
177+
fn: async (span) => {
178+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
179+
if (input.workflowExecution.runId) {
180+
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
181+
}
182+
return await next(input);
183+
},
184+
});
185+
}
186+
187+
async describe(
188+
input: WorkflowDescribeInput,
189+
next: Next<WorkflowClientInterceptor, 'describe'>
190+
): Promise<DescribeWorkflowExecutionResponse> {
191+
return await instrument({
192+
tracer: this.tracer,
193+
spanName: SpanName.WORKFLOW_DESCRIBE,
194+
fn: async (span) => {
195+
span.setAttribute(WORKFLOW_ID_ATTR_KEY, input.workflowExecution.workflowId);
196+
if (input.workflowExecution.runId) {
197+
span.setAttribute(RUN_ID_ATTR_KEY, input.workflowExecution.runId);
198+
}
199+
return await next(input);
200+
},
201+
});
202+
}
45203
}

packages/interceptors-opentelemetry/src/instrumentation.ts

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,17 @@ import {
1212

1313
/** Default trace header for opentelemetry interceptors */
1414
export const TRACE_HEADER = '_tracer-data';
15+
16+
// Span attribute keys
1517
/** As in workflow run id */
1618
export const RUN_ID_ATTR_KEY = 'run_id';
19+
export const WORKFLOW_ID_ATTR_KEY = 'workflow_id';
20+
export const EAGER_START_ATTR_KEY = 'eager_start';
21+
export const TERMINATE_REASON_ATTR_KEY = 'terminate_reason';
22+
export const TIMER_DURATION_MS_ATTR_KEY = 'timer_duration_ms';
23+
export const NEXUS_SERVICE_ATTR_KEY = 'nexus_service';
24+
export const NEXUS_OPERATION_ATTR_KEY = 'nexus_operation';
25+
export const NEXUS_ENDPOINT_ATTR_KEY = 'nexus_endpoint';
1726

1827
const payloadConverter = defaultPayloadConverter;
1928

@@ -48,20 +57,41 @@ async function wrapWithSpan<T>(
4857
span.setStatus({ code: otel.SpanStatusCode.OK });
4958
return ret;
5059
} catch (err: any) {
51-
const isBenignErr = err instanceof ApplicationFailure && err.category === ApplicationFailureCategory.BENIGN;
52-
if (acceptableErrors === undefined || !acceptableErrors(err)) {
53-
const statusCode = isBenignErr ? otel.SpanStatusCode.UNSET : otel.SpanStatusCode.ERROR;
54-
span.setStatus({ code: statusCode, message: (err as Error).message ?? String(err) });
55-
span.recordException(err);
56-
} else {
57-
span.setStatus({ code: otel.SpanStatusCode.OK });
58-
}
60+
handleError(err, span, acceptableErrors);
5961
throw err;
6062
} finally {
6163
span.end();
6264
}
6365
}
6466

67+
function wrapWithSpanSync<T>(
68+
span: otel.Span,
69+
fn: (span: otel.Span) => T,
70+
acceptableErrors?: (err: unknown) => boolean
71+
): T {
72+
try {
73+
const ret = fn(span);
74+
span.setStatus({ code: otel.SpanStatusCode.OK });
75+
return ret;
76+
} catch (err: any) {
77+
handleError(err, span, acceptableErrors);
78+
throw err;
79+
} finally {
80+
span.end();
81+
}
82+
}
83+
84+
function handleError(err: any, span: otel.Span, acceptableErrors?: (err: unknown) => boolean): void {
85+
const isBenignErr = err instanceof ApplicationFailure && err.category === ApplicationFailureCategory.BENIGN;
86+
if (acceptableErrors === undefined || !acceptableErrors(err)) {
87+
const statusCode = isBenignErr ? otel.SpanStatusCode.UNSET : otel.SpanStatusCode.ERROR;
88+
span.setStatus({ code: statusCode, message: (err as Error).message ?? String(err) });
89+
span.recordException(err);
90+
} else {
91+
span.setStatus({ code: otel.SpanStatusCode.OK });
92+
}
93+
}
94+
6595
export interface InstrumentOptions<T> {
6696
tracer: otel.Tracer;
6797
spanName: string;
@@ -70,6 +100,8 @@ export interface InstrumentOptions<T> {
70100
acceptableErrors?: (err: unknown) => boolean;
71101
}
72102

103+
export type InstrumentOptionsSync<T> = Omit<InstrumentOptions<T>, 'fn'> & { fn: (span: otel.Span) => T };
104+
73105
/**
74106
* Wraps `fn` in a span which ends when function returns or throws
75107
*/
@@ -87,3 +119,12 @@ export async function instrument<T>({
87119
}
88120
return await tracer.startActiveSpan(spanName, async (span) => await wrapWithSpan(span, fn, acceptableErrors));
89121
}
122+
123+
export function instrumentSync<T>({ tracer, spanName, fn, context, acceptableErrors }: InstrumentOptionsSync<T>): T {
124+
if (context) {
125+
return otel.context.with(context, () => {
126+
return tracer.startActiveSpan(spanName, (span) => wrapWithSpanSync(span, fn, acceptableErrors));
127+
});
128+
}
129+
return tracer.startActiveSpan(spanName, (span) => wrapWithSpanSync(span, fn, acceptableErrors));
130+
}

packages/interceptors-opentelemetry/src/worker/index.ts

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type {
88
ActivityOutboundCallsInterceptor,
99
InjectedSink,
1010
GetLogAttributesInput,
11+
GetMetricTagsInput,
1112
ActivityExecuteInput,
1213
} from '@temporalio/worker';
1314
import { instrument, extractContextFromHeaders } from '../instrumentation';
@@ -23,7 +24,7 @@ export interface InterceptorOptions {
2324
* Wraps the operation in an opentelemetry Span and links it to a parent Span context if one is
2425
* provided in the Activity input headers.
2526
*/
26-
export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundCallsInterceptor {
27+
export class OpenTelemetryActivityInboundInterceptor implements Required<ActivityInboundCallsInterceptor> {
2728
protected readonly tracer: otel.Tracer;
2829

2930
constructor(
@@ -41,11 +42,11 @@ export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundC
4142
}
4243

4344
/**
44-
* Intercepts calls to emit logs from an Activity.
45+
* Intercepts calls to emit logs and metrics from an Activity.
4546
*
46-
* Attach OpenTelemetry context tracing attributes to emitted log messages, if appropriate.
47+
* Attach OpenTelemetry context tracing attributes to emitted log messages and metrics, if appropriate.
4748
*/
48-
export class OpenTelemetryActivityOutboundInterceptor implements ActivityOutboundCallsInterceptor {
49+
export class OpenTelemetryActivityOutboundInterceptor implements Required<ActivityOutboundCallsInterceptor> {
4950
constructor(protected readonly ctx: ActivityContext) {}
5051

5152
public getLogAttributes(
@@ -65,6 +66,24 @@ export class OpenTelemetryActivityOutboundInterceptor implements ActivityOutboun
6566
return next(input);
6667
}
6768
}
69+
70+
public getMetricTags(
71+
input: GetMetricTagsInput,
72+
next: Next<ActivityOutboundCallsInterceptor, 'getMetricTags'>
73+
): GetMetricTagsInput {
74+
const span = otel.trace.getSpan(otel.context.active());
75+
const spanContext = span?.spanContext();
76+
if (spanContext && otel.isSpanContextValid(spanContext)) {
77+
return next({
78+
trace_id: spanContext.traceId,
79+
span_id: spanContext.spanId,
80+
trace_flags: `0${spanContext.traceFlags.toString(16)}`,
81+
...input,
82+
});
83+
} else {
84+
return next(input);
85+
}
86+
}
6887
}
6988

7089
/**

packages/interceptors-opentelemetry/src/workflow/definitions.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,36 @@ export enum SpanName {
5454
*/
5555
WORKFLOW_SIGNAL_WITH_START = 'SignalWithStartWorkflow',
5656

57+
/**
58+
* Workflow is queried
59+
*/
60+
WORKFLOW_QUERY = 'QueryWorkflow',
61+
62+
/**
63+
* Workflow is updated
64+
*/
65+
WORKFLOW_UPDATE = 'UpdateWorkflow',
66+
67+
/**
68+
* Workflow is started with an update
69+
*/
70+
WORKFLOW_UPDATE_WITH_START = 'UpdateWithStartWorkflow',
71+
72+
/**
73+
* Workflow is terminated
74+
*/
75+
WORKFLOW_TERMINATE = 'TerminateWorkflow',
76+
77+
/**
78+
* Workflow is cancelled
79+
*/
80+
WORKFLOW_CANCEL = 'CancelWorkflow',
81+
82+
/**
83+
* Workflow is described
84+
*/
85+
WORKFLOW_DESCRIBE = 'DescribeWorkflow',
86+
5787
/**
5888
* Workflow run is executing
5989
*/
@@ -74,6 +104,14 @@ export enum SpanName {
74104
* Workflow is continuing as new
75105
*/
76106
CONTINUE_AS_NEW = 'ContinueAsNew',
107+
/**
108+
* Workflow timer is started
109+
*/
110+
WORKFLOW_TIMER = 'StartTimer',
111+
/**
112+
* Nexus operation is started
113+
*/
114+
NEXUS_OPERATION_START = 'StartNexusOperation',
77115
}
78116

79117
export const SPAN_DELIMITER = ':';

0 commit comments

Comments
 (0)