diff --git a/packages/core-bridge/src/helpers/try_from_js.rs b/packages/core-bridge/src/helpers/try_from_js.rs index 764b2f58f..1675d6bf8 100644 --- a/packages/core-bridge/src/helpers/try_from_js.rs +++ b/packages/core-bridge/src/helpers/try_from_js.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, net::SocketAddr, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, + time::Duration, +}; use neon::{ handle::Handle, @@ -163,6 +167,24 @@ impl TryFromJs for Vec { } } +#[allow(clippy::implicit_hasher)] +impl TryFromJs for HashSet { + fn try_from_js<'cx, 'b>( + cx: &mut impl Context<'cx>, + js_value: Handle<'b, JsValue>, + ) -> BridgeResult { + let array = js_value.downcast::(cx)?; + let len = array.len(cx); + let mut result = Self::with_capacity(len as usize); + + for i in 0..len { + let value = array.get_value(cx, i)?; + result.insert(T::try_from_js(cx, value)?); + } + Ok(result) + } +} + #[allow(clippy::implicit_hasher)] impl TryFromJs for HashMap { fn try_from_js<'cx, 'b>( diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 872bfd60e..6cd8c568f 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -404,16 +404,23 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {} //////////////////////////////////////////////////////////////////////////////////////////////////// mod config { - use std::{sync::Arc, time::Duration}; + use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, + }; use temporal_sdk_core::{ ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions, SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, - api::worker::{ - ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior, - SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError, - WorkerDeploymentOptions as CoreWorkerDeploymentOptions, - WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind, + api::{ + errors::WorkflowErrorType as CoreWorkflowErrorType, + worker::{ + ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior, + SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError, + WorkerDeploymentOptions as CoreWorkerDeploymentOptions, + WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind, + }, }, protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior, }; @@ -447,6 +454,8 @@ mod config { max_activities_per_second: Option, max_task_queue_activities_per_second: Option, shutdown_grace_time: Option, + workflow_failure_errors: HashSet, + workflow_types_to_failure_errors: HashMap>, } #[derive(TryFromJs)] @@ -513,6 +522,10 @@ mod config { .max_task_queue_activities_per_second(self.max_task_queue_activities_per_second) .max_worker_activities_per_second(self.max_activities_per_second) .graceful_shutdown_period(self.shutdown_grace_time) + .workflow_failure_errors(into_core_workflow_error_set(self.workflow_failure_errors)) + .workflow_types_to_failure_errors(into_core_workflow_error_map_of_sets( + self.workflow_types_to_failure_errors, + )) .build() } } @@ -584,6 +597,33 @@ mod config { } } + #[derive(TryFromJs, Hash, Eq, PartialEq)] + pub enum WorkflowErrorType { + Nondeterminism, + } + + impl From for CoreWorkflowErrorType { + fn from(val: WorkflowErrorType) -> Self { + match val { + WorkflowErrorType::Nondeterminism => Self::Nondeterminism, + } + } + } + + fn into_core_workflow_error_set( + val: HashSet, + ) -> HashSet { + val.into_iter().map(Into::into).collect() + } + + fn into_core_workflow_error_map_of_sets( + val: HashMap>, + ) -> HashMap> { + val.into_iter() + .map(|(k, v)| (k, into_core_workflow_error_set(v))) + .collect() + } + #[derive(TryFromJs)] #[allow(clippy::struct_field_names)] pub(super) struct WorkerTuner { diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index bc365fc77..6e700acd0 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -200,6 +200,8 @@ export interface WorkerOptions { maxTaskQueueActivitiesPerSecond: Option; maxActivitiesPerSecond: Option; shutdownGraceTime: number; + workflowFailureErrors: WorkflowErrorType[]; + workflowTypesToFailureErrors: Record; } export type PollerBehavior = @@ -227,6 +229,8 @@ export type WorkerDeploymentVersion = { export type VersioningBehavior = { type: 'pinned' } | { type: 'auto-upgrade' }; +export type WorkflowErrorType = { type: 'nondeterminism' }; + //////////////////////////////////////////////////////////////////////////////////////////////////// // Worker Tuner //////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 6b27f0be0..ef6757c6b 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -474,6 +474,23 @@ export interface WorkerOptions { */ sinks?: InjectedSinks; + /** + * The types of exceptions that, if a Workflow-thrown error extends, will cause the Workflow + * Execution or the Update to fail instead of suspending the Workflow via task failure. + * + * This property expects a record of Workflow-type names to the list of error types that will + * cause that type of Workflow to fail. Uses the `'*'` key to specify a list of error types that + * applies to all Workflow types. + * + * If either list of error types includes `NondeterminismError`, then non-determinism errors + * will cause the Workflow Excution to fail. If the list of error types includes `Error`, it + * effectively will fail a workflow/update in all user exception cases, including non-determinism + * errors. + * + * @experimental + */ + workflowTypesToFailureErrors?: Record<'*' | string, (string | 'NondeterminismError' | 'Error')[]>; + /** * @deprecated SDK tracing is no longer supported. This option is ignored. */ @@ -972,6 +989,24 @@ export function compileWorkerOptions( } export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions { + const workflowFailureErrors: native.WorkflowErrorType[] = []; + const workflowTypesToFailureErrors: Record = {}; + + for (const [k, v] of Object.entries(opts.workflowTypesToFailureErrors ?? {})) { + const errorTypes: native.WorkflowErrorType[] = []; + + // Core only cares about Non-Determinism Error; other error types are handled by lang side + if (v.includes('NondeterminismError') || v.includes('Error')) { + errorTypes.push({ type: 'nondeterminism' }); + } + + if (k === '*') { + workflowFailureErrors.push(...errorTypes); + } else { + workflowTypesToFailureErrors[k] = errorTypes; + } + } + return { identity: opts.identity, buildId: opts.buildId, // eslint-disable-line deprecation/deprecation @@ -983,7 +1018,7 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n nonStickyToStickyPollRatio: opts.nonStickyToStickyPollRatio, workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior), activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior), - enableNonLocalActivities: opts.enableNonLocalActivities, + enableNonLocalActivities: opts.enableNonLocalActivities && opts.activities.size > 0, stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout), maxCachedWorkflows: opts.maxCachedWorkflows, maxHeartbeatThrottleInterval: msToNumber(opts.maxHeartbeatThrottleInterval), @@ -991,6 +1026,8 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n maxTaskQueueActivitiesPerSecond: opts.maxTaskQueueActivitiesPerSecond ?? null, maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null, shutdownGraceTime: msToNumber(opts.shutdownGraceTime), + workflowFailureErrors, + workflowTypesToFailureErrors, }; }