Skip to content

Commit 8d11332

Browse files
committed
fix(engine): ensure first keepalive tick is not skipped to prevent timeout on ws hibernation
1 parent bd10c4a commit 8d11332

File tree

3 files changed

+22
-23
lines changed

3 files changed

+22
-23
lines changed

engine/packages/pegboard-gateway/src/keepalive_task.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::Result;
22
use gas::prelude::*;
3+
use pegboard::tunnel::id as tunnel_id;
34
use pegboard::tunnel::id::{GatewayId, RequestId};
45
use rand::Rng;
56
use std::time::Duration;
@@ -12,7 +13,6 @@ use crate::shared_state::SharedState;
1213
/// next actor start.
1314
///
1415
/// Only ran for hibernating requests.
15-
1616
pub async fn task(
1717
shared_state: SharedState,
1818
ctx: StandaloneCtx,
@@ -30,10 +30,6 @@ pub async fn task(
3030
));
3131
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3232

33-
// Discard the first tick since it fires immediately and we've already called this
34-
// above
35-
ping_interval.tick().await;
36-
3733
loop {
3834
tokio::select! {
3935
_ = ping_interval.tick() => {}
@@ -46,6 +42,13 @@ pub async fn task(
4642
let jitter = { rand::thread_rng().gen_range(0..128) };
4743
tokio::time::sleep(Duration::from_millis(jitter)).await;
4844

45+
tracing::debug!(
46+
%actor_id,
47+
gateway_id=%tunnel_id::gateway_id_to_string(&gateway_id),
48+
request_id=%tunnel_id::request_id_to_string(&request_id),
49+
"updating hws keepalive"
50+
);
51+
4952
tokio::try_join!(
5053
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
5154
actor_id,

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ use hyper::{Request, Response, StatusCode};
88
use pegboard::tunnel::id::{self as tunnel_id, RequestId};
99
use rivet_error::*;
1010
use rivet_guard_core::{
11-
WebSocketHandle,
1211
custom_serve::{CustomServeTrait, HibernationResult},
1312
errors::{ServiceUnavailable, WebSocketServiceUnavailable},
14-
proxy_service::{ResponseBody, is_ws_hibernate},
13+
proxy_service::{is_ws_hibernate, ResponseBody},
1514
request_context::RequestContext,
1615
websocket_handle::WebSocketReceiver,
16+
WebSocketHandle,
1717
};
1818
use rivet_runner_protocol as protocol;
1919
use rivet_util::serde::HashableMap;
2020
use std::{sync::Arc, time::Duration};
21-
use tokio::sync::{Mutex, watch};
21+
use tokio::sync::{watch, Mutex};
2222
use tokio_tungstenite::tungstenite::{
23+
protocol::frame::{coding::CloseCode, CloseFrame},
2324
Message,
24-
protocol::frame::{CloseFrame, coding::CloseCode},
2525
};
2626

2727
use crate::shared_state::{InFlightRequestHandle, SharedState};
@@ -578,16 +578,6 @@ impl CustomServeTrait for PegboardGateway {
578578
client_ws: WebSocketHandle,
579579
request_id: RequestId,
580580
) -> Result<HibernationResult> {
581-
// Insert hibernating request entry before checking for pending messages
582-
// This ensures the entry exists even if we immediately rewake the actor
583-
self.ctx
584-
.op(pegboard::ops::actor::hibernating_request::upsert::Input {
585-
actor_id: self.actor_id,
586-
gateway_id: self.shared_state.gateway_id(),
587-
request_id,
588-
})
589-
.await?;
590-
591581
// Immediately rewake if we have pending messages
592582
if self
593583
.shared_state

engine/packages/pegboard-gateway/src/shared_state.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use anyhow::Result;
22
use gas::prelude::*;
33
use pegboard::tunnel::id::{self as tunnel_id, GatewayId, RequestId};
44
use rivet_guard_core::errors::WebSocketServiceTimeout;
5-
use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
6-
use scc::{HashMap, hash_map::Entry};
5+
use rivet_runner_protocol::{self as protocol, versioned, PROTOCOL_VERSION};
6+
use scc::{hash_map::Entry, HashMap};
77
use std::{
88
ops::Deref,
99
sync::Arc,
@@ -13,7 +13,7 @@ use tokio::sync::{mpsc, watch};
1313
use universalpubsub::{NextOutput, PubSub, PublishOpts, Subscriber};
1414
use vbare::OwnedVersionedData;
1515

16-
use crate::{WebsocketPendingLimitReached, metrics};
16+
use crate::{metrics, WebsocketPendingLimitReached};
1717

1818
const GC_INTERVAL: Duration = Duration::from_secs(15);
1919
const TUNNEL_PING_TIMEOUT: i64 = util::duration::seconds(30);
@@ -512,7 +512,13 @@ impl SharedState {
512512
}
513513
}
514514

515-
if hs.last_ping.elapsed() > hibernation_timeout {
515+
let hs_elapsed = hs.last_ping.elapsed();
516+
tracing::debug!(
517+
hs_elapsed=%hs_elapsed.as_secs_f64(),
518+
timeout=%hibernation_timeout.as_secs_f64(),
519+
"checking hibernating state elapsed time"
520+
);
521+
if hs_elapsed> hibernation_timeout {
516522
break 'reason Some(MsgGcReason::HibernationTimeout);
517523
}
518524
} else if req.msg_tx.is_closed() {

0 commit comments

Comments
 (0)