Skip to content

Commit 4eabd07

Browse files
committed
fix(pb): rewrite runner wf to handle batch signals
1 parent 321b2c3 commit 4eabd07

File tree

7 files changed

+1187
-10
lines changed

7 files changed

+1187
-10
lines changed

engine/packages/pegboard-runner/src/conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ pub async fn init_conn(
166166
};
167167

168168
// Forward to runner wf
169-
ctx.signal(pegboard::workflows::runner::Forward { inner: packet })
169+
ctx.signal(pegboard::workflows::runner2::Forward { inner: packet })
170170
.to_workflow_id(workflow_id)
171171
.send()
172172
.await

engine/packages/pegboard-runner/src/ping_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub async fn task(
5050
if let RunnerEligibility::ReEligible = notif.eligibility {
5151
tracing::debug!(runner_id=?notif.runner_id, "runner has become eligible again");
5252

53-
ctx.signal(pegboard::workflows::runner::CheckQueue {})
53+
ctx.signal(pegboard::workflows::runner2::CheckQueue {})
5454
.to_workflow_id(notif.workflow_id)
5555
.send()
5656
.await?;

engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ async fn handle_message(
342342
| protocol::ToServer::ToServerEvents(_)
343343
| protocol::ToServer::ToServerAckCommands(_)
344344
| protocol::ToServer::ToServerStopping => {
345-
ctx.signal(pegboard::workflows::runner::Forward {
345+
ctx.signal(pegboard::workflows::runner2::Forward {
346346
inner: protocol::ToServer::try_from(msg)
347347
.context("failed to convert message for workflow forwarding")?,
348348
})

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
319319
.await?;
320320

321321
// Send signal to stop actor now that we know it will be sleeping
322-
ctx.signal(crate::workflows::runner::Command {
322+
ctx.signal(crate::workflows::runner2::Command {
323323
inner: protocol::Command::CommandStopActor(
324324
protocol::CommandStopActor {
325325
actor_id: input.actor_id.to_string(),
@@ -348,7 +348,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
348348
})
349349
.await?;
350350

351-
ctx.signal(crate::workflows::runner::Command {
351+
ctx.signal(crate::workflows::runner2::Command {
352352
inner: protocol::Command::CommandStopActor(
353353
protocol::CommandStopActor {
354354
actor_id: input.actor_id.to_string(),
@@ -495,7 +495,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
495495
})
496496
.await?;
497497

498-
ctx.signal(crate::workflows::runner::Command {
498+
ctx.signal(crate::workflows::runner2::Command {
499499
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
500500
actor_id: input.actor_id.to_string(),
501501
generation: state.generation,
@@ -509,7 +509,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
509509
Main::Destroy(_) => {
510510
// If allocated, send stop actor command
511511
if let Some(runner_workflow_id) = state.runner_workflow_id {
512-
ctx.signal(crate::workflows::runner::Command {
512+
ctx.signal(crate::workflows::runner2::Command {
513513
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
514514
actor_id: input.actor_id.to_string(),
515515
generation: state.generation,
@@ -628,7 +628,7 @@ async fn handle_stopped(
628628
if let (StoppedVariant::Lost { .. }, Some(old_runner_workflow_id)) =
629629
(&variant, old_runner_workflow_id)
630630
{
631-
ctx.signal(crate::workflows::runner::Command {
631+
ctx.signal(crate::workflows::runner2::Command {
632632
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
633633
actor_id: input.actor_id.to_string(),
634634
generation: state.generation,

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ pub async fn spawn_actor(
514514
.send()
515515
.await?;
516516

517-
ctx.signal(crate::workflows::runner::Command {
517+
ctx.signal(crate::workflows::runner2::Command {
518518
inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
519519
actor_id: input.actor_id.to_string(),
520520
generation,
@@ -563,7 +563,7 @@ pub async fn spawn_actor(
563563
})
564564
.await?;
565565

566-
ctx.signal(crate::workflows::runner::Command {
566+
ctx.signal(crate::workflows::runner2::Command {
567567
inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
568568
actor_id: input.actor_id.to_string(),
569569
generation,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub mod actor;
22
pub mod runner;
3+
pub mod runner2;

0 commit comments

Comments
 (0)