Skip to content

Commit 7e275b0

Browse files
committed
fix(engine): ensure first keepalive tick is not skipped to prevent timeout on ws hibernation
1 parent bd10c4a commit 7e275b0

File tree

13 files changed

+101
-233
lines changed

13 files changed

+101
-233
lines changed

engine/packages/pegboard-gateway/src/keepalive_task.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::Result;
22
use gas::prelude::*;
3+
use pegboard::tunnel::id as tunnel_id;
34
use pegboard::tunnel::id::{GatewayId, RequestId};
45
use rand::Rng;
56
use std::time::Duration;
@@ -12,7 +13,6 @@ use crate::shared_state::SharedState;
1213
/// next actor start.
1314
///
1415
/// Only ran for hibernating requests.
15-
1616
pub async fn task(
1717
shared_state: SharedState,
1818
ctx: StandaloneCtx,
@@ -30,10 +30,6 @@ pub async fn task(
3030
));
3131
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3232

33-
// Discard the first tick since it fires immediately and we've already called this
34-
// above
35-
ping_interval.tick().await;
36-
3733
loop {
3834
tokio::select! {
3935
_ = ping_interval.tick() => {}
@@ -46,6 +42,13 @@ pub async fn task(
4642
let jitter = { rand::thread_rng().gen_range(0..128) };
4743
tokio::time::sleep(Duration::from_millis(jitter)).await;
4844

45+
tracing::debug!(
46+
%actor_id,
47+
gateway_id=%tunnel_id::gateway_id_to_string(&gateway_id),
48+
request_id=%tunnel_id::request_id_to_string(&request_id),
49+
"updating hws keepalive"
50+
);
51+
4952
tokio::try_join!(
5053
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
5154
actor_id,

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ use hyper::{Request, Response, StatusCode};
88
use pegboard::tunnel::id::{self as tunnel_id, RequestId};
99
use rivet_error::*;
1010
use rivet_guard_core::{
11-
WebSocketHandle,
1211
custom_serve::{CustomServeTrait, HibernationResult},
1312
errors::{ServiceUnavailable, WebSocketServiceUnavailable},
14-
proxy_service::{ResponseBody, is_ws_hibernate},
13+
proxy_service::{is_ws_hibernate, ResponseBody},
1514
request_context::RequestContext,
1615
websocket_handle::WebSocketReceiver,
16+
WebSocketHandle,
1717
};
1818
use rivet_runner_protocol as protocol;
1919
use rivet_util::serde::HashableMap;
2020
use std::{sync::Arc, time::Duration};
21-
use tokio::sync::{Mutex, watch};
21+
use tokio::sync::{watch, Mutex};
2222
use tokio_tungstenite::tungstenite::{
23+
protocol::frame::{coding::CloseCode, CloseFrame},
2324
Message,
24-
protocol::frame::{CloseFrame, coding::CloseCode},
2525
};
2626

2727
use crate::shared_state::{InFlightRequestHandle, SharedState};
@@ -578,16 +578,6 @@ impl CustomServeTrait for PegboardGateway {
578578
client_ws: WebSocketHandle,
579579
request_id: RequestId,
580580
) -> Result<HibernationResult> {
581-
// Insert hibernating request entry before checking for pending messages
582-
// This ensures the entry exists even if we immediately rewake the actor
583-
self.ctx
584-
.op(pegboard::ops::actor::hibernating_request::upsert::Input {
585-
actor_id: self.actor_id,
586-
gateway_id: self.shared_state.gateway_id(),
587-
request_id,
588-
})
589-
.await?;
590-
591581
// Immediately rewake if we have pending messages
592582
if self
593583
.shared_state

engine/packages/pegboard-gateway/src/shared_state.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use anyhow::Result;
22
use gas::prelude::*;
33
use pegboard::tunnel::id::{self as tunnel_id, GatewayId, RequestId};
44
use rivet_guard_core::errors::WebSocketServiceTimeout;
5-
use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
6-
use scc::{HashMap, hash_map::Entry};
5+
use rivet_runner_protocol::{self as protocol, versioned, PROTOCOL_VERSION};
6+
use scc::{hash_map::Entry, HashMap};
77
use std::{
88
ops::Deref,
99
sync::Arc,
@@ -13,7 +13,7 @@ use tokio::sync::{mpsc, watch};
1313
use universalpubsub::{NextOutput, PubSub, PublishOpts, Subscriber};
1414
use vbare::OwnedVersionedData;
1515

16-
use crate::{WebsocketPendingLimitReached, metrics};
16+
use crate::{metrics, WebsocketPendingLimitReached};
1717

1818
const GC_INTERVAL: Duration = Duration::from_secs(15);
1919
const TUNNEL_PING_TIMEOUT: i64 = util::duration::seconds(30);
@@ -512,7 +512,13 @@ impl SharedState {
512512
}
513513
}
514514

515-
if hs.last_ping.elapsed() > hibernation_timeout {
515+
let hs_elapsed = hs.last_ping.elapsed();
516+
tracing::debug!(
517+
hs_elapsed=%hs_elapsed.as_secs_f64(),
518+
timeout=%hibernation_timeout.as_secs_f64(),
519+
"checking hibernating state elapsed time"
520+
);
521+
if hs_elapsed> hibernation_timeout {
516522
break 'reason Some(MsgGcReason::HibernationTimeout);
517523
}
518524
} else if req.msg_tx.is_closed() {

engine/sdks/schemas/runner-protocol/v3.bare

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/sdks/typescript/runner-protocol/src/index.ts

Lines changed: 3 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/sdks/typescript/runner/src/actor.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import type * as protocol from "@rivetkit/engine-runner-protocol";
2+
import { logger } from "./log";
23
import type { PendingRequest } from "./tunnel";
4+
import { arraysEqual, idToStr, promiseWithResolvers } from "./utils";
35
import type { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter";
4-
import { arraysEqual, promiseWithResolvers } from "./utils";
5-
import { logger } from "./log";
6-
import * as tunnelId from "./tunnel-id";
76

87
export interface ActorConfig {
98
name: string;
@@ -72,8 +71,8 @@ export class RunnerActor {
7271
if (exists) {
7372
logger()?.warn({
7473
msg: "attempting to set pending request twice, replacing existing",
75-
gatewayId: tunnelId.gatewayIdToString(gatewayId),
76-
requestId: tunnelId.requestIdToString(requestId),
74+
gatewayId: idToStr(gatewayId),
75+
requestId: idToStr(requestId),
7776
});
7877
// Delete existing pending request before adding the new one
7978
this.deletePendingRequest(gatewayId, requestId);
@@ -92,8 +91,8 @@ export class RunnerActor {
9291
});
9392
logger()?.debug({
9493
msg: "added pending request",
95-
gatewayId: tunnelId.gatewayIdToString(gatewayId),
96-
requestId: tunnelId.requestIdToString(requestId),
94+
gatewayId: idToStr(gatewayId),
95+
requestId: idToStr(requestId),
9796
length: this.pendingRequests.length,
9897
});
9998
}
@@ -109,8 +108,8 @@ export class RunnerActor {
109108
if (exists) {
110109
logger()?.warn({
111110
msg: "attempting to set pending request twice, replacing existing",
112-
gatewayId: tunnelId.gatewayIdToString(gatewayId),
113-
requestId: tunnelId.requestIdToString(requestId),
111+
gatewayId: idToStr(gatewayId),
112+
requestId: idToStr(requestId),
114113
});
115114
// Delete existing pending request before adding the new one
116115
this.deletePendingRequest(gatewayId, requestId);
@@ -130,8 +129,8 @@ export class RunnerActor {
130129
});
131130
logger()?.debug({
132131
msg: "added pending request with stream controller",
133-
gatewayId: tunnelId.gatewayIdToString(gatewayId),
134-
requestId: tunnelId.requestIdToString(requestId),
132+
gatewayId: idToStr(gatewayId),
133+
requestId: idToStr(requestId),
135134
length: this.pendingRequests.length,
136135
});
137136
}
@@ -149,8 +148,8 @@ export class RunnerActor {
149148
this.pendingRequests.splice(index, 1);
150149
logger()?.debug({
151150
msg: "removed pending request",
152-
gatewayId: tunnelId.gatewayIdToString(gatewayId),
153-
requestId: tunnelId.requestIdToString(requestId),
151+
gatewayId: idToStr(gatewayId),
152+
requestId: idToStr(requestId),
154153
length: this.pendingRequests.length,
155154
});
156155
}

engine/sdks/typescript/runner/src/mod.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import {
1313
import { importWebSocket } from "./websocket.js";
1414

1515
export type { HibernatingWebSocketMetadata };
16-
export * as tunnelId from "./tunnel-id";
1716
export { RunnerActor, type ActorConfig };
17+
export { idToStr } from "./utils";
1818

1919
const KV_EXPIRE: number = 30_000;
2020
const PROTOCOL_VERSION: number = 3;

engine/sdks/typescript/runner/src/stringify.ts

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type * as protocol from "@rivetkit/engine-runner-protocol";
2+
import { idToStr } from "./utils";
23

34
/**
45
* Helper function to stringify ArrayBuffer for logging
@@ -24,6 +25,13 @@ function stringifyMap(map: ReadonlyMap<string, string>): string {
2425
return `Map(${map.size}){${entries}}`;
2526
}
2627

28+
/**
29+
* Helper function to stringify MessageId for logging
30+
*/
31+
function stringifyMessageId(messageId: protocol.MessageId): string {
32+
return `MessageId{gatewayId: ${idToStr(messageId.gatewayId)}, requestId: ${idToStr(messageId.requestId)}, messageIndex: ${messageId.messageIndex}}`;
33+
}
34+
2735
/**
2836
* Stringify ToServerTunnelMessageKind for logging
2937
* Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified
@@ -111,15 +119,17 @@ export function stringifyToClientTunnelMessageKind(
111119
export function stringifyCommand(command: protocol.Command): string {
112120
switch (command.tag) {
113121
case "CommandStartActor": {
114-
const { actorId, generation, config, hibernatingRequests } = command.val;
122+
const { actorId, generation, config, hibernatingRequests } =
123+
command.val;
115124
const keyStr = config.key === null ? "null" : `"${config.key}"`;
116125
const inputStr =
117126
config.input === null
118127
? "null"
119128
: stringifyArrayBuffer(config.input);
120-
const hibernatingRequestsStr = hibernatingRequests.length > 0
121-
? `[${hibernatingRequests.map((hr) => `{gatewayId: ${stringifyArrayBuffer(hr.gatewayId)}, requestId: ${stringifyArrayBuffer(hr.requestId)}}`).join(", ")}]`
122-
: "[]";
129+
const hibernatingRequestsStr =
130+
hibernatingRequests.length > 0
131+
? `[${hibernatingRequests.map((hr) => `{gatewayId: ${idToStr(hr.gatewayId)}, requestId: ${idToStr(hr.requestId)}}`).join(", ")}]`
132+
: "[]";
123133
return `CommandStartActor{actorId: "${actorId}", generation: ${generation}, config: {name: "${config.name}", key: ${keyStr}, createTs: ${stringifyBigInt(config.createTs)}, input: ${inputStr}}, hibernatingRequests: ${hibernatingRequestsStr}}`;
124134
}
125135
case "CommandStopActor": {
@@ -193,8 +203,14 @@ export function stringifyEventWrapper(wrapper: protocol.EventWrapper): string {
193203
export function stringifyToServer(message: protocol.ToServer): string {
194204
switch (message.tag) {
195205
case "ToServerInit": {
196-
const { name, version, totalSlots, lastCommandIdx, prepopulateActorNames, metadata } =
197-
message.val;
206+
const {
207+
name,
208+
version,
209+
totalSlots,
210+
lastCommandIdx,
211+
prepopulateActorNames,
212+
metadata,
213+
} = message.val;
198214
const lastCommandIdxStr =
199215
lastCommandIdx === null
200216
? "null"
@@ -227,8 +243,7 @@ export function stringifyToServer(message: protocol.ToServer): string {
227243
}
228244
case "ToServerTunnelMessage": {
229245
const { messageId, messageKind } = message.val;
230-
const messageIdStr = stringifyArrayBuffer(messageId);
231-
return `ToServerTunnelMessage{messageId: ${messageIdStr}, messageKind: ${stringifyToServerTunnelMessageKind(messageKind)}}`;
246+
return `ToServerTunnelMessage{messageId: ${stringifyMessageId(messageId)}, messageKind: ${stringifyToServerTunnelMessageKind(messageKind)}}`;
232247
}
233248
}
234249
}
@@ -261,8 +276,7 @@ export function stringifyToClient(message: protocol.ToClient): string {
261276
}
262277
case "ToClientTunnelMessage": {
263278
const { messageId, messageKind } = message.val;
264-
const messageIdStr = stringifyArrayBuffer(messageId);
265-
return `ToClientTunnelMessage{messageId: ${messageIdStr}, messageKind: ${stringifyToClientTunnelMessageKind(messageKind)}}`;
279+
return `ToClientTunnelMessage{messageId: ${stringifyMessageId(messageId)}, messageKind: ${stringifyToClientTunnelMessageKind(messageKind)}}`;
266280
}
267281
}
268282
}

0 commit comments

Comments
 (0)