Skip to content

Commit 1557d50

Browse files
committed
Worker heartbeating
1 parent 8c320f1 commit 1557d50

File tree

7 files changed

+70
-13
lines changed

7 files changed

+70
-13
lines changed

packages/core-bridge/src/runtime.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,13 @@ pub struct Runtime {
5959
pub fn runtime_new(
6060
bridge_options: config::RuntimeOptions,
6161
) -> BridgeResult<OpaqueOutboundHandle<Runtime>> {
62-
let (telemetry_options, metrics_options, logging_options) = bridge_options.try_into()?;
62+
let (telemetry_options, metrics_options, logging_options, worker_heartbeat_interval_millis) =
63+
bridge_options.try_into()?;
6364

6465
// Create core runtime which starts tokio multi-thread runtime
6566
let runtime_options = RuntimeOptionsBuilder::default()
6667
.telemetry_options(telemetry_options)
68+
.heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis))
6769
.build()
6870
.context("Failed to build runtime options")?;
6971
let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default())
@@ -265,6 +267,7 @@ mod config {
265267
log_exporter: LogExporterOptions,
266268
telemetry: TelemetryOptions,
267269
metrics_exporter: Option<MetricsExporterOptions>,
270+
worker_heartbeat_interval_millis: Option<u64>,
268271
}
269272

270273
#[derive(Debug, Clone, TryFromJs)]
@@ -321,6 +324,7 @@ mod config {
321324
CoreTelemetryOptions,
322325
Option<super::BridgeMetricsExporter>,
323326
super::BridgeLogExporter,
327+
Option<u64>,
324328
)> for RuntimeOptions
325329
{
326330
type Error = BridgeError;
@@ -330,8 +334,16 @@ mod config {
330334
CoreTelemetryOptions,
331335
Option<super::BridgeMetricsExporter>,
332336
super::BridgeLogExporter,
337+
Option<u64>,
333338
)> {
334-
let (telemetry_logger, log_exporter) = match self.log_exporter {
339+
let Self {
340+
log_exporter,
341+
telemetry,
342+
metrics_exporter,
343+
worker_heartbeat_interval_millis,
344+
} = self;
345+
346+
let (telemetry_logger, log_exporter) = match log_exporter {
335347
LogExporterOptions::Console { filter } => (
336348
CoreTelemetryLogger::Console { filter },
337349
BridgeLogExporter::Console,
@@ -351,17 +363,21 @@ mod config {
351363
let mut telemetry_options = TelemetryOptionsBuilder::default();
352364
let telemetry_options = telemetry_options
353365
.logging(telemetry_logger)
354-
.metric_prefix(self.telemetry.metric_prefix)
355-
.attach_service_name(self.telemetry.attach_service_name)
366+
.metric_prefix(telemetry.metric_prefix)
367+
.attach_service_name(telemetry.attach_service_name)
356368
.build()
357369
.context("Failed to build telemetry options")?;
358370

359-
let metrics_exporter = self
360-
.metrics_exporter
371+
let metrics_exporter = metrics_exporter
361372
.map(std::convert::TryInto::try_into)
362373
.transpose()?;
363374

364-
Ok((telemetry_options, metrics_exporter, log_exporter))
375+
Ok((
376+
telemetry_options,
377+
metrics_exporter,
378+
log_exporter,
379+
worker_heartbeat_interval_millis,
380+
))
365381
}
366382
}
367383

packages/core-bridge/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ pub fn worker_complete_nexus_task(
295295
.complete_nexus_task(nexus_completion)
296296
.await
297297
.map_err(|err| match err {
298-
CompleteNexusError::NexusNotEnabled {} => {
298+
CompleteNexusError::NexusNotEnabled => {
299299
BridgeError::UnexpectedError(format!("{err}"))
300300
}
301301
CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError {

packages/core-bridge/ts/native.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ export type JsonString<_T> = string;
4040
// Runtime
4141
////////////////////////////////////////////////////////////////////////////////////////////////////
4242

43-
export declare function newRuntime(telemOptions: RuntimeOptions): Runtime;
43+
export declare function newRuntime(runtimeOptions: RuntimeOptions): Runtime;
4444

4545
export declare function runtimeShutdown(runtime: Runtime): void;
4646

@@ -52,6 +52,7 @@ export type RuntimeOptions = {
5252
logExporter: LogExporterOptions;
5353
telemetry: TelemetryOptions;
5454
metricsExporter: MetricExporterOptions;
55+
workerHeartbeatIntervalMillis: Option<number>;
5556
};
5657

5758
export type TelemetryOptions = {

packages/test/src/test-bridge.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ const GenericConfigs = {
241241
attachServiceName: false,
242242
},
243243
metricsExporter: null,
244+
workerHeartbeatIntervalMillis: null,
244245
} satisfies native.RuntimeOptions,
245246
},
246247
client: {

packages/test/src/test-runtime.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,29 @@ if (RUN_INTEGRATION_TESTS) {
156156
t.is(logEntries.filter((x) => x.message.startsWith('workflow log ')).length, 5);
157157
t.is(logEntries.filter((x) => x.message.startsWith('final log')).length, 1);
158158
});
159+
160+
test.serial('Runtime handle heartbeat duration default', async (t) => {
161+
const runtime = Runtime.install({});
162+
const SIXTY_SECONDS = 60 * 1000;
163+
t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === SIXTY_SECONDS);
164+
await runtime.shutdown();
165+
});
166+
167+
test.serial('Runtime handle heartbeat duration null', async (t) => {
168+
const runtime = Runtime.install({
169+
workerHeartbeatInterval: null,
170+
});
171+
t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === null);
172+
await runtime.shutdown();
173+
});
174+
175+
test.serial('Runtime handle heartbeat duration undefined', async (t) => {
176+
const runtime = Runtime.install({
177+
workerHeartbeatInterval: 13 * 1000,
178+
});
179+
t.true(runtime.options.runtimeOptions.workerHeartbeatIntervalMillis === 13 * 1000);
180+
await runtime.shutdown();
181+
});
159182
}
160183

161184
export async function log5Times(): Promise<void> {

packages/worker/src/runtime-options.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ export interface RuntimeOptions {
3232
*/
3333
telemetryOptions?: TelemetryOptions;
3434

35+
/**
36+
* Interval for worker heartbeats. `null` disables heartbeating.
37+
*
38+
* @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}
39+
* @default 60000 (60 seconds)
40+
*/
41+
workerHeartbeatInterval?: Duration | null;
42+
3543
/**
3644
* Automatically shutdown workers on any of these signals.
3745
*
@@ -359,18 +367,25 @@ export interface PrometheusMetricsExporter {
359367
*/
360368
export interface CompiledRuntimeOptions {
361369
shutdownSignals: NodeJS.Signals[];
362-
telemetryOptions: native.RuntimeOptions;
370+
runtimeOptions: native.RuntimeOptions;
363371
logger: Logger;
364372
}
365373

366374
export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions {
367375
const { metrics, noTemporalPrefixForMetrics } = options.telemetryOptions ?? {}; // eslint-disable-line deprecation/deprecation
368376
const [logger, logExporter] = compileLoggerOptions(options);
369377

378+
let workerHeartbeatIntervalMillis: number | null;
379+
if (options.workerHeartbeatInterval === null) {
380+
workerHeartbeatIntervalMillis = null;
381+
} else {
382+
workerHeartbeatIntervalMillis = msToNumber(options.workerHeartbeatInterval ?? '60s');
383+
}
384+
370385
return {
371386
logger,
372387
shutdownSignals: options.shutdownSignals ?? ['SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGUSR2'],
373-
telemetryOptions: {
388+
runtimeOptions: {
374389
logExporter,
375390
telemetry: {
376391
metricPrefix: metrics?.metricPrefix ?? (noTemporalPrefixForMetrics ? '' : 'temporal_'),
@@ -400,6 +415,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions
400415
globalTags: metrics.globalTags ?? {},
401416
} satisfies native.MetricExporterOptions)
402417
: null,
418+
workerHeartbeatIntervalMillis,
403419
},
404420
};
405421
}

packages/worker/src/runtime.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export class Runtime {
5050
public readonly options: CompiledRuntimeOptions
5151
) {
5252
this.logger = options.logger;
53-
this.metricMeter = options.telemetryOptions.metricsExporter
53+
this.metricMeter = options.runtimeOptions.metricsExporter
5454
? MetricMeterWithComposedTags.compose(new RuntimeMetricMeter(this.native), {}, true)
5555
: noopMetricMeter;
5656

@@ -97,7 +97,7 @@ export class Runtime {
9797
*/
9898
protected static create(options: RuntimeOptions, instantiator: 'install' | 'instance'): Runtime {
9999
const compiledOptions = compileOptions(options);
100-
const runtime = native.newRuntime(compiledOptions.telemetryOptions);
100+
const runtime = native.newRuntime(compiledOptions.runtimeOptions);
101101

102102
// Remember the provided options in case Core is reinstantiated after being shut down
103103
this.defaultOptions = options;

0 commit comments

Comments
 (0)