Skip to content

Commit 2cb75a1

Browse files
committed
fix(gateway): prevent gc from removing hibernating in flight req, check actor started after sub when hibernating
1 parent 8b84eb1 commit 2cb75a1

File tree

4 files changed

+114
-82
lines changed

4 files changed

+114
-82
lines changed

engine/packages/pegboard-gateway/src/keepalive_task.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ use std::time::Duration;
66
use tokio::sync::watch;
77

88
use super::LifecycleResult;
9+
use crate::shared_state::SharedState;
910

1011
/// Periodically pings writes keepalive in UDB. This is used to restore hibernating request IDs on
1112
/// next actor start.
1213
///
13-
///Only ran for hibernating requests.
14+
/// Only ran for hibernating requests.
15+
1416
pub async fn task(
17+
shared_state: SharedState,
1518
ctx: StandaloneCtx,
1619
actor_id: Id,
1720
gateway_id: GatewayId,
@@ -43,11 +46,14 @@ pub async fn task(
4346
let jitter = { rand::thread_rng().gen_range(0..128) };
4447
tokio::time::sleep(Duration::from_millis(jitter)).await;
4548

46-
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
47-
actor_id,
48-
gateway_id,
49-
request_id,
50-
})
51-
.await?;
49+
tokio::try_join!(
50+
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
51+
actor_id,
52+
gateway_id,
53+
request_id,
54+
}),
55+
// Keep alive in flight req during hibernation
56+
shared_state.keepalive_hws(request_id),
57+
)?;
5258
}
5359
}

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl CustomServeTrait for PegboardGateway {
212212
}
213213
} else {
214214
tracing::warn!(
215-
request_id=?tunnel_id::request_id_to_string(&request_id),
215+
request_id=%tunnel_id::request_id_to_string(&request_id),
216216
"received no message response during request init",
217217
);
218218
break;
@@ -267,14 +267,14 @@ impl CustomServeTrait for PegboardGateway {
267267
Ok(response)
268268
}
269269

270-
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id))]
270+
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id, request_id=%tunnel_id::request_id_to_string(&request_id)))]
271271
async fn handle_websocket(
272272
&self,
273273
client_ws: WebSocketHandle,
274274
headers: &hyper::HeaderMap,
275275
_path: &str,
276276
_request_context: &mut RequestContext,
277-
unique_request_id: RequestId,
277+
request_id: RequestId,
278278
after_hibernation: bool,
279279
) -> Result<Option<CloseFrame>> {
280280
// Use the actor ID from the gateway instance
@@ -298,7 +298,6 @@ impl CustomServeTrait for PegboardGateway {
298298
pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string();
299299

300300
// Start listening for WebSocket messages
301-
let request_id = unique_request_id;
302301
let InFlightRequestHandle {
303302
mut msg_rx,
304303
mut drop_rx,
@@ -348,7 +347,7 @@ impl CustomServeTrait for PegboardGateway {
348347
}
349348
} else {
350349
tracing::warn!(
351-
request_id=?tunnel_id::request_id_to_string(&request_id),
350+
request_id=%tunnel_id::request_id_to_string(&request_id),
352351
"received no message response during ws init",
353352
);
354353
break;
@@ -416,17 +415,23 @@ impl CustomServeTrait for PegboardGateway {
416415
request_id,
417416
ping_abort_rx,
418417
));
418+
let keepalive = if can_hibernate {
419+
Some(tokio::spawn(keepalive_task::task(
420+
self.shared_state.clone(),
421+
self.ctx.clone(),
422+
self.actor_id,
423+
self.shared_state.gateway_id(),
424+
request_id,
425+
keepalive_abort_rx,
426+
)))
427+
} else {
428+
None
429+
};
419430

420431
let tunnel_to_ws_abort_tx2 = tunnel_to_ws_abort_tx.clone();
421432
let ws_to_tunnel_abort_tx2 = ws_to_tunnel_abort_tx.clone();
422433
let ping_abort_tx2 = ping_abort_tx.clone();
423434

424-
// Clone variables needed for keepalive task
425-
let ctx_clone = self.ctx.clone();
426-
let actor_id_clone = self.actor_id;
427-
let gateway_id_clone = self.shared_state.gateway_id();
428-
let request_id_clone = request_id;
429-
430435
// Wait for all tasks to complete
431436
let (tunnel_to_ws_res, ws_to_tunnel_res, ping_res, keepalive_res) = tokio::join!(
432437
async {
@@ -478,17 +483,9 @@ impl CustomServeTrait for PegboardGateway {
478483
res
479484
},
480485
async {
481-
if !can_hibernate {
486+
let Some(keepalive) = keepalive else {
482487
return Ok(LifecycleResult::Aborted);
483-
}
484-
485-
let keepalive = tokio::spawn(keepalive_task::task(
486-
ctx_clone,
487-
actor_id_clone,
488-
gateway_id_clone,
489-
request_id_clone,
490-
keepalive_abort_rx,
491-
));
488+
};
492489

493490
let res = keepalive.await?;
494491

@@ -568,14 +565,12 @@ impl CustomServeTrait for PegboardGateway {
568565
}
569566
}
570567

571-
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id))]
568+
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, request_id=%tunnel_id::request_id_to_string(&request_id)))]
572569
async fn handle_websocket_hibernation(
573570
&self,
574571
client_ws: WebSocketHandle,
575-
unique_request_id: RequestId,
572+
request_id: RequestId,
576573
) -> Result<HibernationResult> {
577-
let request_id = unique_request_id;
578-
579574
// Insert hibernating request entry before checking for pending messages
580575
// This ensures the entry exists even if we immediately rewake the actor
581576
self.ctx
@@ -592,21 +587,19 @@ impl CustomServeTrait for PegboardGateway {
592587
.has_pending_websocket_messages(request_id)
593588
.await?
594589
{
595-
tracing::debug!(
596-
?unique_request_id,
597-
"detected pending requests on websocket hibernation, rewaking actor"
598-
);
590+
tracing::debug!("exiting hibernating due to pending messages");
599591

600592
return Ok(HibernationResult::Continue);
601593
}
602594

603595
// Start keepalive task
604596
let (keepalive_abort_tx, keepalive_abort_rx) = watch::channel(());
605597
let keepalive_handle = tokio::spawn(keepalive_task::task(
598+
self.shared_state.clone(),
606599
self.ctx.clone(),
607600
self.actor_id,
608601
self.shared_state.gateway_id(),
609-
unique_request_id,
602+
request_id,
610603
keepalive_abort_rx,
611604
));
612605

@@ -623,7 +616,7 @@ impl CustomServeTrait for PegboardGateway {
623616
.op(pegboard::ops::actor::hibernating_request::delete::Input {
624617
actor_id: self.actor_id,
625618
gateway_id: self.shared_state.gateway_id(),
626-
request_id: unique_request_id,
619+
request_id,
627620
})
628621
.await?;
629622
}
@@ -643,6 +636,21 @@ impl PegboardGateway {
643636
.subscribe::<pegboard::workflows::actor::Ready>(("actor_id", self.actor_id))
644637
.await?;
645638

639+
// Fetch actor info after sub to prevent race condition
640+
if let Some(actor) = self
641+
.ctx
642+
.op(pegboard::ops::actor::get_for_gateway::Input {
643+
actor_id: self.actor_id,
644+
})
645+
.await?
646+
{
647+
if actor.runner_id.is_some() {
648+
tracing::debug!("actor became ready during hibernation");
649+
650+
return Ok(HibernationResult::Continue);
651+
}
652+
}
653+
646654
let res = tokio::select! {
647655
_ = ready_sub.next() => {
648656
tracing::debug!("actor became ready during hibernation");

0 commit comments

Comments
 (0)