Skip to content

Commit a4e1da3

Browse files
committed
fix(pb): add ability to timeout force rescheduling pending state (#3502)
1 parent 84f0a3c commit a4e1da3

File tree

3 files changed

+158
-76
lines changed

3 files changed

+158
-76
lines changed

engine/packages/guard/src/routing/pegboard_gateway.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use rivet_guard_core::proxy_service::{RouteConfig, RouteTarget, RoutingOutput, R
88
use super::{SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_ACTOR, WS_PROTOCOL_TOKEN, X_RIVET_TOKEN};
99
use crate::{errors, shared_state::SharedState};
1010

11+
const ACTOR_FORCE_WAKE_PENDING_TIMEOUT: i64 = util::duration::seconds(60);
1112
const ACTOR_READY_TIMEOUT: Duration = Duration::from_secs(10);
1213
pub const X_RIVET_ACTOR: HeaderName = HeaderName::from_static("x-rivet-actor");
1314

@@ -169,10 +170,14 @@ async fn route_request_inner(
169170
if actor.sleeping {
170171
tracing::debug!(?actor_id, "actor sleeping, waking");
171172

172-
ctx.signal(pegboard::workflows::actor::Wake {})
173-
.to_workflow_id(actor.workflow_id)
174-
.send()
175-
.await?;
173+
ctx.signal(pegboard::workflows::actor::Wake {
174+
allocation_override: pegboard::workflows::actor::AllocationOverride::DontSleep {
175+
pending_timeout: Some(ACTOR_FORCE_WAKE_PENDING_TIMEOUT),
176+
},
177+
})
178+
.to_workflow_id(actor.workflow_id)
179+
.send()
180+
.await?;
176181
}
177182

178183
let runner_id = if let (Some(runner_id), true) = (actor.runner_id, actor.connectable) {
@@ -193,10 +198,14 @@ async fn route_request_inner(
193198
tracing::debug!(?actor_id, ?wake_retries, "actor stopped while we were waiting for it to become ready, attempting rewake");
194199
wake_retries += 1;
195200

196-
let res = ctx.signal(pegboard::workflows::actor::Wake {})
197-
.to_workflow_id(actor.workflow_id)
198-
.send()
199-
.await;
201+
let res = ctx.signal(pegboard::workflows::actor::Wake {
202+
allocation_override: pegboard::workflows::actor::AllocationOverride::DontSleep {
203+
pending_timeout: Some(ACTOR_FORCE_WAKE_PENDING_TIMEOUT),
204+
},
205+
})
206+
.to_workflow_id(actor.workflow_id)
207+
.send()
208+
.await;
200209

201210
if let Some(WorkflowError::WorkflowNotFound) = res
202211
.as_ref()

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ mod keys;
1010
mod runtime;
1111
mod setup;
1212

13+
pub use runtime::AllocationOverride;
14+
1315
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
1416
pub struct Input {
1517
pub actor_id: Id,
@@ -211,38 +213,39 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
211213
.send()
212214
.await?;
213215

214-
let lifecycle_state = match runtime::spawn_actor(ctx, input, 0, false).await? {
215-
runtime::SpawnActorOutput::Allocated {
216-
runner_id,
217-
runner_workflow_id,
218-
} => runtime::LifecycleState::new(
219-
runner_id,
220-
runner_workflow_id,
221-
ctx.config().pegboard().actor_start_threshold(),
222-
),
223-
runtime::SpawnActorOutput::Sleep => {
224-
ctx.activity(runtime::SetSleepingInput {
225-
actor_id: input.actor_id,
226-
})
227-
.await?;
216+
let lifecycle_state =
217+
match runtime::spawn_actor(ctx, input, 0, AllocationOverride::None).await? {
218+
runtime::SpawnActorOutput::Allocated {
219+
runner_id,
220+
runner_workflow_id,
221+
} => runtime::LifecycleState::new(
222+
runner_id,
223+
runner_workflow_id,
224+
ctx.config().pegboard().actor_start_threshold(),
225+
),
226+
runtime::SpawnActorOutput::Sleep => {
227+
ctx.activity(runtime::SetSleepingInput {
228+
actor_id: input.actor_id,
229+
})
230+
.await?;
228231

229-
runtime::LifecycleState::new_sleeping()
230-
}
231-
runtime::SpawnActorOutput::Destroy => {
232-
// Destroyed early
233-
ctx.workflow(destroy::Input {
234-
namespace_id: input.namespace_id,
235-
actor_id: input.actor_id,
236-
name: input.name.clone(),
237-
key: input.key.clone(),
238-
generation: 0,
239-
})
240-
.output()
241-
.await?;
232+
runtime::LifecycleState::new_sleeping()
233+
}
234+
runtime::SpawnActorOutput::Destroy => {
235+
// Destroyed early
236+
ctx.workflow(destroy::Input {
237+
namespace_id: input.namespace_id,
238+
actor_id: input.actor_id,
239+
name: input.name.clone(),
240+
key: input.key.clone(),
241+
generation: 0,
242+
})
243+
.output()
244+
.await?;
242245

243-
return Ok(());
244-
}
245-
};
246+
return Ok(());
247+
}
248+
};
246249

247250
let lifecycle_res = ctx
248251
.loope(
@@ -273,10 +276,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
273276
} else {
274277
tracing::debug!(actor_id=?input.actor_id, "actor wake");
275278

276-
state.wake_for_alarm = true;
277-
278279
// Fake signal
279-
Main::Wake(Wake {})
280+
Main::Wake(Wake { allocation_override: AllocationOverride::DontSleep { pending_timeout: None } })
280281
}
281282
} else {
282283
// Listen for signal normally
@@ -404,14 +405,14 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
404405
}
405406
}
406407
}
407-
Main::Wake(_sig) => {
408+
Main::Wake(sig) => {
408409
if state.sleeping {
409410
if state.runner_id.is_none() {
410411
state.alarm_ts = None;
411412
state.sleeping = false;
412413
state.will_wake = false;
413414

414-
match runtime::reschedule_actor(ctx, &input, state, false)
415+
match runtime::reschedule_actor(ctx, &input, state, sig.allocation_override)
415416
.await?
416417
{
417418
runtime::SpawnActorOutput::Allocated { .. } => {}
@@ -425,8 +426,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
425426
}));
426427
}
427428
}
428-
429-
state.wake_for_alarm = false;
430429
} else if !state.will_wake {
431430
state.will_wake = true;
432431

@@ -440,8 +439,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
440439
actor_id=?input.actor_id,
441440
"cannot wake actor that is not sleeping",
442441
);
443-
444-
state.wake_for_alarm = false;
445442
}
446443
}
447444
Main::Lost(sig) => {
@@ -569,16 +566,15 @@ async fn handle_stopped(
569566
tracing::debug!(?variant, "actor stopped");
570567

571568
let force_reschedule = match &variant {
572-
StoppedVariant::Normal {
573-
code: protocol::StopCode::Ok,
574-
} => {
569+
StoppedVariant::Normal { code } => {
575570
// Reset retry count on successful exit
576-
state.reschedule_state = Default::default();
571+
if let protocol::StopCode::Ok = code {
572+
state.reschedule_state = Default::default();
573+
}
577574

578575
false
579576
}
580577
StoppedVariant::Lost { force_reschedule } => *force_reschedule,
581-
_ => false,
582578
};
583579

584580
// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
@@ -640,7 +636,16 @@ async fn handle_stopped(
640636

641637
// Reschedule no matter what
642638
if force_reschedule {
643-
match runtime::reschedule_actor(ctx, &input, state, true).await? {
639+
match runtime::reschedule_actor(
640+
ctx,
641+
&input,
642+
state,
643+
AllocationOverride::DontSleep {
644+
pending_timeout: None,
645+
},
646+
)
647+
.await?
648+
{
644649
runtime::SpawnActorOutput::Allocated { .. } => {}
645650
// NOTE: This should be unreachable because force_reschedule is true
646651
runtime::SpawnActorOutput::Sleep => {
@@ -667,7 +672,9 @@ async fn handle_stopped(
667672

668673
match (input.crash_policy, graceful_exit) {
669674
(CrashPolicy::Restart, false) => {
670-
match runtime::reschedule_actor(ctx, &input, state, false).await? {
675+
match runtime::reschedule_actor(ctx, &input, state, AllocationOverride::None)
676+
.await?
677+
{
671678
runtime::SpawnActorOutput::Allocated { .. } => {}
672679
// NOTE: It's not possible for `SpawnActorOutput::Sleep` to be returned here, the crash
673680
// policy is `Restart`.
@@ -698,7 +705,7 @@ async fn handle_stopped(
698705
else if state.will_wake {
699706
state.sleeping = false;
700707

701-
match runtime::reschedule_actor(ctx, &input, state, false).await? {
708+
match runtime::reschedule_actor(ctx, &input, state, AllocationOverride::None).await? {
702709
runtime::SpawnActorOutput::Allocated { .. } => {}
703710
runtime::SpawnActorOutput::Sleep => {
704711
state.sleeping = true;
@@ -708,7 +715,6 @@ async fn handle_stopped(
708715
}
709716
}
710717

711-
state.wake_for_alarm = false;
712718
state.will_wake = false;
713719
state.going_away = false;
714720

@@ -749,7 +755,10 @@ pub struct Event {
749755
}
750756

751757
#[signal("pegboard_actor_wake")]
752-
pub struct Wake {}
758+
pub struct Wake {
759+
#[serde(default)]
760+
pub allocation_override: AllocationOverride,
761+
}
753762

754763
#[derive(Debug)]
755764
#[signal("pegboard_actor_lost")]

0 commit comments

Comments
 (0)