Skip to content

Commit 31af31a

Browse files
committed
fix(gateway): prevent gc from removing hibernating in flight req, check actor started after sub when hibernating
1 parent 8b84eb1 commit 31af31a

File tree

8 files changed

+142
-89
lines changed

8 files changed

+142
-89
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/guard/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
2828
}
2929

3030
// Share shared context
31-
let shared_state = shared_state::SharedState::new(ctx.ups()?);
31+
let shared_state = shared_state::SharedState::new(&config, ctx.ups()?);
3232
shared_state.start().await?;
3333

3434
// Create handlers

engine/packages/guard/src/shared_state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ use universalpubsub::PubSub;
66
pub struct SharedState(Arc<SharedStateInner>);
77

88
impl SharedState {
9-
pub fn new(pubsub: PubSub) -> SharedState {
9+
pub fn new(config: &rivet_config::Config, pubsub: PubSub) -> SharedState {
1010
SharedState(Arc::new(SharedStateInner {
11-
pegboard_gateway: pegboard_gateway::shared_state::SharedState::new(pubsub),
11+
pegboard_gateway: pegboard_gateway::shared_state::SharedState::new(config, pubsub),
1212
}))
1313
}
1414

engine/packages/pegboard-gateway/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ hyper-tungstenite.workspace = true
1818
lazy_static.workspace = true
1919
pegboard.workspace = true
2020
rand.workspace = true
21+
rivet-config.workspace = true
2122
rivet-error.workspace = true
2223
rivet-guard-core.workspace = true
2324
rivet-metrics.workspace = true

engine/packages/pegboard-gateway/src/keepalive_task.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ use std::time::Duration;
66
use tokio::sync::watch;
77

88
use super::LifecycleResult;
9+
use crate::shared_state::SharedState;
910

1011
/// Periodically writes a keepalive to UDB. This is used to restore hibernating request IDs on
1112
/// next actor start.
1213
///
13-
///Only ran for hibernating requests.
14+
/// Only run for hibernating requests.
15+
1416
pub async fn task(
17+
shared_state: SharedState,
1518
ctx: StandaloneCtx,
1619
actor_id: Id,
1720
gateway_id: GatewayId,
@@ -43,11 +46,14 @@ pub async fn task(
4346
let jitter = { rand::thread_rng().gen_range(0..128) };
4447
tokio::time::sleep(Duration::from_millis(jitter)).await;
4548

46-
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
47-
actor_id,
48-
gateway_id,
49-
request_id,
50-
})
51-
.await?;
49+
tokio::try_join!(
50+
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
51+
actor_id,
52+
gateway_id,
53+
request_id,
54+
}),
55+
// Keep the in-flight request alive during hibernation
56+
shared_state.keepalive_hws(request_id),
57+
)?;
5258
}
5359
}

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ impl CustomServeTrait for PegboardGateway {
167167
let InFlightRequestHandle {
168168
mut msg_rx,
169169
mut drop_rx,
170+
..
170171
} = self
171172
.shared_state
172173
.start_in_flight_request(tunnel_subject, request_id)
@@ -212,7 +213,7 @@ impl CustomServeTrait for PegboardGateway {
212213
}
213214
} else {
214215
tracing::warn!(
215-
request_id=?tunnel_id::request_id_to_string(&request_id),
216+
request_id=%tunnel_id::request_id_to_string(&request_id),
216217
"received no message response during request init",
217218
);
218219
break;
@@ -267,14 +268,14 @@ impl CustomServeTrait for PegboardGateway {
267268
Ok(response)
268269
}
269270

270-
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id))]
271+
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id, request_id=%tunnel_id::request_id_to_string(&request_id)))]
271272
async fn handle_websocket(
272273
&self,
273274
client_ws: WebSocketHandle,
274275
headers: &hyper::HeaderMap,
275276
_path: &str,
276277
_request_context: &mut RequestContext,
277-
unique_request_id: RequestId,
278+
request_id: RequestId,
278279
after_hibernation: bool,
279280
) -> Result<Option<CloseFrame>> {
280281
// Use the actor ID from the gateway instance
@@ -298,15 +299,20 @@ impl CustomServeTrait for PegboardGateway {
298299
pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string();
299300

300301
// Start listening for WebSocket messages
301-
let request_id = unique_request_id;
302302
let InFlightRequestHandle {
303303
mut msg_rx,
304304
mut drop_rx,
305+
new,
305306
} = self
306307
.shared_state
307308
.start_in_flight_request(tunnel_subject.clone(), request_id)
308309
.await;
309310

311+
ensure!(
312+
!after_hibernation || !new,
313+
"should not be creating a new in flight entry after hibernation"
314+
);
315+
310316
// If we are reconnecting after hibernation, don't send an open message
311317
let can_hibernate = if after_hibernation {
312318
true
@@ -348,7 +354,7 @@ impl CustomServeTrait for PegboardGateway {
348354
}
349355
} else {
350356
tracing::warn!(
351-
request_id=?tunnel_id::request_id_to_string(&request_id),
357+
request_id=%tunnel_id::request_id_to_string(&request_id),
352358
"received no message response during ws init",
353359
);
354360
break;
@@ -416,17 +422,23 @@ impl CustomServeTrait for PegboardGateway {
416422
request_id,
417423
ping_abort_rx,
418424
));
425+
let keepalive = if can_hibernate {
426+
Some(tokio::spawn(keepalive_task::task(
427+
self.shared_state.clone(),
428+
self.ctx.clone(),
429+
self.actor_id,
430+
self.shared_state.gateway_id(),
431+
request_id,
432+
keepalive_abort_rx,
433+
)))
434+
} else {
435+
None
436+
};
419437

420438
let tunnel_to_ws_abort_tx2 = tunnel_to_ws_abort_tx.clone();
421439
let ws_to_tunnel_abort_tx2 = ws_to_tunnel_abort_tx.clone();
422440
let ping_abort_tx2 = ping_abort_tx.clone();
423441

424-
// Clone variables needed for keepalive task
425-
let ctx_clone = self.ctx.clone();
426-
let actor_id_clone = self.actor_id;
427-
let gateway_id_clone = self.shared_state.gateway_id();
428-
let request_id_clone = request_id;
429-
430442
// Wait for all tasks to complete
431443
let (tunnel_to_ws_res, ws_to_tunnel_res, ping_res, keepalive_res) = tokio::join!(
432444
async {
@@ -478,17 +490,9 @@ impl CustomServeTrait for PegboardGateway {
478490
res
479491
},
480492
async {
481-
if !can_hibernate {
493+
let Some(keepalive) = keepalive else {
482494
return Ok(LifecycleResult::Aborted);
483-
}
484-
485-
let keepalive = tokio::spawn(keepalive_task::task(
486-
ctx_clone,
487-
actor_id_clone,
488-
gateway_id_clone,
489-
request_id_clone,
490-
keepalive_abort_rx,
491-
));
495+
};
492496

493497
let res = keepalive.await?;
494498

@@ -568,14 +572,12 @@ impl CustomServeTrait for PegboardGateway {
568572
}
569573
}
570574

571-
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id))]
575+
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, request_id=%tunnel_id::request_id_to_string(&request_id)))]
572576
async fn handle_websocket_hibernation(
573577
&self,
574578
client_ws: WebSocketHandle,
575-
unique_request_id: RequestId,
579+
request_id: RequestId,
576580
) -> Result<HibernationResult> {
577-
let request_id = unique_request_id;
578-
579581
// Insert hibernating request entry before checking for pending messages
580582
// This ensures the entry exists even if we immediately rewake the actor
581583
self.ctx
@@ -592,21 +594,19 @@ impl CustomServeTrait for PegboardGateway {
592594
.has_pending_websocket_messages(request_id)
593595
.await?
594596
{
595-
tracing::debug!(
596-
?unique_request_id,
597-
"detected pending requests on websocket hibernation, rewaking actor"
598-
);
597+
tracing::debug!("exiting hibernating due to pending messages");
599598

600599
return Ok(HibernationResult::Continue);
601600
}
602601

603602
// Start keepalive task
604603
let (keepalive_abort_tx, keepalive_abort_rx) = watch::channel(());
605604
let keepalive_handle = tokio::spawn(keepalive_task::task(
605+
self.shared_state.clone(),
606606
self.ctx.clone(),
607607
self.actor_id,
608608
self.shared_state.gateway_id(),
609-
unique_request_id,
609+
request_id,
610610
keepalive_abort_rx,
611611
));
612612

@@ -623,7 +623,7 @@ impl CustomServeTrait for PegboardGateway {
623623
.op(pegboard::ops::actor::hibernating_request::delete::Input {
624624
actor_id: self.actor_id,
625625
gateway_id: self.shared_state.gateway_id(),
626-
request_id: unique_request_id,
626+
request_id,
627627
})
628628
.await?;
629629
}
@@ -643,6 +643,21 @@ impl PegboardGateway {
643643
.subscribe::<pegboard::workflows::actor::Ready>(("actor_id", self.actor_id))
644644
.await?;
645645

646+
// Fetch actor info after subscribing so that an actor which became ready before the subscription was established is not missed (race condition)
647+
if let Some(actor) = self
648+
.ctx
649+
.op(pegboard::ops::actor::get_for_gateway::Input {
650+
actor_id: self.actor_id,
651+
})
652+
.await?
653+
{
654+
if actor.runner_id.is_some() {
655+
tracing::debug!("actor became ready during hibernation");
656+
657+
return Ok(HibernationResult::Continue);
658+
}
659+
}
660+
646661
let res = tokio::select! {
647662
_ = ready_sub.next() => {
648663
tracing::debug!("actor became ready during hibernation");

0 commit comments

Comments
 (0)