From f824e24bdff6d3e6d9acbee43716ad9188c4a327 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 25 Nov 2025 17:06:57 -0800 Subject: [PATCH 01/12] bump core to 45b1d7e, plumb plugins to core, add LA test --- packages/core-bridge/sdk-core | 2 +- packages/core-bridge/src/worker.rs | 40 ++++++++++++++++++++-- packages/core-bridge/ts/native.ts | 8 ++++- packages/test/src/test-bridge.ts | 8 ++++- packages/test/src/test-local-activities.ts | 21 ++++++++++++ packages/worker/src/worker-options.ts | 8 ++++- 6 files changed, 81 insertions(+), 6 deletions(-) diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 850db67c8..45b1d7edf 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 850db67c87ac9208da53df1cd82f8a36d71c5227 +Subproject commit 45b1d7edf7138eb6e307a0acddccf970f19ee73e diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index d0d0cdff3..8c1175554 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -166,6 +166,9 @@ pub fn worker_complete_workflow_activation( ), } } + CompleteWfError::WorkflowNotEnabled => { + BridgeError::UnexpectedError(format!("{err}")) + } }) }) } @@ -225,6 +228,9 @@ pub fn worker_complete_activity_task( field: None, message: format!("Malformed Activity Completion: {reason:?}"), }, + CompleteActivityError::ActivityNotEnabled => { + BridgeError::UnexpectedError(format!("{err}")) + } }) }) } @@ -466,6 +472,7 @@ mod config { use std::{sync::Arc, time::Duration}; use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior; + use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; use temporalio_common::worker::{ ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder, @@ -499,7 +506,7 @@ mod config { workflow_task_poller_behavior: PollerBehavior, activity_task_poller_behavior: PollerBehavior, nexus_task_poller_behavior: PollerBehavior, - enable_non_local_activities: bool, + task_types: WorkerTaskTypes, sticky_queue_schedule_to_start_timeout: Duration, max_cached_workflows: usize, max_heartbeat_throttle_interval: Duration, @@ -507,6 +514,7 @@ mod config { max_activities_per_second: Option, max_task_queue_activities_per_second: Option, shutdown_grace_time: Option, + plugins: Vec, } #[derive(TryFromJs)] @@ -540,6 +548,25 @@ mod config { AutoUpgrade, } + #[derive(TryFromJs)] + pub struct WorkerTaskTypes { + enable_workflows: bool, + enable_local_activities: bool, + enable_remote_activities: bool, + enable_nexus: bool, + } + + impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes { + fn from(t: &WorkerTaskTypes) -> Self { + Self { + enable_workflows: t.enable_workflows, + enable_local_activities: t.enable_local_activities, + enable_remote_activities: t.enable_remote_activities, + enable_nexus: t.enable_nexus, + } + } + } + impl BridgeWorkerOptions { pub(crate) fn into_core_config(self) -> Result { // Set all other options @@ -566,7 +593,7 @@ mod config { .workflow_task_poller_behavior(self.workflow_task_poller_behavior) .activity_task_poller_behavior(self.activity_task_poller_behavior) .nexus_task_poller_behavior(self.nexus_task_poller_behavior) - .no_remote_activities(!self.enable_non_local_activities) + .task_types(&self.task_types) .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout) .max_cached_workflows(self.max_cached_workflows) .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval) @@ -574,6 +601,15 @@ mod config { .max_task_queue_activities_per_second(self.max_task_queue_activities_per_second) .max_worker_activities_per_second(self.max_activities_per_second) .graceful_shutdown_period(self.shutdown_grace_time) + .plugins( + self.plugins + .into_iter() + .map(|name| PluginInfo { + name, + version: String::new(), + }) + .collect::>(), + ) .build() } } diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index 2ddd20a1c..d3269b366 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -213,7 +213,12 @@ export interface WorkerOptions { workflowTaskPollerBehavior: PollerBehavior; activityTaskPollerBehavior: PollerBehavior; nexusTaskPollerBehavior: PollerBehavior; - enableNonLocalActivities: boolean; + taskTypes: { + enableWorkflows: boolean; + enableLocalActivities: boolean; + enableRemoteActivities: boolean; + enableNexus: boolean; + }; stickyQueueScheduleToStartTimeout: number; maxCachedWorkflows: number; maxHeartbeatThrottleInterval: number; @@ -221,6 +226,7 @@ export interface WorkerOptions { maxTaskQueueActivitiesPerSecond: Option; maxActivitiesPerSecond: Option; shutdownGraceTime: number; + plugins: string[]; } export type PollerBehavior = diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index df14e0183..dd3a01373 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -298,7 +298,12 @@ const GenericConfigs = { initial: 5, maximum: 100, }, - enableNonLocalActivities: false, + taskTypes: { + enableWorkflows: true, + enableLocalActivities: false, + enableRemoteActivities: false, + enableNexus: false, + }, stickyQueueScheduleToStartTimeout: 1000, maxCachedWorkflows: 1000, maxHeartbeatThrottleInterval: 1000, @@ -306,6 +311,7 @@ const GenericConfigs = { maxTaskQueueActivitiesPerSecond: null, maxActivitiesPerSecond: null, shutdownGraceTime: 1000, + plugins: [], } satisfies native.WorkerOptions, }, ephemeralServer: { diff --git a/packages/test/src/test-local-activities.ts b/packages/test/src/test-local-activities.ts index ae73518b9..7f92bc360 100644 --- a/packages/test/src/test-local-activities.ts +++ b/packages/test/src/test-local-activities.ts @@ -657,3 +657,24 @@ test.serial('retryPolicy is set correctly', async (t) => { t.deepEqual(await executeWorkflow(getRetryPolicyFromActivityInfo, { args: [retryPolicy, false] }), retryPolicy); }); }); + +export async function runLocalActivityWithNonLocalActivitiesDisabled(): Promise { + const { echo } = workflow.proxyLocalActivities({ startToCloseTimeout: '1m' }); + return await echo('hello from local activity'); +} + +test.serial('Local activities work when enableNonLocalActivities is false', async (t) => { + const { executeWorkflow, createWorker } = helpers(t); + const worker = await createWorker({ + activities: { + async echo(message: string): Promise { + return message; + }, + }, + enableNonLocalActivities: false, + }); + await worker.runUntil(async () => { + const result = await executeWorkflow(runLocalActivityWithNonLocalActivitiesDisabled); + t.is(result, 'hello from local activity'); + }); +}); diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index f04771623..d3a124f7c 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1090,7 +1090,12 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior), activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior), nexusTaskPollerBehavior: toNativeTaskPollerBehavior(opts.nexusTaskPollerBehavior), - enableNonLocalActivities: opts.enableNonLocalActivities, + taskTypes: { + enableWorkflows: opts.workflowBundle !== undefined || opts.workflowsPath !== undefined, + enableLocalActivities: opts.maxCachedWorkflows > 0 && opts.activities.size > 0, + enableRemoteActivities: opts.enableNonLocalActivities && opts.activities.size > 0, + enableNexus: opts.nexusServiceRegistry !== undefined, + }, stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout), maxCachedWorkflows: opts.maxCachedWorkflows, maxHeartbeatThrottleInterval: msToNumber(opts.maxHeartbeatThrottleInterval), @@ -1098,6 +1103,7 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n maxTaskQueueActivitiesPerSecond: opts.maxTaskQueueActivitiesPerSecond ?? null, maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null, shutdownGraceTime: msToNumber(opts.shutdownGraceTime), + plugins: opts.plugins?.map((p) => p.name) ?? [], }; } From 0a9c2ff6c445afa14f57ed65a2b157d557cbcc54 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 21 Oct 2025 16:43:25 -0400 Subject: [PATCH 02/12] update core --- packages/core-bridge/src/runtime.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core-bridge/src/runtime.rs b/packages/core-bridge/src/runtime.rs index 24993b1bd..27f447302 100644 --- a/packages/core-bridge/src/runtime.rs +++ b/packages/core-bridge/src/runtime.rs @@ -64,7 +64,7 @@ pub fn runtime_new( // Create core runtime which starts tokio multi-thread runtime let runtime_options = RuntimeOptionsBuilder::default() .telemetry_options(telemetry_options) - .heartbeat_interval(None) + .heartbeat_interval(None) // TODO: change to pick up heartbeat_interval .build() .context("Failed to build runtime options")?; let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) From ab66408e49ffc928893d00a4cddf98c572a5a328 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Thu, 23 Oct 2025 13:09:43 -0400 Subject: [PATCH 03/12] add skipClientWorkerSetCheck to worker options --- packages/test/src/test-integration-workflows.ts | 2 ++ packages/test/src/test-sinks.ts | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 32a43781c..8fd501cd2 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -462,6 +462,7 @@ test('Worker requests Eager Activity Dispatch if possible', async (t) => { activities: { testActivity: () => 'workflow-and-activity-worker', }, + skipClientWorkerSetCheck: true, }); const handle = await startWorkflow(executeEagerActivity); await activityWorker.runUntil(workflowWorker.runUntil(handle.result())); @@ -501,6 +502,7 @@ test("Worker doesn't request Eager Activity Dispatch if no activities are regist const workflowWorker = await createWorker({ connection: workflowWorkerConnection, activities: {}, + skipClientWorkerSetCheck: true, }); const handle = await startWorkflow(dontExecuteEagerActivity); const result = await activityWorker.runUntil(workflowWorker.runUntil(handle.result())); diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index a7ed18b88..735922657 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -261,6 +261,7 @@ if (RUN_INTEGRATION_TESTS) { sinks, maxCachedWorkflows: 0, maxConcurrentWorkflowTaskExecutions: 2, + skipClientWorkerSetCheck: true, }); const client = new WorkflowClient(); await worker.runUntil(client.execute(workflows.logSinkTester, { taskQueue, workflowId: uuid4() })); @@ -307,6 +308,7 @@ if (RUN_INTEGRATION_TESTS) { ...defaultOptions, taskQueue, sinks, + skipClientWorkerSetCheck: true, }); const workflowId = uuid4(); await worker.runUntil(client.execute(workflows.logSinkTester, { taskQueue, workflowId })); @@ -320,6 +322,7 @@ if (RUN_INTEGRATION_TESTS) { { ...defaultOptions, sinks, + skipClientWorkerSetCheck: true, }, history, workflowId @@ -351,6 +354,7 @@ if (RUN_INTEGRATION_TESTS) { ...defaultOptions, taskQueue, sinks, + skipClientWorkerSetCheck: true, }); const client = new WorkflowClient(); const workflowId = uuid4(); @@ -367,6 +371,7 @@ if (RUN_INTEGRATION_TESTS) { { ...defaultOptions, sinks, + skipClientWorkerSetCheck: true, }, history, workflowId @@ -411,6 +416,7 @@ if (RUN_INTEGRATION_TESTS) { ...defaultOptions, taskQueue, sinks, + skipClientWorkerSetCheck: true, }); await worker.runUntil( client.execute(workflows.upsertAndReadSearchAttributes, { From 15f08a56c5d4837560d1d2560cb72974b546f20c Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 24 Oct 2025 16:03:39 -0400 Subject: [PATCH 04/12] remove public config setting --- packages/test/src/test-integration-workflows.ts | 2 -- packages/test/src/test-sinks.ts | 6 ------ 2 files changed, 8 deletions(-) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 8fd501cd2..32a43781c 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -462,7 +462,6 @@ test('Worker requests Eager Activity Dispatch if possible', async (t) => { activities: { testActivity: () => 'workflow-and-activity-worker', }, - skipClientWorkerSetCheck: true, }); const handle = await startWorkflow(executeEagerActivity); await activityWorker.runUntil(workflowWorker.runUntil(handle.result())); @@ -502,7 +501,6 @@ test("Worker doesn't request Eager Activity Dispatch if no activities are regist const workflowWorker = await createWorker({ connection: workflowWorkerConnection, activities: {}, - skipClientWorkerSetCheck: true, }); const handle = await startWorkflow(dontExecuteEagerActivity); const result = await activityWorker.runUntil(workflowWorker.runUntil(handle.result())); diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index 735922657..a7ed18b88 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -261,7 +261,6 @@ if (RUN_INTEGRATION_TESTS) { sinks, maxCachedWorkflows: 0, maxConcurrentWorkflowTaskExecutions: 2, - skipClientWorkerSetCheck: true, }); const client = new WorkflowClient(); await worker.runUntil(client.execute(workflows.logSinkTester, { taskQueue, workflowId: uuid4() })); @@ -308,7 +307,6 @@ if (RUN_INTEGRATION_TESTS) { ...defaultOptions, taskQueue, sinks, - skipClientWorkerSetCheck: true, }); const workflowId = uuid4(); await worker.runUntil(client.execute(workflows.logSinkTester, { taskQueue, workflowId })); @@ -322,7 +320,6 @@ if (RUN_INTEGRATION_TESTS) { { ...defaultOptions, sinks, - skipClientWorkerSetCheck: true, }, history, workflowId @@ -354,7 +351,6 @@ if (RUN_INTEGRATION_TESTS) { ...defaultOptions, taskQueue, sinks, - skipClientWorkerSetCheck: true, }); const client = new WorkflowClient(); const workflowId = uuid4(); @@ -371,7 +367,6 @@ if (RUN_INTEGRATION_TESTS) { { ...defaultOptions, sinks, - skipClientWorkerSetCheck: true, }, history, workflowId @@ -416,7 +411,6 @@ if (RUN_INTEGRATION_TESTS) { ...defaultOptions, taskQueue, sinks, - skipClientWorkerSetCheck: true, }); await worker.runUntil( client.execute(workflows.upsertAndReadSearchAttributes, { From f5c1584e7472bcfc773b154b1a1194c908c5d5a0 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 24 Oct 2025 16:26:39 -0400 Subject: [PATCH 05/12] enable skip validation for replay workers --- packages/core-bridge/src/worker.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 8c1175554..b1f559059 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -405,9 +405,10 @@ pub fn replay_worker_new( OpaqueOutboundHandle, OpaqueOutboundHandle, )> { - let config = config + let mut config = config .into_core_config() .context("Failed to convert WorkerOptions to CoreWorkerConfig")?; + config.skip_client_worker_set_check = true; let runtime = runtime.borrow()?.core_runtime.clone(); enter_sync!(runtime); From 4ea1c1ac19fa848a2ed10010fee2038f701d852d Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 27 Oct 2025 08:55:50 -0400 Subject: [PATCH 06/12] update core and remove bridge config altering --- packages/core-bridge/src/worker.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index b1f559059..8c1175554 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -405,10 +405,9 @@ pub fn replay_worker_new( OpaqueOutboundHandle, OpaqueOutboundHandle, )> { - let mut config = config + let config = config .into_core_config() .context("Failed to convert WorkerOptions to CoreWorkerConfig")?; - config.skip_client_worker_set_check = true; let runtime = runtime.borrow()?.core_runtime.clone(); enter_sync!(runtime); From de8793334dadbacf977ae98727409cd8f63eee94 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 28 Oct 2025 10:23:15 -0700 Subject: [PATCH 07/12] Worker heartbeating --- packages/core-bridge/src/runtime.rs | 31 +++++++++++++++++++------- packages/core-bridge/ts/native.ts | 3 ++- packages/test/src/test-bridge.ts | 1 + packages/test/src/test-runtime.ts | 23 +++++++++++++++++++ packages/worker/src/runtime-options.ts | 20 +++++++++++++++-- packages/worker/src/runtime.ts | 4 ++-- 6 files changed, 69 insertions(+), 13 deletions(-) diff --git a/packages/core-bridge/src/runtime.rs b/packages/core-bridge/src/runtime.rs index 27f447302..fae1d1449 100644 --- a/packages/core-bridge/src/runtime.rs +++ b/packages/core-bridge/src/runtime.rs @@ -59,12 +59,13 @@ pub struct Runtime { pub fn runtime_new( bridge_options: config::RuntimeOptions, ) -> BridgeResult> { - let (telemetry_options, metrics_options, logging_options) = bridge_options.try_into()?; + let (telemetry_options, metrics_options, logging_options, worker_heartbeat_interval_millis) = + bridge_options.try_into()?; // Create core runtime which starts tokio multi-thread runtime let runtime_options = RuntimeOptionsBuilder::default() .telemetry_options(telemetry_options) - .heartbeat_interval(None) // TODO: change to pick up heartbeat_interval + .heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis)) .build() .context("Failed to build runtime options")?; let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) @@ -266,6 +267,7 @@ mod config { log_exporter: LogExporterOptions, telemetry: TelemetryOptions, metrics_exporter: Option, + worker_heartbeat_interval_millis: Option, } #[derive(Debug, Clone, TryFromJs)] @@ -322,6 +324,7 @@ mod config { CoreTelemetryOptions, Option, super::BridgeLogExporter, + Option, )> for RuntimeOptions { type Error = BridgeError; @@ -331,8 +334,16 @@ mod config { CoreTelemetryOptions, Option, super::BridgeLogExporter, + Option, )> { - let (telemetry_logger, log_exporter) = match self.log_exporter { + let Self { + log_exporter, + telemetry, + metrics_exporter, + worker_heartbeat_interval_millis, + } = self; + + let (telemetry_logger, log_exporter) = match log_exporter { LogExporterOptions::Console { filter } => ( CoreTelemetryLogger::Console { filter }, BridgeLogExporter::Console, @@ -352,17 +363,21 @@ mod config { let mut telemetry_options = TelemetryOptionsBuilder::default(); let telemetry_options = telemetry_options .logging(telemetry_logger) - .metric_prefix(self.telemetry.metric_prefix) - .attach_service_name(self.telemetry.attach_service_name) + .metric_prefix(telemetry.metric_prefix) + .attach_service_name(telemetry.attach_service_name) .build() .context("Failed to build telemetry options")?; - let metrics_exporter = self - .metrics_exporter + let metrics_exporter = metrics_exporter .map(std::convert::TryInto::try_into) .transpose()?; - Ok((telemetry_options, metrics_exporter, log_exporter)) + Ok(( + telemetry_options, + metrics_exporter, + log_exporter, + worker_heartbeat_interval_millis, + )) } } diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index d3269b366..099bf92f6 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -40,7 +40,7 @@ export type JsonString<_T> = string; // Runtime //////////////////////////////////////////////////////////////////////////////////////////////////// -export declare function newRuntime(telemOptions: RuntimeOptions): Runtime; +export declare function newRuntime(runtimeOptions: RuntimeOptions): Runtime; export declare function runtimeShutdown(runtime: Runtime): void; @@ -52,6 +52,7 @@ export type RuntimeOptions = { logExporter: LogExporterOptions; telemetry: TelemetryOptions; metricsExporter: MetricExporterOptions; + workerHeartbeatIntervalMillis: Option; }; export type TelemetryOptions = { diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index dd3a01373..7ae3b6a16 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -241,6 +241,7 @@ const GenericConfigs = { attachServiceName: false, }, metricsExporter: null, + workerHeartbeatIntervalMillis: null, } satisfies native.RuntimeOptions, }, client: { diff --git a/packages/test/src/test-runtime.ts b/packages/test/src/test-runtime.ts index 405e0a90b..3f0baaab9 100644 --- a/packages/test/src/test-runtime.ts +++ b/packages/test/src/test-runtime.ts @@ -156,6 +156,29 @@ if (RUN_INTEGRATION_TESTS) { t.is(logEntries.filter((x) => x.message.startsWith('workflow log ')).length, 5); t.is(logEntries.filter((x) => x.message.startsWith('final log')).length, 1); }); + + test.serial('Runtime handle heartbeat duration default', async (t) => { + const runtime = Runtime.install({}); + const SIXTY_SECONDS = 60 * 1000; + t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === SIXTY_SECONDS); + await runtime.shutdown(); + }); + + test.serial('Runtime handle heartbeat duration null', async (t) => { + const runtime = Runtime.install({ + workerHeartbeatInterval: null, + }); + t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === null); + await runtime.shutdown(); + }); + + test.serial('Runtime handle heartbeat duration undefined', async (t) => { + const runtime = Runtime.install({ + workerHeartbeatInterval: 13 * 1000, + }); + t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === 13 * 1000); + await runtime.shutdown(); + }); } export async function log5Times(): Promise { diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index d38db5a87..f74fc6150 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -32,6 +32,14 @@ export interface RuntimeOptions { */ telemetryOptions?: TelemetryOptions; + /** + * Interval for worker heartbeats. `null` disables heartbeating. + * + * @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + * @default 60000 (60 seconds) + */ + workerHeartbeatInterval?: Duration | null; + /** * Automatically shutdown workers on any of these signals. * @@ -359,7 +367,7 @@ export interface PrometheusMetricsExporter { */ export interface CompiledRuntimeOptions { shutdownSignals: NodeJS.Signals[]; - telemetryOptions: native.RuntimeOptions; + runtimeOptions: native.RuntimeOptions; logger: Logger; } @@ -367,10 +375,17 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions const { metrics, noTemporalPrefixForMetrics } = options.telemetryOptions ?? {}; // eslint-disable-line deprecation/deprecation const [logger, logExporter] = compileLoggerOptions(options); + let workerHeartbeatIntervalMillis: number | null; + if (options.workerHeartbeatInterval === null) { + workerHeartbeatIntervalMillis = null; + } else { + workerHeartbeatIntervalMillis = msToNumber(options.workerHeartbeatInterval ?? '60s'); + } + return { logger, shutdownSignals: options.shutdownSignals ?? ['SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGUSR2'], - telemetryOptions: { + runtimeOptions: { logExporter, telemetry: { metricPrefix: metrics?.metricPrefix ?? (noTemporalPrefixForMetrics ? '' : 'temporal_'), @@ -400,6 +415,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions globalTags: metrics.globalTags ?? {}, } satisfies native.MetricExporterOptions) : null, + workerHeartbeatIntervalMillis, }, }; } diff --git a/packages/worker/src/runtime.ts b/packages/worker/src/runtime.ts index 49bfc0f5a..e98227e68 100644 --- a/packages/worker/src/runtime.ts +++ b/packages/worker/src/runtime.ts @@ -50,7 +50,7 @@ export class Runtime { public readonly options: CompiledRuntimeOptions ) { this.logger = options.logger; - this.metricMeter = options.telemetryOptions.metricsExporter + this.metricMeter = options.runtimeOptions.metricsExporter ? MetricMeterWithComposedTags.compose(new RuntimeMetricMeter(this.native), {}, true) : noopMetricMeter; @@ -97,7 +97,7 @@ export class Runtime { */ protected static create(options: RuntimeOptions, instantiator: 'install' | 'instance'): Runtime { const compiledOptions = compileOptions(options); - const runtime = native.newRuntime(compiledOptions.telemetryOptions); + const runtime = native.newRuntime(compiledOptions.runtimeOptions); // Remember the provided options in case Core is reinstantiated after being shut down this.defaultOptions = options; From 3dce03386385b6aec01c8ae57433a3acd6dd3420 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 26 Nov 2025 17:57:26 -0800 Subject: [PATCH 08/12] removed non-behavior test --- packages/test/src/test-runtime.ts | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/packages/test/src/test-runtime.ts b/packages/test/src/test-runtime.ts index 3f0baaab9..405e0a90b 100644 --- a/packages/test/src/test-runtime.ts +++ b/packages/test/src/test-runtime.ts @@ -156,29 +156,6 @@ if (RUN_INTEGRATION_TESTS) { t.is(logEntries.filter((x) => x.message.startsWith('workflow log ')).length, 5); t.is(logEntries.filter((x) => x.message.startsWith('final log')).length, 1); }); - - test.serial('Runtime handle heartbeat duration default', async (t) => { - const runtime = Runtime.install({}); - const SIXTY_SECONDS = 60 * 1000; - t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === SIXTY_SECONDS); - await runtime.shutdown(); - }); - - test.serial('Runtime handle heartbeat duration null', async (t) => { - const runtime = Runtime.install({ - workerHeartbeatInterval: null, - }); - t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === null); - await runtime.shutdown(); - }); - - test.serial('Runtime handle heartbeat duration undefined', async (t) => { - const runtime = Runtime.install({ - workerHeartbeatInterval: 13 * 1000, - }); - t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === 13 * 1000); - await runtime.shutdown(); - }); } export async function log5Times(): Promise { From afb9c9ffdaae29e768712a6fbfd3e549b2cce4db Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 1 Dec 2025 17:21:55 -0800 Subject: [PATCH 09/12] PR feedback, format --- packages/core-bridge/src/worker.rs | 7 ++++--- packages/worker/src/worker-options.ts | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 8c1175554..ae6989cf8 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -167,7 +167,7 @@ pub fn worker_complete_workflow_activation( } } CompleteWfError::WorkflowNotEnabled => { - BridgeError::UnexpectedError(format!("{err}")) + BridgeError::UnexpectedError(err.to_string()) } }) }) @@ -229,7 +229,7 @@ pub fn worker_complete_activity_task( message: format!("Malformed Activity Completion: {reason:?}"), }, CompleteActivityError::ActivityNotEnabled => { - BridgeError::UnexpectedError(format!("{err}")) + BridgeError::UnexpectedError(err.to_string()) } }) }) @@ -302,7 +302,7 @@ pub fn worker_complete_nexus_task( .await .map_err(|err| match err { CompleteNexusError::NexusNotEnabled => { - BridgeError::UnexpectedError(format!("{err}")) + BridgeError::UnexpectedError(err.to_string()) } CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError { field: None, @@ -549,6 +549,7 @@ mod config { } #[derive(TryFromJs)] + #[allow(clippy::struct_excessive_bools)] pub struct WorkerTaskTypes { enable_workflows: bool, enable_local_activities: bool, diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index d3a124f7c..2e9187f79 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1078,6 +1078,8 @@ function nexusServiceRegistryFromOptions(opts: WorkerOptions): nexus.ServiceRegi } export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions { + const enableWorkflows = opts.workflowBundle !== undefined || opts.workflowsPath !== undefined; + const enableLocalActivities = enableWorkflows && opts.activities.size > 0; return { identity: opts.identity, buildId: opts.buildId, // eslint-disable-line deprecation/deprecation From 826a363907b5c4ce33df277aca1e4d9ed3a2a037 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 1 Dec 2025 17:33:38 -0800 Subject: [PATCH 10/12] Oops, forgot a spot --- packages/worker/src/worker-options.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 2e9187f79..e738b6234 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1093,8 +1093,8 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior), nexusTaskPollerBehavior: toNativeTaskPollerBehavior(opts.nexusTaskPollerBehavior), taskTypes: { - enableWorkflows: opts.workflowBundle !== undefined || opts.workflowsPath !== undefined, - enableLocalActivities: opts.maxCachedWorkflows > 0 && opts.activities.size > 0, + enableWorkflows, + enableLocalActivities, enableRemoteActivities: opts.enableNonLocalActivities && opts.activities.size > 0, enableNexus: opts.nexusServiceRegistry !== undefined, }, From ebed34dbf9a696edd4acb0afaddfdddf14668489 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 2 Dec 2025 14:52:50 -0800 Subject: [PATCH 11/12] use 0 to disable heartbeat instead of null --- packages/worker/src/runtime-options.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index f74fc6150..c498d88fb 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -33,12 +33,12 @@ export interface RuntimeOptions { telemetryOptions?: TelemetryOptions; /** - * Interval for worker heartbeats. `null` disables heartbeating. + * Interval for worker heartbeats. Accepted range is between 1s and 60s. `0` disables heartbeating. * * @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} * @default 60000 (60 seconds) */ - workerHeartbeatInterval?: Duration | null; + workerHeartbeatInterval?: Duration; /** * Automatically shutdown workers on any of these signals. @@ -376,7 +376,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions const [logger, logExporter] = compileLoggerOptions(options); let workerHeartbeatIntervalMillis: number | null; - if (options.workerHeartbeatInterval === null) { + if (options.workerHeartbeatInterval === 0) { workerHeartbeatIntervalMillis = null; } else { workerHeartbeatIntervalMillis = msToNumber(options.workerHeartbeatInterval ?? '60s'); From 3eb05dab8e71566f1daec34d798710014c579f21 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 2 Dec 2025 15:16:19 -0800 Subject: [PATCH 12/12] Clean up, fix edge case with 0s or non-number input --- packages/worker/src/runtime-options.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index c498d88fb..2d4a9cbc4 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -375,12 +375,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions const { metrics, noTemporalPrefixForMetrics } = options.telemetryOptions ?? {}; // eslint-disable-line deprecation/deprecation const [logger, logExporter] = compileLoggerOptions(options); - let workerHeartbeatIntervalMillis: number | null; - if (options.workerHeartbeatInterval === 0) { - workerHeartbeatIntervalMillis = null; - } else { - workerHeartbeatIntervalMillis = msToNumber(options.workerHeartbeatInterval ?? '60s'); - } + const heartbeatMillis = msToNumber(options.workerHeartbeatInterval ?? '60s'); return { logger, @@ -415,7 +410,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions globalTags: metrics.globalTags ?? {}, } satisfies native.MetricExporterOptions) : null, - workerHeartbeatIntervalMillis, + workerHeartbeatIntervalMillis: heartbeatMillis === 0 ? null : heartbeatMillis, }, }; }