diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 850db67c8..45b1d7edf 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 850db67c87ac9208da53df1cd82f8a36d71c5227 +Subproject commit 45b1d7edf7138eb6e307a0acddccf970f19ee73e diff --git a/packages/core-bridge/src/runtime.rs b/packages/core-bridge/src/runtime.rs index 24993b1bd..fae1d1449 100644 --- a/packages/core-bridge/src/runtime.rs +++ b/packages/core-bridge/src/runtime.rs @@ -59,12 +59,13 @@ pub struct Runtime { pub fn runtime_new( bridge_options: config::RuntimeOptions, ) -> BridgeResult> { - let (telemetry_options, metrics_options, logging_options) = bridge_options.try_into()?; + let (telemetry_options, metrics_options, logging_options, worker_heartbeat_interval_millis) = + bridge_options.try_into()?; // Create core runtime which starts tokio multi-thread runtime let runtime_options = RuntimeOptionsBuilder::default() .telemetry_options(telemetry_options) - .heartbeat_interval(None) + .heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis)) .build() .context("Failed to build runtime options")?; let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) @@ -266,6 +267,7 @@ mod config { log_exporter: LogExporterOptions, telemetry: TelemetryOptions, metrics_exporter: Option, + worker_heartbeat_interval_millis: Option, } #[derive(Debug, Clone, TryFromJs)] @@ -322,6 +324,7 @@ mod config { CoreTelemetryOptions, Option, super::BridgeLogExporter, + Option, )> for RuntimeOptions { type Error = BridgeError; @@ -331,8 +334,16 @@ mod config { CoreTelemetryOptions, Option, super::BridgeLogExporter, + Option, )> { - let (telemetry_logger, log_exporter) = match self.log_exporter { + let Self { + log_exporter, + telemetry, + metrics_exporter, + worker_heartbeat_interval_millis, + } = self; + + let (telemetry_logger, log_exporter) = match log_exporter { LogExporterOptions::Console { filter } => ( CoreTelemetryLogger::Console { filter }, BridgeLogExporter::Console, @@ -352,17 +363,21 @@ mod config { let mut telemetry_options = TelemetryOptionsBuilder::default(); let telemetry_options = telemetry_options .logging(telemetry_logger) - .metric_prefix(self.telemetry.metric_prefix) - .attach_service_name(self.telemetry.attach_service_name) + .metric_prefix(telemetry.metric_prefix) + .attach_service_name(telemetry.attach_service_name) .build() .context("Failed to build telemetry options")?; - let metrics_exporter = self - .metrics_exporter + let metrics_exporter = metrics_exporter .map(std::convert::TryInto::try_into) .transpose()?; - Ok((telemetry_options, metrics_exporter, log_exporter)) + Ok(( + telemetry_options, + metrics_exporter, + log_exporter, + worker_heartbeat_interval_millis, + )) } } diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index d0d0cdff3..ae6989cf8 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -166,6 +166,9 @@ pub fn worker_complete_workflow_activation( ), } } + CompleteWfError::WorkflowNotEnabled => { + BridgeError::UnexpectedError(err.to_string()) + } }) }) } @@ -225,6 +228,9 @@ pub fn worker_complete_activity_task( field: None, message: format!("Malformed Activity Completion: {reason:?}"), }, + CompleteActivityError::ActivityNotEnabled => { + BridgeError::UnexpectedError(err.to_string()) + } }) }) } @@ -296,7 +302,7 @@ pub fn worker_complete_nexus_task( .await .map_err(|err| match err { CompleteNexusError::NexusNotEnabled => { - BridgeError::UnexpectedError(format!("{err}")) + BridgeError::UnexpectedError(err.to_string()) } CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError { field: None, @@ -466,6 +472,7 @@ mod config { use std::{sync::Arc, time::Duration}; use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior; + use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; use temporalio_common::worker::{ ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder, @@ -499,7 +506,7 @@ mod config { workflow_task_poller_behavior: PollerBehavior, activity_task_poller_behavior: PollerBehavior, nexus_task_poller_behavior: PollerBehavior, - enable_non_local_activities: bool, + task_types: WorkerTaskTypes, sticky_queue_schedule_to_start_timeout: Duration, max_cached_workflows: usize, max_heartbeat_throttle_interval: Duration, @@ -507,6 +514,7 @@ mod config { max_activities_per_second: Option, max_task_queue_activities_per_second: Option, shutdown_grace_time: Option, + plugins: Vec, } #[derive(TryFromJs)] @@ -540,6 +548,26 @@ mod config { AutoUpgrade, } + #[derive(TryFromJs)] + #[allow(clippy::struct_excessive_bools)] + pub struct WorkerTaskTypes { + enable_workflows: bool, + enable_local_activities: bool, + enable_remote_activities: bool, + enable_nexus: bool, + } + + impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes { + fn from(t: &WorkerTaskTypes) -> Self { + Self { + enable_workflows: t.enable_workflows, + enable_local_activities: t.enable_local_activities, + enable_remote_activities: t.enable_remote_activities, + enable_nexus: t.enable_nexus, + } + } + } + impl BridgeWorkerOptions { pub(crate) fn into_core_config(self) -> Result { // Set all other options @@ -566,7 +594,7 @@ mod config { .workflow_task_poller_behavior(self.workflow_task_poller_behavior) .activity_task_poller_behavior(self.activity_task_poller_behavior) .nexus_task_poller_behavior(self.nexus_task_poller_behavior) - .no_remote_activities(!self.enable_non_local_activities) + .task_types(&self.task_types) .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout) .max_cached_workflows(self.max_cached_workflows) .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval) @@ -574,6 +602,15 @@ mod config { .max_task_queue_activities_per_second(self.max_task_queue_activities_per_second) .max_worker_activities_per_second(self.max_activities_per_second) .graceful_shutdown_period(self.shutdown_grace_time) + .plugins( + self.plugins + .into_iter() + .map(|name| PluginInfo { + name, + version: String::new(), + }) + .collect::>(), + ) .build() } } diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index 2ddd20a1c..099bf92f6 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -40,7 +40,7 @@ export type JsonString<_T> = string; // Runtime //////////////////////////////////////////////////////////////////////////////////////////////////// -export declare function newRuntime(telemOptions: RuntimeOptions): Runtime; +export declare function newRuntime(runtimeOptions: RuntimeOptions): Runtime; export declare function runtimeShutdown(runtime: Runtime): void; @@ -52,6 +52,7 @@ export type RuntimeOptions = { logExporter: LogExporterOptions; telemetry: TelemetryOptions; metricsExporter: MetricExporterOptions; + workerHeartbeatIntervalMillis: Option; }; export type TelemetryOptions = { @@ -213,7 +214,12 @@ export interface WorkerOptions { workflowTaskPollerBehavior: PollerBehavior; activityTaskPollerBehavior: PollerBehavior; nexusTaskPollerBehavior: PollerBehavior; - enableNonLocalActivities: boolean; + taskTypes: { + enableWorkflows: boolean; + enableLocalActivities: boolean; + enableRemoteActivities: boolean; + enableNexus: boolean; + }; stickyQueueScheduleToStartTimeout: number; maxCachedWorkflows: number; maxHeartbeatThrottleInterval: number; @@ -221,6 +227,7 @@ export interface WorkerOptions { maxTaskQueueActivitiesPerSecond: Option; maxActivitiesPerSecond: Option; shutdownGraceTime: number; + plugins: string[]; } export type PollerBehavior = diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index df14e0183..7ae3b6a16 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -241,6 +241,7 @@ const GenericConfigs = { attachServiceName: false, }, metricsExporter: null, + workerHeartbeatIntervalMillis: null, } satisfies native.RuntimeOptions, }, client: { @@ -298,7 +299,12 @@ const GenericConfigs = { initial: 5, maximum: 100, }, - enableNonLocalActivities: false, + taskTypes: { + enableWorkflows: true, + enableLocalActivities: false, + enableRemoteActivities: false, + enableNexus: false, + }, stickyQueueScheduleToStartTimeout: 1000, maxCachedWorkflows: 1000, maxHeartbeatThrottleInterval: 1000, @@ -306,6 +312,7 @@ const GenericConfigs = { maxTaskQueueActivitiesPerSecond: null, maxActivitiesPerSecond: null, shutdownGraceTime: 1000, + plugins: [], } satisfies native.WorkerOptions, }, ephemeralServer: { diff --git a/packages/test/src/test-local-activities.ts b/packages/test/src/test-local-activities.ts index ae73518b9..7f92bc360 100644 --- a/packages/test/src/test-local-activities.ts +++ b/packages/test/src/test-local-activities.ts @@ -657,3 +657,24 @@ test.serial('retryPolicy is set correctly', async (t) => { t.deepEqual(await executeWorkflow(getRetryPolicyFromActivityInfo, { args: [retryPolicy, false] }), retryPolicy); }); }); + +export async function runLocalActivityWithNonLocalActivitiesDisabled(): Promise { + const { echo } = workflow.proxyLocalActivities({ startToCloseTimeout: '1m' }); + return await echo('hello from local activity'); +} + +test.serial('Local activities work when enableNonLocalActivities is false', async (t) => { + const { executeWorkflow, createWorker } = helpers(t); + const worker = await createWorker({ + activities: { + async echo(message: string): Promise { + return message; + }, + }, + enableNonLocalActivities: false, + }); + await worker.runUntil(async () => { + const result = await executeWorkflow(runLocalActivityWithNonLocalActivitiesDisabled); + t.is(result, 'hello from local activity'); + }); +}); diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index d38db5a87..f74fc6150 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -32,6 +32,14 @@ export interface RuntimeOptions { */ telemetryOptions?: TelemetryOptions; + /** + * Interval for worker heartbeats. `null` disables heartbeating. + * + * @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + * @default 60000 (60 seconds) + */ + workerHeartbeatInterval?: Duration | null; + /** * Automatically shutdown workers on any of these signals. * @@ -359,7 +367,7 @@ export interface PrometheusMetricsExporter { */ export interface CompiledRuntimeOptions { shutdownSignals: NodeJS.Signals[]; - telemetryOptions: native.RuntimeOptions; + runtimeOptions: native.RuntimeOptions; logger: Logger; } @@ -367,10 +375,17 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions const { metrics, noTemporalPrefixForMetrics } = options.telemetryOptions ?? {}; // eslint-disable-line deprecation/deprecation const [logger, logExporter] = compileLoggerOptions(options); + let workerHeartbeatIntervalMillis: number | null; + if (options.workerHeartbeatInterval === null) { + workerHeartbeatIntervalMillis = null; + } else { + workerHeartbeatIntervalMillis = msToNumber(options.workerHeartbeatInterval ?? '60s'); + } + return { logger, shutdownSignals: options.shutdownSignals ?? ['SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGUSR2'], - telemetryOptions: { + runtimeOptions: { logExporter, telemetry: { metricPrefix: metrics?.metricPrefix ?? (noTemporalPrefixForMetrics ? '' : 'temporal_'), @@ -400,6 +415,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions globalTags: metrics.globalTags ?? {}, } satisfies native.MetricExporterOptions) : null, + workerHeartbeatIntervalMillis, }, }; } diff --git a/packages/worker/src/runtime.ts b/packages/worker/src/runtime.ts index 49bfc0f5a..e98227e68 100644 --- a/packages/worker/src/runtime.ts +++ b/packages/worker/src/runtime.ts @@ -50,7 +50,7 @@ export class Runtime { public readonly options: CompiledRuntimeOptions ) { this.logger = options.logger; - this.metricMeter = options.telemetryOptions.metricsExporter + this.metricMeter = options.runtimeOptions.metricsExporter ? MetricMeterWithComposedTags.compose(new RuntimeMetricMeter(this.native), {}, true) : noopMetricMeter; @@ -97,7 +97,7 @@ export class Runtime { */ protected static create(options: RuntimeOptions, instantiator: 'install' | 'instance'): Runtime { const compiledOptions = compileOptions(options); - const runtime = native.newRuntime(compiledOptions.telemetryOptions); + const runtime = native.newRuntime(compiledOptions.runtimeOptions); // Remember the provided options in case Core is reinstantiated after being shut down this.defaultOptions = options; diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index f04771623..e738b6234 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1078,6 +1078,8 @@ function nexusServiceRegistryFromOptions(opts: WorkerOptions): nexus.ServiceRegi } export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions { + const enableWorkflows = opts.workflowBundle !== undefined || opts.workflowsPath !== undefined; + const enableLocalActivities = enableWorkflows && opts.activities.size > 0; return { identity: opts.identity, buildId: opts.buildId, // eslint-disable-line deprecation/deprecation @@ -1090,7 +1092,12 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior), activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior), nexusTaskPollerBehavior: toNativeTaskPollerBehavior(opts.nexusTaskPollerBehavior), - enableNonLocalActivities: opts.enableNonLocalActivities, + taskTypes: { + enableWorkflows, + enableLocalActivities, + enableRemoteActivities: opts.enableNonLocalActivities && opts.activities.size > 0, + enableNexus: opts.nexusServiceRegistry !== undefined, + }, stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout), maxCachedWorkflows: opts.maxCachedWorkflows, maxHeartbeatThrottleInterval: msToNumber(opts.maxHeartbeatThrottleInterval), @@ -1098,6 +1105,7 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n maxTaskQueueActivitiesPerSecond: opts.maxTaskQueueActivitiesPerSecond ?? null, maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null, shutdownGraceTime: msToNumber(opts.shutdownGraceTime), + plugins: opts.plugins?.map((p) => p.name) ?? [], }; }