Skip to content

Commit 193f67a

Browse files
committed
chore(rivetkit): fix hibernation implementation
1 parent d287067 commit 193f67a

File tree

17 files changed

+437
-237
lines changed

17 files changed

+437
-237
lines changed

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,23 @@ use pegboard::tunnel::id::{self as tunnel_id, RequestId};
99
use rand::Rng;
1010
use rivet_error::*;
1111
use rivet_guard_core::{
12-
WebSocketHandle,
1312
custom_serve::{CustomServeTrait, HibernationResult},
1413
errors::{ServiceUnavailable, WebSocketServiceUnavailable},
15-
proxy_service::{ResponseBody, is_ws_hibernate},
14+
proxy_service::{is_ws_hibernate, ResponseBody},
1615
request_context::RequestContext,
1716
websocket_handle::WebSocketReceiver,
17+
WebSocketHandle,
1818
};
1919
use rivet_runner_protocol as protocol;
2020
use rivet_util::serde::HashableMap;
2121
use std::{sync::Arc, time::Duration};
2222
use tokio::{
23-
sync::{Mutex, watch},
23+
sync::{watch, Mutex},
2424
task::JoinHandle,
2525
};
2626
use tokio_tungstenite::tungstenite::{
27+
protocol::frame::{coding::CloseCode, CloseFrame},
2728
Message,
28-
protocol::frame::{CloseFrame, coding::CloseCode},
2929
};
3030

3131
use crate::shared_state::{InFlightRequestHandle, SharedState};
@@ -486,7 +486,7 @@ impl CustomServeTrait for PegboardGateway {
486486
};
487487

488488
// Send close frame to runner if not hibernating
489-
if lifecycle_res
489+
if !&lifecycle_res
490490
.as_ref()
491491
.map_or_else(is_ws_hibernate, |_| false)
492492
{

engine/packages/pegboard-gateway/src/shared_state.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ impl SharedState {
124124
entry.msg_tx = msg_tx;
125125
entry.drop_tx = drop_tx;
126126
entry.opened = false;
127-
entry.message_index = 0;
128127

129128
if entry.stopping {
130129
entry.hibernation_state = None;
@@ -194,6 +193,11 @@ impl SharedState {
194193
};
195194

196195
hs.pending_ws_msgs.push(pending_ws_msg);
196+
tracing::debug!(
197+
index=current_message_index,
198+
new_count=hs.pending_ws_msgs.len(),
199+
"pushed pending websocket message"
200+
);
197201
}
198202

199203
self.ups
@@ -391,7 +395,6 @@ impl SharedState {
391395

392396
let len_after = hs.pending_ws_msgs.len();
393397
tracing::debug!(
394-
request_id=?tunnel_id::request_id_to_string(&request_id),
395398
ack_index,
396399
removed_count = len_before - len_after,
397400
remaining_count = len_after,

engine/sdks/typescript/runner/src/actor.ts

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import type * as protocol from "@rivetkit/engine-runner-protocol";
22
import type { PendingRequest } from "./tunnel";
33
import type { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter";
4-
import { arraysEqual } from "./utils";
4+
import { arraysEqual, promiseWithResolvers } from "./utils";
5+
import { logger } from "./log";
6+
import * as tunnelId from "./tunnel-id";
57

68
export interface ActorConfig {
79
name: string;
@@ -24,11 +26,28 @@ export class RunnerActor {
2426
requestId: protocol.RequestId;
2527
ws: WebSocketTunnelAdapter;
2628
}> = [];
29+
actorStartPromise: ReturnType<typeof promiseWithResolvers<void>>;
2730

28-
constructor(actorId: string, generation: number, config: ActorConfig) {
31+
/**
32+
* If restoreHibernatingRequests has been called. This is used to assert
33+
* that the caller is implemented correctly.
34+
**/
35+
hibernationRestored: boolean = false;
36+
37+
constructor(
38+
actorId: string,
39+
generation: number,
40+
config: ActorConfig,
41+
/**
42+
* List of hibernating requests provided by the gateway on actor start.
43+
* This represents the WebSocket connections that the gateway knows about.
44+
**/
45+
public hibernatingRequests: readonly protocol.HibernatingRequest[],
46+
) {
2947
this.actorId = actorId;
3048
this.generation = generation;
3149
this.config = config;
50+
this.actorStartPromise = promiseWithResolvers();
3251
}
3352

3453
// Pending request methods
@@ -43,13 +62,78 @@ export class RunnerActor {
4362
)?.request;
4463
}
4564

46-
setPendingRequest(
65+
createPendingRequest(
66+
gatewayId: protocol.GatewayId,
67+
requestId: protocol.RequestId,
68+
clientMessageIndex: number,
69+
) {
70+
const exists =
71+
this.getPendingRequest(gatewayId, requestId) !== undefined;
72+
if (exists) {
73+
logger()?.warn({
74+
msg: "attempting to set pending request twice, replacing existing",
75+
gatewayId: tunnelId.gatewayIdToString(gatewayId),
76+
requestId: tunnelId.requestIdToString(requestId),
77+
});
78+
// Delete existing pending request before adding the new one
79+
this.deletePendingRequest(gatewayId, requestId);
80+
}
81+
this.pendingRequests.push({
82+
gatewayId,
83+
requestId,
84+
request: {
85+
resolve: () => {},
86+
reject: () => {},
87+
actorId: this.actorId,
88+
gatewayId: gatewayId,
89+
requestId: requestId,
90+
clientMessageIndex,
91+
},
92+
});
93+
logger()?.debug({
94+
msg: "added pending request",
95+
gatewayId: tunnelId.gatewayIdToString(gatewayId),
96+
requestId: tunnelId.requestIdToString(requestId),
97+
length: this.pendingRequests.length,
98+
});
99+
}
100+
101+
createPendingRequestWithStreamController(
47102
gatewayId: protocol.GatewayId,
48103
requestId: protocol.RequestId,
49-
request: PendingRequest,
104+
clientMessageIndex: number,
105+
streamController: ReadableStreamDefaultController<Uint8Array>,
50106
) {
51-
this.deletePendingRequest(gatewayId, requestId);
52-
this.pendingRequests.push({ gatewayId, requestId, request });
107+
const exists =
108+
this.getPendingRequest(gatewayId, requestId) !== undefined;
109+
if (exists) {
110+
logger()?.warn({
111+
msg: "attempting to set pending request twice, replacing existing",
112+
gatewayId: tunnelId.gatewayIdToString(gatewayId),
113+
requestId: tunnelId.requestIdToString(requestId),
114+
});
115+
// Delete existing pending request before adding the new one
116+
this.deletePendingRequest(gatewayId, requestId);
117+
}
118+
this.pendingRequests.push({
119+
gatewayId,
120+
requestId,
121+
request: {
122+
resolve: () => {},
123+
reject: () => {},
124+
actorId: this.actorId,
125+
gatewayId: gatewayId,
126+
requestId: requestId,
127+
clientMessageIndex,
128+
streamController,
129+
},
130+
});
131+
logger()?.debug({
132+
msg: "added pending request with stream controller",
133+
gatewayId: tunnelId.gatewayIdToString(gatewayId),
134+
requestId: tunnelId.requestIdToString(requestId),
135+
length: this.pendingRequests.length,
136+
});
53137
}
54138

55139
deletePendingRequest(
@@ -63,6 +147,12 @@ export class RunnerActor {
63147
);
64148
if (index !== -1) {
65149
this.pendingRequests.splice(index, 1);
150+
logger()?.debug({
151+
msg: "removed pending request",
152+
gatewayId: tunnelId.gatewayIdToString(gatewayId),
153+
requestId: tunnelId.requestIdToString(requestId),
154+
length: this.pendingRequests.length,
155+
});
66156
}
67157
}
68158

@@ -83,7 +173,11 @@ export class RunnerActor {
83173
requestId: protocol.RequestId,
84174
ws: WebSocketTunnelAdapter,
85175
) {
86-
this.deleteWebSocket(gatewayId, requestId);
176+
const exists = this.getWebSocket(gatewayId, requestId) !== undefined;
177+
if (exists) {
178+
logger()?.warn({ msg: "attempting to set websocket twice" });
179+
return;
180+
}
87181
this.webSockets.push({ gatewayId, requestId, ws });
88182
}
89183

0 commit comments

Comments
 (0)