diff --git a/packages/common/src/converter/payload-converter.ts b/packages/common/src/converter/payload-converter.ts index 662eb1a99..5baa8278d 100644 --- a/packages/common/src/converter/payload-converter.ts +++ b/packages/common/src/converter/payload-converter.ts @@ -100,6 +100,25 @@ export function mapFromPayloads( ) as Record; } +export declare const rawPayloadTypeBrand: unique symbol; +/** + * RawValue is a wrapper over a payload. + * A payload that belongs to a RawValue is special in that it bypasses user-defined payload converters, + * instead using the default payload converter. The payload still undergoes codec conversion. + */ +export class RawValue { + private readonly _payload: Payload; + private readonly [rawPayloadTypeBrand]: T = undefined as T; + + constructor(value: T, payloadConverter: PayloadConverter = defaultPayloadConverter) { + this._payload = payloadConverter.toPayload(value); + } + + get payload(): Payload { + return this._payload; + } +} + export interface PayloadConverterWithEncoding { /** * Converts a value to a {@link Payload}. @@ -143,6 +162,9 @@ export class CompositePayloadConverter implements PayloadConverter { * Returns the first successful result, throws {@link ValueError} if there is no converter that can handle the value. */ public toPayload(value: T): Payload { + if (value instanceof RawValue) { + return value.payload; + } for (const converter of this.converters) { const result = converter.toPayload(value); if (result !== undefined) { @@ -160,6 +182,7 @@ export class CompositePayloadConverter implements PayloadConverter { if (payload.metadata === undefined || payload.metadata === null) { throw new ValueError('Missing payload metadata'); } + const encoding = decode(payload.metadata[METADATA_ENCODING_KEY]); const converter = this.converterByEncoding.get(encoding); if (converter === undefined) { diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index ea05b71f6..cef4061e4 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -14,6 +14,7 @@ import { ActivityCancellationType, ApplicationFailure, defineSearchAttributeKey, + RawValue, SearchAttributePair, SearchAttributeType, TypedSearchAttributes, @@ -1340,6 +1341,31 @@ test('can register search attributes to dev server', async (t) => { await env.teardown(); }); +export async function rawValueWorkflow(value: unknown): Promise { + const { rawValueActivity } = workflow.proxyActivities({ startToCloseTimeout: '10s' }); + return await rawValueActivity(new RawValue(value)); +} + +test('workflow and activity can receive/return RawValue', async (t) => { + const { executeWorkflow, createWorker } = helpers(t); + const worker = await createWorker({ + activities: { + async rawValueActivity(value: unknown): Promise { + return new RawValue(value); + }, + }, + }); + + await worker.runUntil(async () => { + const testValue = 'test'; + const rawValue = new RawValue(testValue); + const res = await executeWorkflow(rawValueWorkflow, { + args: [rawValue], + }); + t.deepEqual(res, testValue); + }); +}); + export async function ChildWorkflowInfo(): Promise { let blocked = true; workflow.setHandler(unblockSignal, () => { diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index b8e86b2a2..231560375 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -20,6 +20,7 @@ import { WorkflowUpdateValidatorType, mapFromPayloads, fromPayloadsAtIndex, + RawValue, WorkflowFunctionWithOptions, VersioningBehavior, WorkflowDefinitionOptions, @@ -41,13 +42,13 @@ import { DefaultSignalHandler, StackTraceSDKInfo, StackTraceFileSlice, - EnhancedStackTrace, StackTraceFileLocation, WorkflowInfo, WorkflowCreateOptionsInternal, ActivationCompletion, DefaultUpdateHandler, DefaultQueryHandler, + EnhancedStackTrace, } from './interfaces'; import { type SinkCall } from './sinks'; import { untrackPromise } from './stack-helpers'; @@ -263,9 +264,11 @@ export class Activator implements ActivationHandler { '__stack_trace', { handler: () => { - return this.getStackTraces() - .map((s) => s.formatted) - .join('\n\n'); + return new RawValue( + this.getStackTraces() + .map((s) => s.formatted) + .join('\n\n') + ); }, description: 'Returns a sensible stack trace.', }, @@ -273,7 +276,7 @@ export class Activator implements ActivationHandler { [ '__enhanced_stack_trace', { - handler: (): EnhancedStackTrace => { + handler: (): RawValue => { const { sourceMap } = this; const sdk: StackTraceSDKInfo = { name: 'typescript', version: pkg.version }; const stacks = this.getStackTraces().map(({ structured: locations }) => ({ locations })); @@ -293,7 +296,7 @@ export class Activator implements ActivationHandler { } } } - return { sdk, stacks, sources }; + return new RawValue({ sdk, stacks, sources }); }, description: 'Returns a stack trace annotated with source information.', }, @@ -301,7 +304,7 @@ export class Activator implements ActivationHandler { [ '__temporal_workflow_metadata', { - handler: (): temporal.api.sdk.v1.IWorkflowMetadata => { + handler: (): RawValue => { const workflowType = this.info.workflowType; const queryDefinitions = Array.from(this.queryHandlers.entries()).map(([name, value]) => ({ name, @@ -315,14 +318,14 @@ export class Activator implements ActivationHandler { name, description: value.description, })); - return { + return new RawValue({ definition: { type: workflowType, queryDefinitions, signalDefinitions, updateDefinitions, }, - }; + }); }, description: 'Returns metadata associated with this workflow.', },