Skip to content

Commit f824e24

Browse files
committed
bump core to 45b1d7e, plumb plugins to core, add LA test
1 parent f4ec862 commit f824e24

File tree

6 files changed

+81
-6
lines changed

6 files changed

+81
-6
lines changed

packages/core-bridge/sdk-core

Submodule sdk-core updated 70 files

packages/core-bridge/src/worker.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ pub fn worker_complete_workflow_activation(
166166
),
167167
}
168168
}
169+
CompleteWfError::WorkflowNotEnabled => {
170+
BridgeError::UnexpectedError(format!("{err}"))
171+
}
169172
})
170173
})
171174
}
@@ -225,6 +228,9 @@ pub fn worker_complete_activity_task(
225228
field: None,
226229
message: format!("Malformed Activity Completion: {reason:?}"),
227230
},
231+
CompleteActivityError::ActivityNotEnabled => {
232+
BridgeError::UnexpectedError(format!("{err}"))
233+
}
228234
})
229235
})
230236
}
@@ -466,6 +472,7 @@ mod config {
466472
use std::{sync::Arc, time::Duration};
467473

468474
use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior;
475+
use temporalio_common::protos::temporal::api::worker::v1::PluginInfo;
469476
use temporalio_common::worker::{
470477
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind,
471478
PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder,
@@ -499,14 +506,15 @@ mod config {
499506
workflow_task_poller_behavior: PollerBehavior,
500507
activity_task_poller_behavior: PollerBehavior,
501508
nexus_task_poller_behavior: PollerBehavior,
502-
enable_non_local_activities: bool,
509+
task_types: WorkerTaskTypes,
503510
sticky_queue_schedule_to_start_timeout: Duration,
504511
max_cached_workflows: usize,
505512
max_heartbeat_throttle_interval: Duration,
506513
default_heartbeat_throttle_interval: Duration,
507514
max_activities_per_second: Option<f64>,
508515
max_task_queue_activities_per_second: Option<f64>,
509516
shutdown_grace_time: Option<Duration>,
517+
plugins: Vec<String>,
510518
}
511519

512520
#[derive(TryFromJs)]
@@ -540,6 +548,25 @@ mod config {
540548
AutoUpgrade,
541549
}
542550

551+
#[derive(TryFromJs)]
552+
pub struct WorkerTaskTypes {
553+
enable_workflows: bool,
554+
enable_local_activities: bool,
555+
enable_remote_activities: bool,
556+
enable_nexus: bool,
557+
}
558+
559+
impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes {
560+
fn from(t: &WorkerTaskTypes) -> Self {
561+
Self {
562+
enable_workflows: t.enable_workflows,
563+
enable_local_activities: t.enable_local_activities,
564+
enable_remote_activities: t.enable_remote_activities,
565+
enable_nexus: t.enable_nexus,
566+
}
567+
}
568+
}
569+
543570
impl BridgeWorkerOptions {
544571
pub(crate) fn into_core_config(self) -> Result<WorkerConfig, WorkerConfigBuilderError> {
545572
// Set all other options
@@ -566,14 +593,23 @@ mod config {
566593
.workflow_task_poller_behavior(self.workflow_task_poller_behavior)
567594
.activity_task_poller_behavior(self.activity_task_poller_behavior)
568595
.nexus_task_poller_behavior(self.nexus_task_poller_behavior)
569-
.no_remote_activities(!self.enable_non_local_activities)
596+
.task_types(&self.task_types)
570597
.sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
571598
.max_cached_workflows(self.max_cached_workflows)
572599
.max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
573600
.default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
574601
.max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
575602
.max_worker_activities_per_second(self.max_activities_per_second)
576603
.graceful_shutdown_period(self.shutdown_grace_time)
604+
.plugins(
605+
self.plugins
606+
.into_iter()
607+
.map(|name| PluginInfo {
608+
name,
609+
version: String::new(),
610+
})
611+
.collect::<Vec<_>>(),
612+
)
577613
.build()
578614
}
579615
}

packages/core-bridge/ts/native.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,20 @@ export interface WorkerOptions {
213213
workflowTaskPollerBehavior: PollerBehavior;
214214
activityTaskPollerBehavior: PollerBehavior;
215215
nexusTaskPollerBehavior: PollerBehavior;
216-
enableNonLocalActivities: boolean;
216+
taskTypes: {
217+
enableWorkflows: boolean;
218+
enableLocalActivities: boolean;
219+
enableRemoteActivities: boolean;
220+
enableNexus: boolean;
221+
};
217222
stickyQueueScheduleToStartTimeout: number;
218223
maxCachedWorkflows: number;
219224
maxHeartbeatThrottleInterval: number;
220225
defaultHeartbeatThrottleInterval: number;
221226
maxTaskQueueActivitiesPerSecond: Option<number>;
222227
maxActivitiesPerSecond: Option<number>;
223228
shutdownGraceTime: number;
229+
plugins: string[];
224230
}
225231

226232
export type PollerBehavior =

packages/test/src/test-bridge.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,14 +298,20 @@ const GenericConfigs = {
298298
initial: 5,
299299
maximum: 100,
300300
},
301-
enableNonLocalActivities: false,
301+
taskTypes: {
302+
enableWorkflows: true,
303+
enableLocalActivities: false,
304+
enableRemoteActivities: false,
305+
enableNexus: false,
306+
},
302307
stickyQueueScheduleToStartTimeout: 1000,
303308
maxCachedWorkflows: 1000,
304309
maxHeartbeatThrottleInterval: 1000,
305310
defaultHeartbeatThrottleInterval: 1000,
306311
maxTaskQueueActivitiesPerSecond: null,
307312
maxActivitiesPerSecond: null,
308313
shutdownGraceTime: 1000,
314+
plugins: [],
309315
} satisfies native.WorkerOptions,
310316
},
311317
ephemeralServer: {

packages/test/src/test-local-activities.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,3 +657,24 @@ test.serial('retryPolicy is set correctly', async (t) => {
657657
t.deepEqual(await executeWorkflow(getRetryPolicyFromActivityInfo, { args: [retryPolicy, false] }), retryPolicy);
658658
});
659659
});
660+
661+
export async function runLocalActivityWithNonLocalActivitiesDisabled(): Promise<string> {
662+
const { echo } = workflow.proxyLocalActivities({ startToCloseTimeout: '1m' });
663+
return await echo('hello from local activity');
664+
}
665+
666+
test.serial('Local activities work when enableNonLocalActivities is false', async (t) => {
667+
const { executeWorkflow, createWorker } = helpers(t);
668+
const worker = await createWorker({
669+
activities: {
670+
async echo(message: string): Promise<string> {
671+
return message;
672+
},
673+
},
674+
enableNonLocalActivities: false,
675+
});
676+
await worker.runUntil(async () => {
677+
const result = await executeWorkflow(runLocalActivityWithNonLocalActivitiesDisabled);
678+
t.is(result, 'hello from local activity');
679+
});
680+
});

packages/worker/src/worker-options.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,14 +1090,20 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n
10901090
workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior),
10911091
activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior),
10921092
nexusTaskPollerBehavior: toNativeTaskPollerBehavior(opts.nexusTaskPollerBehavior),
1093-
enableNonLocalActivities: opts.enableNonLocalActivities,
1093+
taskTypes: {
1094+
enableWorkflows: opts.workflowBundle !== undefined || opts.workflowsPath !== undefined,
1095+
enableLocalActivities: opts.maxCachedWorkflows > 0 && opts.activities.size > 0,
1096+
enableRemoteActivities: opts.enableNonLocalActivities && opts.activities.size > 0,
1097+
enableNexus: opts.nexusServiceRegistry !== undefined,
1098+
},
10941099
stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout),
10951100
maxCachedWorkflows: opts.maxCachedWorkflows,
10961101
maxHeartbeatThrottleInterval: msToNumber(opts.maxHeartbeatThrottleInterval),
10971102
defaultHeartbeatThrottleInterval: msToNumber(opts.defaultHeartbeatThrottleInterval),
10981103
maxTaskQueueActivitiesPerSecond: opts.maxTaskQueueActivitiesPerSecond ?? null,
10991104
maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null,
11001105
shutdownGraceTime: msToNumber(opts.shutdownGraceTime),
1106+
plugins: opts.plugins?.map((p) => p.name) ?? [],
11011107
};
11021108
}
11031109

0 commit comments

Comments
 (0)