diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index cd6bf2fc8..2d5904102 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -7,6 +7,7 @@ import { extractWorkflowType, LoadedDataConverter, } from '@temporalio/common'; +import { encodeUserMetadata, decodeUserMetadata } from '@temporalio/common/lib/user-metadata'; import { encodeUnifiedSearchAttributes, decodeSearchAttributes, @@ -196,8 +197,7 @@ export function decodeOptionalStructuredCalendarSpecs( } export function compileScheduleOptions(options: ScheduleOptions): CompiledScheduleOptions { - const workflowTypeOrFunc = options.action.workflowType; - const workflowType = extractWorkflowType(workflowTypeOrFunc); + const workflowType = extractWorkflowType(options.action.workflowType); return { ...options, action: { @@ -270,6 +270,7 @@ export async function encodeScheduleAction( } : undefined, header: { fields: headers }, + userMetadata: await encodeUserMetadata(dataConverter, action.staticSummary, action.staticDetails), priority: action.priority ? compilePriority(action.priority) : undefined, }, }; @@ -320,6 +321,7 @@ export async function decodeScheduleAction( pb: temporal.api.schedule.v1.IScheduleAction ): Promise { if (pb.startWorkflow) { + const { staticSummary, staticDetails } = await decodeUserMetadata(dataConverter, pb.startWorkflow?.userMetadata); return { type: 'startWorkflow', // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -336,6 +338,8 @@ export async function decodeScheduleAction( workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout), workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout), workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout), + staticSummary, + staticDetails, priority: decodePriority(pb.startWorkflow.priority), }; } diff --git a/packages/client/src/schedule-types.ts b/packages/client/src/schedule-types.ts index e75e53676..a695d6c87 100644 --- a/packages/client/src/schedule-types.ts +++ b/packages/client/src/schedule-types.ts @@ -783,6 +783,8 @@ export type ScheduleOptionsStartWorkflowAction = { | 'workflowExecutionTimeout' | 'workflowRunTimeout' | 'workflowTaskTimeout' + | 'staticDetails' + | 'staticSummary' > & { /** * Workflow id to use when starting. Assign a meaningful business id. @@ -815,6 +817,8 @@ export type ScheduleDescriptionStartWorkflowAction = ScheduleSummaryStartWorkflo | 'workflowExecutionTimeout' | 'workflowRunTimeout' | 'workflowTaskTimeout' + | 'staticSummary' + | 'staticDetails' | 'priority' >; diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index 148781a19..c171a99df 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -69,7 +69,23 @@ export type WorkflowExecutionDescription = Replace< { raw: DescribeWorkflowExecutionResponse; } ->; +> & { + /** + * General fixed details for this workflow execution that may appear in UI/CLI. + * This can be in Temporal markdown format and can span multiple lines. + * + * @experimental User metadata is a new API and suspectible to change. + */ + staticDetails: () => Promise; + + /** + * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. + * This can be in single-line Temporal markdown format. + * + * @experimental User metadata is a new API and suspectible to change. + */ + staticSummary: () => Promise; +}; export type WorkflowService = proto.temporal.api.workflowservice.v1.WorkflowService; export const { WorkflowService } = proto.temporal.api.workflowservice.v1; diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 42899800d..9ec730155 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -24,6 +24,7 @@ import { WorkflowIdConflictPolicy, compilePriority, } from '@temporalio/common'; +import { encodeUserMetadata } from '@temporalio/common/lib/user-metadata'; import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { History } from '@temporalio/common/lib/proto-utils'; @@ -32,6 +33,7 @@ import { decodeArrayFromPayloads, decodeFromPayloadsAtIndex, decodeOptionalFailureToOptionalError, + decodeOptionalSinglePayload, encodeMapToPayloads, encodeToPayloads, } from '@temporalio/common/lib/internal-non-workflow'; @@ -511,7 +513,7 @@ export class WorkflowClient extends BaseClient { protected async _start( workflowTypeOrFunc: string | T, - options: WithWorkflowArgs, + options: WorkflowStartOptions, interceptors: WorkflowClientInterceptor[] ): Promise { const workflowType = extractWorkflowType(workflowTypeOrFunc); @@ -1226,6 +1228,7 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: options.cronSchedule, header: { fields: headers }, + userMetadata: await encodeUserMetadata(this.dataConverter, options.staticSummary, options.staticDetails), priority: options.priority ? compilePriority(options.priority) : undefined, versioningOverride: options.versioningOverride ?? undefined, }; @@ -1268,7 +1271,6 @@ export class WorkflowClient extends BaseClient { protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; - return { namespace, identity, @@ -1296,6 +1298,7 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: opts.cronSchedule, header: { fields: headers }, + userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails), priority: opts.priority ? compilePriority(opts.priority) : undefined, versioningOverride: opts.versioningOverride ?? undefined, }; @@ -1431,8 +1434,13 @@ export class WorkflowClient extends BaseClient { workflowExecution: { workflowId, runId }, }); const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter, raw); + const userMetadata = raw.executionConfig?.userMetadata; return { ...info, + staticDetails: async () => + (await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.details)) ?? undefined, + staticSummary: async () => + (await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.summary)) ?? undefined, raw, }; }, diff --git a/packages/common/src/activity-options.ts b/packages/common/src/activity-options.ts index 1d0bc31cc..151c78b15 100644 --- a/packages/common/src/activity-options.ts +++ b/packages/common/src/activity-options.ts @@ -124,6 +124,14 @@ export interface ActivityOptions { */ versioningIntent?: VersioningIntent; + /** + * A fixed, single-line summary for this workflow execution that may appear in the UI/CLI. + * This can be in single-line Temporal markdown format. + * + * @experimental User metadata is a new API and suspectible to change. + */ + summary?: string; + /** * Priority of this activity */ @@ -192,4 +200,12 @@ export interface LocalActivityOptions { * - `ABANDON` - Do not request cancellation of the activity and immediately report cancellation to the workflow. */ cancellationType?: ActivityCancellationType; + + /** + * A fixed, single-line summary for this workflow execution that may appear in the UI/CLI. + * This can be in single-line Temporal markdown format. + * + * @experimental User metadata is a new API and suspectible to change. + */ + summary?: string; } diff --git a/packages/common/src/internal-non-workflow/codec-helpers.ts b/packages/common/src/internal-non-workflow/codec-helpers.ts index 57afc7ce8..6ab981fa8 100644 --- a/packages/common/src/internal-non-workflow/codec-helpers.ts +++ b/packages/common/src/internal-non-workflow/codec-helpers.ts @@ -1,5 +1,5 @@ import { Payload } from '../interfaces'; -import { arrayFromPayloads, fromPayloadsAtIndex, toPayloads } from '../converter/payload-converter'; +import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from '../converter/payload-converter'; import { PayloadConverterError } from '../errors'; import { PayloadCodec } from '../converter/payload-codec'; import { ProtoFailure } from '../failure'; @@ -72,6 +72,17 @@ export async function decodeOptionalSingle( return await decodeSingle(codecs, payload); } +/** Run {@link PayloadCodec.decode} and convert from a single Payload */ +export async function decodeOptionalSinglePayload( + dataConverter: LoadedDataConverter, + payload?: Payload | null | undefined +): Promise { + const { payloadConverter, payloadCodecs } = dataConverter; + const decoded = await decodeOptionalSingle(payloadCodecs, payload); + if (decoded == null) return decoded; + return payloadConverter.fromPayload(decoded); +} + /** * Run {@link PayloadConverter.toPayload} on value, and then encode it. */ @@ -80,6 +91,18 @@ export async function encodeToPayload(converter: LoadedDataConverter, value: unk return await encodeSingle(payloadCodecs, payloadConverter.toPayload(value)); } +/** + * Run {@link PayloadConverter.toPayload} on an optional value, and then encode it. + */ +export function encodeOptionalToPayload( + payloadConverter: PayloadConverter, + value: unknown +): Payload | null | undefined { + if (value == null) return value; + + return payloadConverter.toPayload(value); +} + /** * Decode `payloads` and then return {@link arrayFromPayloads}`. */ diff --git a/packages/common/src/internal-workflow/objects-helpers.ts b/packages/common/src/internal-workflow/objects-helpers.ts index addf156d0..a1659e38d 100644 --- a/packages/common/src/internal-workflow/objects-helpers.ts +++ b/packages/common/src/internal-workflow/objects-helpers.ts @@ -35,3 +35,33 @@ export function mergeObjects>( return changed ? (merged as T) : original; } + +function isObject(item: any): item is Record { + return item && typeof item === 'object' && !Array.isArray(item); +} + +/** + * Recursively merges two objects, returning a new object. + * + * Properties from `source` will overwrite properties on `target`. + * Nested objects are merged recursively. + * + * Object fields in the returned object are references, as in, + * the returned object is not completely fresh. + */ +export function deepMerge>(target: T, source: Partial): T { + const output = { ...target }; + + if (isObject(target) && isObject(source)) { + for (const key of Object.keys(source)) { + const sourceValue = source[key]; + if (isObject(sourceValue) && key in target && isObject(target[key] as any)) { + output[key as keyof T] = deepMerge(target[key], sourceValue); + } else { + (output as any)[key] = sourceValue; + } + } + } + + return output; +} diff --git a/packages/common/src/user-metadata.ts b/packages/common/src/user-metadata.ts new file mode 100644 index 000000000..243e41314 --- /dev/null +++ b/packages/common/src/user-metadata.ts @@ -0,0 +1,66 @@ +import { temporal } from '@temporalio/proto'; +import { PayloadConverter } from './converter/payload-converter'; +import { LoadedDataConverter } from './converter/data-converter'; +import { encodeOptionalToPayload, decodeOptionalSinglePayload, encodeOptionalSingle } from './internal-non-workflow'; + +/** + * User metadata that can be attached to workflow commands. + */ +export interface UserMetadata { + /** @experimental A fixed, single line summary of the command's purpose */ + staticSummary?: string; + /** @experimental Fixed additional details about the command for longer-text description, can span multiple lines */ + staticDetails?: string; +} + +export function userMetadataToPayload( + payloadConverter: PayloadConverter, + staticSummary: string | undefined, + staticDetails: string | undefined +): temporal.api.sdk.v1.IUserMetadata | undefined { + if (staticSummary == null && staticDetails == null) return undefined; + + const summary = encodeOptionalToPayload(payloadConverter, staticSummary); + const details = encodeOptionalToPayload(payloadConverter, staticDetails); + + if (summary == null && details == null) return undefined; + + return { summary, details }; +} + +export async function encodeUserMetadata( + dataConverter: LoadedDataConverter, + staticSummary: string | undefined, + staticDetails: string | undefined +): Promise { + if (staticSummary == null && staticDetails == null) return undefined; + + const { payloadConverter, payloadCodecs } = dataConverter; + const summary = await encodeOptionalSingle( + payloadCodecs, + await encodeOptionalToPayload(payloadConverter, staticSummary) + ); + const details = await encodeOptionalSingle( + payloadCodecs, + await encodeOptionalToPayload(payloadConverter, staticDetails) + ); + + if (summary == null && details == null) return undefined; + + return { summary, details }; +} + +export async function decodeUserMetadata( + dataConverter: LoadedDataConverter, + metadata: temporal.api.sdk.v1.IUserMetadata | undefined | null +): Promise { + const res = { staticSummary: undefined, staticDetails: undefined }; + if (metadata == null) return res; + + const staticSummary = (await decodeOptionalSinglePayload(dataConverter, metadata.summary)) ?? undefined; + const staticDetails = (await decodeOptionalSinglePayload(dataConverter, metadata.details)) ?? undefined; + + if (staticSummary == null && staticDetails == null) return res; + + return { staticSummary, staticDetails }; +} diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index efd68028b..393f516f2 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -193,6 +193,21 @@ export interface BaseWorkflowOptions { */ typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; + /** + * General fixed details for this workflow execution that may appear in UI/CLI. + * This can be in Temporal markdown format and can span multiple lines. + * + * @experimental User metadata is a new API and suspectible to change. + */ + staticDetails?: string; + /** + * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. + * This can be in single-line Temporal markdown format. + * + * @experimental User metadata is a new API and suspectible to change. + */ + staticSummary?: string; + /** * Priority of a workflow */ diff --git a/packages/test/src/test-integration-split-two.ts b/packages/test/src/test-integration-split-two.ts index d3d934932..4145720a2 100644 --- a/packages/test/src/test-integration-split-two.ts +++ b/packages/test/src/test-integration-split-two.ts @@ -12,9 +12,26 @@ import { } from '@temporalio/common'; import { searchAttributePayloadConverter } from '@temporalio/common/lib/converter/payload-search-attributes'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; -import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow'; +import { + decode as payloadDecode, + decodeFromPayloadsAtIndex, + decodeOptionalSinglePayload, +} from '@temporalio/common/lib/internal-non-workflow'; -import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow'; +import { + condition, + defineQuery, + defineSignal, + getCurrentDetails, + proxyActivities, + proxyLocalActivities, + setCurrentDetails, + setDefaultQueryHandler, + setHandler, + sleep, + startChild, +} from '@temporalio/workflow'; +import { temporal } from '@temporalio/proto'; import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration'; import * as activities from './activities'; import * as workflows from './workflows'; @@ -763,3 +780,171 @@ test.serial('default query handler is not used if requested query exists', confi t.deepEqual(result, { name: definedQuery.name, args }); }); }); + +export async function completableWorkflow(completes: boolean): Promise { + await condition(() => completes); +} + +export async function userMetadataWorkflow(): Promise<{ + currentDetails: string; + childWorkflowId: string; + childRunId: string; +}> { + let done = false; + const signalDef = defineSignal('done'); + setHandler( + signalDef, + () => { + done = true; + }, + { description: 'signal-desc' } + ); + + // That workflow should call an activity (with summary) + const { activityWithSummary } = proxyActivities({ + scheduleToCloseTimeout: '10s', + scheduleToStartTimeout: '10s', + }); + await activityWithSummary.executeWithOptions( + { + summary: 'activity summary', + retry: { + initialInterval: '1s', + maximumAttempts: 5, + maximumInterval: '10s', + }, + scheduleToStartTimeout: '5s', + }, + [] + ); + const { localActivityWithSummary } = proxyLocalActivities({ scheduleToCloseTimeout: '10s' }); + await localActivityWithSummary.executeWithOptions( + { + summary: 'local activity summary', + retry: { + maximumAttempts: 2, + nonRetryableErrorTypes: ['CustomError'], + }, + scheduleToStartTimeout: '5s', + }, + [] + ); + // Timer (with summary) + await sleep(5, { summary: 'timer summary' }); + // Set current details + setCurrentDetails('current wf details'); + // Start child workflow + const child_handle = await startChild(completableWorkflow, { + args: [false], + staticDetails: 'child details', + staticSummary: 'child summary', + }); + + await condition(() => done); + return { + currentDetails: getCurrentDetails(), + childWorkflowId: child_handle.workflowId, + childRunId: child_handle.firstExecutionRunId, + }; +} + +test.serial('User metadata on workflow, timer, activity, child', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + + const worker = await createWorkerWithDefaults(t, { + activities: { + async activityWithSummary() {}, + async localActivityWithSummary() {}, + }, + }); + + await worker.runUntil(async () => { + // Start a workflow with static details + const handle = await startWorkflow(userMetadataWorkflow, { + staticSummary: 'wf static summary', + staticDetails: 'wf static details', + }); + // Describe workflow -> static summary, static details + const desc = await handle.describe(); + t.true((await desc.staticSummary()) === 'wf static summary'); + t.true((await desc.staticDetails()) === 'wf static details'); + + await handle.signal('done'); + const res = await handle.result(); + t.true(res.currentDetails === 'current wf details'); + + // Get child workflow handle and verify metadata + const childHandle = env.client.workflow.getHandle(res.childWorkflowId, res.childRunId); + const childDesc = await childHandle.describe(); + t.true((await childDesc.staticSummary()) === 'child summary'); + t.true((await childDesc.staticDetails()) === 'child details'); + + // Get history events for main workflow. + const resp = await env.client.workflowService.getWorkflowExecutionHistory({ + namespace: env.client.options.namespace, + execution: { + workflowId: handle.workflowId, + runId: handle.firstExecutionRunId, + }, + }); + for (const event of resp.history?.events ?? []) { + if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.summary), + 'wf static summary' + ); + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.details), + 'wf static details' + ); + } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) { + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.summary), + 'activity summary' + ); + // Assert that the overriden activity options are what we expect. + const attrs = event.activityTaskScheduledEventAttributes; + t.is(tsToMs(attrs?.scheduleToCloseTimeout), 10000); + t.is(tsToMs(attrs?.scheduleToStartTimeout), 5000); + const retryPolicy = attrs?.retryPolicy; + t.is(retryPolicy?.maximumAttempts, 5); + t.is(tsToMs(retryPolicy?.initialInterval), 1000); + t.is(tsToMs(retryPolicy?.maximumInterval), 10000); + } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) { + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.summary), + 'timer summary' + ); + } + } + // Get history events for child workflow. + const childResp = await env.client.workflowService.getWorkflowExecutionHistory({ + namespace: env.client.options.namespace, + execution: { + workflowId: res.childWorkflowId, + runId: res.childRunId, + }, + }); + + for (const event of childResp.history?.events ?? []) { + if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.summary), + 'child summary' + ); + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.details), + 'child details' + ); + } + } + // Run metadata query -> get current details + const wfMetadata = (await handle.query('__temporal_workflow_metadata')) as temporal.api.sdk.v1.IWorkflowMetadata; + t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1); + t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done'); + t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].description, 'signal-desc'); + t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries + t.deepEqual(wfMetadata.currentDetails, 'current wf details'); + }); +}); diff --git a/packages/test/src/test-schedules.ts b/packages/test/src/test-schedules.ts index 4b0e5588c..200f67dfe 100644 --- a/packages/test/src/test-schedules.ts +++ b/packages/test/src/test-schedules.ts @@ -881,4 +881,31 @@ if (RUN_INTEGRATION_TESTS) { await handle.delete(); } }); + + test.serial('User metadata on schedule', async (t) => { + const { client } = t.context; + const scheduleId = `schedule-with-user-metadata-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: {}, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + staticSummary: 'schedule static summary', + staticDetails: 'schedule static details', + }, + }); + + try { + const describedSchedule = await handle.describe(); + t.deepEqual(describedSchedule.spec.calendars, []); + t.deepEqual(describedSchedule.spec.intervals, []); + t.deepEqual(describedSchedule.spec.skip, []); + t.deepEqual(describedSchedule.action.staticSummary, 'schedule static summary'); + t.deepEqual(describedSchedule.action.staticDetails, 'schedule static details'); + } finally { + await handle.delete(); + } + }); } diff --git a/packages/worker/src/workflow-codec-runner.ts b/packages/worker/src/workflow-codec-runner.ts index 5b3266688..ffc0ead8e 100644 --- a/packages/worker/src/workflow-codec-runner.ts +++ b/packages/worker/src/workflow-codec-runner.ts @@ -335,6 +335,13 @@ export class WorkflowCodecRunner { }, } : undefined, + userMetadata: + command.userMetadata && (command.userMetadata.summary || command.userMetadata.details) + ? { + summary: await encodeOptionalSingle(this.codecs, command.userMetadata.summary), + details: await encodeOptionalSingle(this.codecs, command.userMetadata.details), + } + : undefined, } ) ?? [] ) diff --git a/packages/workflow/src/interceptors.ts b/packages/workflow/src/interceptors.ts index 978fd5a08..ae2f91fa7 100644 --- a/packages/workflow/src/interceptors.ts +++ b/packages/workflow/src/interceptors.ts @@ -113,6 +113,13 @@ export interface StartChildWorkflowExecutionInput { export interface TimerInput { readonly durationMs: number; readonly seq: number; + readonly options?: TimerOptions; +} + +/** Options for starting a timer (i.e. sleep) */ +export interface TimerOptions { + /** @experimental A fixed, single line summary of the command's purpose */ + readonly summary?: string; } /** diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index b8e86b2a2..78b4d9abe 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -322,6 +322,7 @@ export class Activator implements ActivationHandler { signalDefinitions, updateDefinitions, }, + currentDetails: this.currentDetails, }; }, description: 'Returns metadata associated with this workflow.', @@ -427,6 +428,8 @@ export class Activator implements ActivationHandler { public readonly registeredActivityNames: Set; + public currentDetails: string = ''; + public versioningBehavior?: VersioningBehavior; public workflowDefinitionOptionsGetter?: () => WorkflowDefinitionOptions; diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index d915d6ed7..f6c4a898b 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -2,6 +2,7 @@ import { ActivityFunction, ActivityOptions, compileRetryPolicy, + compilePriority, encodeActivityCancellationType, encodeWorkflowIdReusePolicy, extractWorkflowType, @@ -22,9 +23,9 @@ import { WorkflowReturnType, WorkflowUpdateValidatorType, SearchAttributeUpdatePair, - compilePriority, WorkflowDefinitionOptionsOrGetter, } from '@temporalio/common'; +import { userMetadataToPayload } from '@temporalio/common/lib/user-metadata'; import { encodeUnifiedSearchAttributes, searchAttributePayloadConverter, @@ -33,6 +34,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; +import { deepMerge } from '@temporalio/common/lib/internal-workflow'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; import { UpdateScope } from './update-scope'; import { @@ -41,6 +43,7 @@ import { SignalWorkflowInput, StartChildWorkflowExecutionInput, TimerInput, + TimerOptions, } from './interceptors'; import { ChildWorkflowCancellationType, @@ -87,7 +90,7 @@ export function addDefaultWorkflowOptions( /** * Push a startTimer command into state accumulator and register completion */ -function timerNextHandler(input: TimerInput) { +function timerNextHandler({ seq, durationMs, options }: TimerInput) { const activator = getActivator(); return new Promise((resolve, reject) => { const scope = CancellationScope.current(); @@ -98,12 +101,12 @@ function timerNextHandler(input: TimerInput) { if (scope.cancellable) { untrackPromise( scope.cancelRequested.catch((err) => { - if (!activator.completions.timer.delete(input.seq)) { + if (!activator.completions.timer.delete(seq)) { return; // Already resolved or never scheduled } activator.pushCommand({ cancelTimer: { - seq: input.seq, + seq, }, }); reject(err); @@ -112,11 +115,12 @@ function timerNextHandler(input: TimerInput) { } activator.pushCommand({ startTimer: { - seq: input.seq, - startToFireTimeout: msToTs(input.durationMs), + seq, + startToFireTimeout: msToTs(durationMs), }, + userMetadata: userMetadataToPayload(activator.payloadConverter, options?.summary, undefined), }); - activator.completions.timer.set(input.seq, { + activator.completions.timer.set(seq, { resolve, reject, }); @@ -130,8 +134,9 @@ function timerNextHandler(input: TimerInput) { * * @param ms sleep duration - number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}. * If given a negative number or 0, value will be set to 1. + * @param options optional timer options for additional configuration */ -export function sleep(ms: Duration): Promise { +export function sleep(ms: Duration, options?: TimerOptions): Promise { const activator = assertInWorkflowContext('Workflow.sleep(...) may only be used from a Workflow Execution'); const seq = activator.nextSeqs.timer++; @@ -142,6 +147,7 @@ export function sleep(ms: Duration): Promise { return execute({ durationMs, seq, + options, }); } @@ -198,6 +204,7 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType versioningIntent: versioningIntentToProto(options.versioningIntent), priority: options.priority ? compilePriority(options.priority) : undefined, }, + userMetadata: userMetadataToPayload(activator.payloadConverter, options.summary, undefined), }); activator.completions.activity.set(seq, { resolve, @@ -263,6 +270,7 @@ async function scheduleLocalActivityNextHandler({ headers, cancellationType: encodeActivityCancellationType(options.cancellationType), }, + userMetadata: userMetadataToPayload(activator.payloadConverter, options.summary, undefined), }); activator.completions.activity.set(seq, { resolve, @@ -399,6 +407,7 @@ function startChildWorkflowExecutionNextHandler({ versioningIntent: versioningIntentToProto(options.versioningIntent), priority: options.priority ? compilePriority(options.priority) : undefined, }, + userMetadata: userMetadataToPayload(activator.payloadConverter, options?.staticSummary, options?.staticDetails), }); activator.completions.childWorkflowStart.set(seq, { resolve, @@ -501,7 +510,44 @@ export const NotAnActivityMethod = Symbol.for('__TEMPORAL_NOT_AN_ACTIVITY_METHOD * ``` */ export type ActivityInterfaceFor = { - [K in keyof T]: T[K] extends ActivityFunction ? T[K] : typeof NotAnActivityMethod; + [K in keyof T]: T[K] extends ActivityFunction ? ActivityFunctionWithOptions : typeof NotAnActivityMethod; +}; + +export type ActivityFunctionWithOptions = T & { + /** + * Execute the activity, overriding its existing options with the + * provided options. + * + * @param options ActivityOptions + * @param args: list of arguments + * @returns return value of the activity + * + * @experimental executeWithOptions is a new method to provide call-site options + * and is subject to change + */ + executeWithOptions(options: ActivityOptions, args: Parameters): Promise>>; +}; + +/** + * The local activity counterpart to {@link ActivityInterfaceFor} + */ +export type LocalActivityInterfaceFor = { + [K in keyof T]: T[K] extends ActivityFunction ? LocalActivityFunctionWithOptions : typeof NotAnActivityMethod; +}; + +export type LocalActivityFunctionWithOptions = T & { + /** + * Run the local activity, overriding its existing options with the + * provided options. + * + * @param options LocalActivityOptions + * @param args: list of arguments + * @returns return value of the activity + * + * @experimental executeWithOptions is a new method to provide call-site options + * and is subject to change + */ + executeWithOptions(options: LocalActivityOptions, args: Parameters): Promise>>; }; /** @@ -522,6 +568,20 @@ export type ActivityInterfaceFor = { * startToCloseTimeout: '30 minutes', * }); * + * // Use activities with default options + * const result1 = await httpGet('http://example.com'); + * + * // Override options for specific activity calls + * const result2 = await httpGet.executeWithOptions({ + * staticSummary: 'Fetches data from external API', + * scheduleToCloseTimeout: '5m' + * }, ['http://api.example.com']); + * + * const result3 = await otherActivity.executeWithOptions({ + * staticSummary: 'Processes the fetched data', + * taskQueue: 'special-task-queue' + * }, [data]); + * * // Setup Activities from an explicit interface (e.g. when defined by another SDK) * interface JavaActivities { * httpGetFromJava(url: string): Promise @@ -538,6 +598,11 @@ export type ActivityInterfaceFor = { * * export function execute(): Promise { * const response = await httpGet("http://example.com"); + * // Or with custom options: + * const response2 = await httpGetFromJava.executeWithOptions({ + * staticSummary: 'Java HTTP call with timeout override', + * startToCloseTimeout: '2m' + * }, ["http://fast-api.example.com"]); * // ... * } * ``` @@ -548,19 +613,27 @@ export function proxyActivities(options: ActivityOptions) } // Validate as early as possible for immediate user feedback validateActivityOptions(options); - return new Proxy( - {}, - { - get(_, activityType) { - if (typeof activityType !== 'string') { - throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); - } - return function activityProxyFunction(...args: unknown[]): Promise { - return scheduleActivity(activityType, args, options); - }; - }, - } - ) as any; + + return new Proxy({} as ActivityInterfaceFor, { + get(_, activityType) { + if (typeof activityType !== 'string') { + throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); + } + + function activityProxyFunction(...args: unknown[]): Promise { + return scheduleActivity(activityType as string, args, options); + } + + activityProxyFunction.executeWithOptions = function ( + overrideOptions: ActivityOptions, + args: any[] + ): Promise { + return scheduleActivity(activityType, args, deepMerge(options, overrideOptions)); + }; + + return activityProxyFunction; + }, + }); } /** @@ -573,25 +646,35 @@ export function proxyActivities(options: ActivityOptions) * * @see {@link proxyActivities} for examples */ -export function proxyLocalActivities(options: LocalActivityOptions): ActivityInterfaceFor { +export function proxyLocalActivities( + options: LocalActivityOptions +): LocalActivityInterfaceFor { if (options === undefined) { throw new TypeError('options must be defined'); } // Validate as early as possible for immediate user feedback validateLocalActivityOptions(options); - return new Proxy( - {}, - { - get(_, activityType) { - if (typeof activityType !== 'string') { - throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); - } - return function localActivityProxyFunction(...args: unknown[]) { - return scheduleLocalActivity(activityType, args, options); - }; - }, - } - ) as any; + + return new Proxy({} as LocalActivityInterfaceFor, { + get(_, activityType) { + if (typeof activityType !== 'string') { + throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); + } + + function localActivityProxyFunction(...args: unknown[]): Promise { + return scheduleLocalActivity(activityType as string, args, options); + } + + localActivityProxyFunction.executeWithOptions = function ( + overrideOptions: LocalActivityOptions, + args: any[] + ): Promise { + return scheduleLocalActivity(activityType, args, deepMerge(options, overrideOptions)); + }; + + return localActivityProxyFunction; + }, + }); } // TODO: deprecate this patch after "enough" time has passed @@ -961,13 +1044,13 @@ export function makeContinueAsNewFunc( * @example * * ```ts - *import { continueAsNew } from '@temporalio/workflow'; -import { SearchAttributeType } from '@temporalio/common'; + * import { continueAsNew } from '@temporalio/workflow'; + * import { SearchAttributeType } from '@temporalio/common'; * - *export async function myWorkflow(n: number): Promise { - * // ... Workflow logic - * await continueAsNew(n + 1); - *} + * export async function myWorkflow(n: number): Promise { + * // ... Workflow logic + * await continueAsNew(n + 1); + * } * ``` */ export function continueAsNew(...args: Parameters): Promise { @@ -1049,6 +1132,18 @@ export function deprecatePatch(patchId: string): void { activator.patchInternal(patchId, true); } +/** + * Returns a Promise that resolves when `fn` evaluates to `true` or `timeout` expires, providing + * options to configure the timer (i.e. provide metadata) + * + * @param timeout number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + * + * @returns a boolean indicating whether the condition was true before the timeout expires + * + * @experimental TimerOptions is a new addition and subject to change + */ +export function condition(fn: () => boolean, timeout: Duration, options: TimerOptions): Promise; + /** * Returns a Promise that resolves when `fn` evaluates to `true` or `timeout` expires. * @@ -1063,7 +1158,7 @@ export function condition(fn: () => boolean, timeout: Duration): Promise boolean): Promise; -export async function condition(fn: () => boolean, timeout?: Duration): Promise { +export async function condition(fn: () => boolean, timeout?: Duration, opts?: TimerOptions): Promise { assertInWorkflowContext('Workflow.condition(...) may only be used from a Workflow Execution.'); // Prior to 1.5.0, `condition(fn, 0)` was treated as equivalent to `condition(fn, undefined)` if (timeout === 0 && !patched(CONDITION_0_PATCH)) { @@ -1072,7 +1167,7 @@ export async function condition(fn: () => boolean, timeout?: Duration): Promise< if (typeof timeout === 'number' || typeof timeout === 'string') { return CancellationScope.cancellable(async () => { try { - return await Promise.race([sleep(timeout).then(() => false), conditionInner(fn).then(() => true)]); + return await Promise.race([sleep(timeout, opts).then(() => false), conditionInner(fn).then(() => true)]); } finally { CancellationScope.current().cancel(); } @@ -1655,3 +1750,13 @@ export function setWorkflowOptions( export const stackTraceQuery = defineQuery('__stack_trace'); export const enhancedStackTraceQuery = defineQuery('__enhanced_stack_trace'); export const workflowMetadataQuery = defineQuery('__temporal_workflow_metadata'); + +export function getCurrentDetails(): string { + const activator = assertInWorkflowContext('getCurrentDetails() may only be used from a Workflow Execution.'); + return activator.currentDetails; +} + +export function setCurrentDetails(details: string): void { + const activator = assertInWorkflowContext('getCurrentDetails() may only be used from a Workflow Execution.'); + activator.currentDetails = details; +}