Skip to content

Commit 98b7004

Browse files
authored
Add RawValue support for non-converted Payloads (#1664)
1 parent 260b570 commit 98b7004

File tree

3 files changed

+61
-9
lines changed

3 files changed

+61
-9
lines changed

packages/common/src/converter/payload-converter.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,25 @@ export function mapFromPayloads<K extends string, T = unknown>(
100100
) as Record<K, T>;
101101
}
102102

103+
export declare const rawPayloadTypeBrand: unique symbol;
104+
/**
105+
* RawValue is a wrapper over a payload.
106+
* A payload that belongs to a RawValue is special in that it bypasses user-defined payload converters,
107+
* instead using the default payload converter. The payload still undergoes codec conversion.
108+
*/
109+
export class RawValue<T = unknown> {
110+
private readonly _payload: Payload;
111+
private readonly [rawPayloadTypeBrand]: T = undefined as T;
112+
113+
constructor(value: T, payloadConverter: PayloadConverter = defaultPayloadConverter) {
114+
this._payload = payloadConverter.toPayload(value);
115+
}
116+
117+
get payload(): Payload {
118+
return this._payload;
119+
}
120+
}
121+
103122
export interface PayloadConverterWithEncoding {
104123
/**
105124
* Converts a value to a {@link Payload}.
@@ -143,6 +162,9 @@ export class CompositePayloadConverter implements PayloadConverter {
143162
* Returns the first successful result, throws {@link ValueError} if there is no converter that can handle the value.
144163
*/
145164
public toPayload<T>(value: T): Payload {
165+
if (value instanceof RawValue) {
166+
return value.payload;
167+
}
146168
for (const converter of this.converters) {
147169
const result = converter.toPayload(value);
148170
if (result !== undefined) {
@@ -160,6 +182,7 @@ export class CompositePayloadConverter implements PayloadConverter {
160182
if (payload.metadata === undefined || payload.metadata === null) {
161183
throw new ValueError('Missing payload metadata');
162184
}
185+
163186
const encoding = decode(payload.metadata[METADATA_ENCODING_KEY]);
164187
const converter = this.converterByEncoding.get(encoding);
165188
if (converter === undefined) {

packages/test/src/test-integration-workflows.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
ActivityCancellationType,
1515
ApplicationFailure,
1616
defineSearchAttributeKey,
17+
RawValue,
1718
SearchAttributePair,
1819
SearchAttributeType,
1920
TypedSearchAttributes,
@@ -1340,6 +1341,31 @@ test('can register search attributes to dev server', async (t) => {
13401341
await env.teardown();
13411342
});
13421343

1344+
export async function rawValueWorkflow(value: unknown): Promise<RawValue> {
1345+
const { rawValueActivity } = workflow.proxyActivities({ startToCloseTimeout: '10s' });
1346+
return await rawValueActivity(new RawValue(value));
1347+
}
1348+
1349+
test('workflow and activity can receive/return RawValue', async (t) => {
1350+
const { executeWorkflow, createWorker } = helpers(t);
1351+
const worker = await createWorker({
1352+
activities: {
1353+
async rawValueActivity(value: unknown): Promise<RawValue> {
1354+
return new RawValue(value);
1355+
},
1356+
},
1357+
});
1358+
1359+
await worker.runUntil(async () => {
1360+
const testValue = 'test';
1361+
const rawValue = new RawValue(testValue);
1362+
const res = await executeWorkflow(rawValueWorkflow, {
1363+
args: [rawValue],
1364+
});
1365+
t.deepEqual(res, testValue);
1366+
});
1367+
});
1368+
13431369
export async function ChildWorkflowInfo(): Promise<workflow.RootWorkflowInfo | undefined> {
13441370
let blocked = true;
13451371
workflow.setHandler(unblockSignal, () => {

packages/workflow/src/internals.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
WorkflowUpdateValidatorType,
2121
mapFromPayloads,
2222
fromPayloadsAtIndex,
23+
RawValue,
2324
WorkflowFunctionWithOptions,
2425
VersioningBehavior,
2526
WorkflowDefinitionOptions,
@@ -41,13 +42,13 @@ import {
4142
DefaultSignalHandler,
4243
StackTraceSDKInfo,
4344
StackTraceFileSlice,
44-
EnhancedStackTrace,
4545
StackTraceFileLocation,
4646
WorkflowInfo,
4747
WorkflowCreateOptionsInternal,
4848
ActivationCompletion,
4949
DefaultUpdateHandler,
5050
DefaultQueryHandler,
51+
EnhancedStackTrace,
5152
} from './interfaces';
5253
import { type SinkCall } from './sinks';
5354
import { untrackPromise } from './stack-helpers';
@@ -263,17 +264,19 @@ export class Activator implements ActivationHandler {
263264
'__stack_trace',
264265
{
265266
handler: () => {
266-
return this.getStackTraces()
267-
.map((s) => s.formatted)
268-
.join('\n\n');
267+
return new RawValue<string>(
268+
this.getStackTraces()
269+
.map((s) => s.formatted)
270+
.join('\n\n')
271+
);
269272
},
270273
description: 'Returns a sensible stack trace.',
271274
},
272275
],
273276
[
274277
'__enhanced_stack_trace',
275278
{
276-
handler: (): EnhancedStackTrace => {
279+
handler: (): RawValue => {
277280
const { sourceMap } = this;
278281
const sdk: StackTraceSDKInfo = { name: 'typescript', version: pkg.version };
279282
const stacks = this.getStackTraces().map(({ structured: locations }) => ({ locations }));
@@ -293,15 +296,15 @@ export class Activator implements ActivationHandler {
293296
}
294297
}
295298
}
296-
return { sdk, stacks, sources };
299+
return new RawValue<EnhancedStackTrace>({ sdk, stacks, sources });
297300
},
298301
description: 'Returns a stack trace annotated with source information.',
299302
},
300303
],
301304
[
302305
'__temporal_workflow_metadata',
303306
{
304-
handler: (): temporal.api.sdk.v1.IWorkflowMetadata => {
307+
handler: (): RawValue => {
305308
const workflowType = this.info.workflowType;
306309
const queryDefinitions = Array.from(this.queryHandlers.entries()).map(([name, value]) => ({
307310
name,
@@ -315,14 +318,14 @@ export class Activator implements ActivationHandler {
315318
name,
316319
description: value.description,
317320
}));
318-
return {
321+
return new RawValue<temporal.api.sdk.v1.IWorkflowMetadata>({
319322
definition: {
320323
type: workflowType,
321324
queryDefinitions,
322325
signalDefinitions,
323326
updateDefinitions,
324327
},
325-
};
328+
});
326329
},
327330
description: 'Returns metadata associated with this workflow.',
328331
},

0 commit comments

Comments
 (0)