Skip to content

Commit afc0cf5

Browse files
committed
chore(rivetkit): implement new hibernating ws protocol
1 parent 450d9e5 commit afc0cf5

File tree

26 files changed

+2397
-2149
lines changed

26 files changed

+2397
-2149
lines changed

engine/sdks/typescript/runner-protocol/src/index.ts

Lines changed: 1 addition & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/sdks/typescript/runner/src/mod.ts

Lines changed: 139 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ import type { Logger } from "pino";
33
import type WebSocket from "ws";
44
import { logger, setLogger } from "./log.js";
55
import { stringifyCommandWrapper, stringifyEvent } from "./stringify";
6-
import { Tunnel } from "./tunnel";
6+
import { HibernatingWebSocketMetadata, Tunnel } from "./tunnel";
77
import {
88
calculateBackoff,
99
parseWebSocketCloseReason,
1010
unreachable,
1111
} from "./utils";
1212
import { importWebSocket } from "./websocket.js";
13-
import type { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter";
13+
14+
export { HibernatingWebSocketMetadata } from "./tunnel";
1415

1516
const KV_EXPIRE: number = 30_000;
1617
const PROTOCOL_VERSION: number = 3;
@@ -51,38 +52,151 @@ export interface RunnerConfig {
5152
onConnected: () => void;
5253
onDisconnected: (code: number, reason: string) => void;
5354
onShutdown: () => void;
55+
56+
/** Called when receiving a network request. */
5457
fetch: (
5558
runner: Runner,
5659
actorId: string,
5760
requestId: protocol.RequestId,
5861
request: Request,
5962
) => Promise<Response>;
60-
websocket?: (
63+
64+
/**
65+
* Called when receiving a WebSocket connection.
66+
*
67+
* All event listeners must be added synchronously inside this function or
68+
* else events may be missed. The open event will fire immediately after
69+
* this function finishes.
70+
*
71+
* Any errors thrown here will disconnect the WebSocket immediately.
72+
*
73+
* ## Hibernating Web Sockets
74+
*
75+
* ### Implementation Requirements
76+
*
77+
* **Requirement 1: Persist HWS Immediately**
78+
*
79+
* This is responsible for persisting hibernatable WebSockets immediately
80+
* (do not wait for open event). It is not time sensitive to flush the
81+
* connection state. If this fails to persist the HWS, the client's
82+
* WebSocket will be disconnected on next wake in
83+
* `Tunnel::restoreHibernatingRequests` since the connection entry will not
84+
* exist.
85+
*
86+
* **Requirement 2: Persist Message Index On `message`**
87+
*
88+
* In the `message` event listener, this handler must persist the message
89+
* index from the event. The request ID is available at
90+
* `event.rivetRequestId` and message index at `event.rivetMessageIndex`.
91+
*
92+
* The message index should not be flushed immediately. Instead, this
93+
* should:
94+
*
95+
* - Debounce calls to persist the message index
96+
* - After each persist, call
97+
* `Runner::sendHibernatableWebSocketMessageAck` to acknowledge the
98+
* message
99+
*
100+
* This mechanism allows us to buffer messages on the gateway so we can
101+
* batch-persist events on our end on a given interval.
102+
*
103+
* If this fails to persist, then the gateway will replay unacked
104+
* messages when the actor starts again.
105+
*
106+
* **Requirement 3: Remove HWS From Storage On `close`**
107+
*
108+
* This handler should add an event listener for `close` to remove the
109+
* connection from storage.
110+
*
111+
* If the connection remove fails to persist, the close event will be
112+
* called again on the next actor start in
113+
* `Tunnel::restoreHibernatingRequests` since there will be no request for
114+
* the given connection.
115+
*
116+
* ### Restoring Connections
117+
*
118+
* `loadAll` will be called from `Tunnel::restoreHibernatingRequests` to
119+
* restore this connection on the next actor wake.
120+
*
121+
* `restoreHibernatingRequests` is responsible for both making sure that
122+
* new connections are registered with the actor and zombie connections are
123+
* appropriately cleaned up.
124+
*
125+
* ### No Open Event On Restoration
126+
*
127+
* When restoring a HWS, the open event will not be called again. It will
128+
* go straight to the message or close event.
129+
*/
130+
websocket: (
61131
runner: Runner,
62132
actorId: string,
63133
ws: any,
64134
requestId: protocol.RequestId,
65135
request: Request,
66-
) => Promise<void>;
136+
) => void;
137+
138+
hibernatableWebSocket: {
139+
/**
140+
* Determines if a WebSocket can continue to live while an actor goes to
141+
* sleep.
142+
*/
143+
canHibernate: (
144+
actorId: string,
145+
requestId: ArrayBuffer,
146+
request: Request,
147+
) => boolean;
148+
149+
/**
150+
* Returns all hibernatable WebSockets that are stored for this actor.
151+
*
152+
* This is called on actor start.
153+
*
154+
* This list will be diffed with the list of hibernating requests in
155+
* the ActorStart message.
156+
*
157+
* This that are connected but not loaded (i.e. were not successfully
158+
* persisted to this actor) will be disconnected.
159+
*
160+
* This that are not connected but were loaded (i.e. disconnected but
161+
* this actor has not received the event yet) will also be
162+
* disconnected.
163+
*/
164+
loadAll(actorId: string): HibernatingWebSocketMetadata[];
165+
166+
/**
167+
* Notify the HWS message index needs to be persisted in the background.
168+
*
169+
* The message index should not be flushed immediately. Instead, this
170+
* should:
171+
*
172+
* - Debounce calls to persist the message index
173+
* - After each persist, call
174+
* `Runner::sendHibernatableWebSocketMessageAck` to acknowledge the
175+
* message
176+
*
177+
* This mechanism allows us to buffer messages on the gateway so we can
178+
* batch-persist events on our end on a given interval.
179+
*
180+
* If this fails to persist, then the gateway will replay unacked
181+
* messages when the actor starts again.
182+
*/
183+
persistMessageIndex: (
184+
actorId: string,
185+
requestId: protocol.RequestId,
186+
messageIndex: number,
187+
) => void;
188+
};
189+
67190
onActorStart: (
68191
actorId: string,
69192
generation: number,
70193
config: ActorConfig,
71194
) => Promise<void>;
195+
72196
onActorStop: (actorId: string, generation: number) => Promise<void>;
73-
getActorHibernationConfig: (
74-
actorId: string,
75-
requestId: ArrayBuffer,
76-
request: Request,
77-
) => HibernationConfig;
78197
noAutoShutdown?: boolean;
79198
}
80199

81-
export interface HibernationConfig {
82-
enabled: boolean;
83-
lastMsgIndex: number | undefined;
84-
}
85-
86200
export interface KvListOptions {
87201
reverse?: boolean;
88202
limit?: number;
@@ -105,7 +219,6 @@ export class Runner {
105219
}
106220

107221
#actors: Map<string, ActorInstance> = new Map();
108-
#actorWebSockets: Map<string, Set<WebSocketTunnelAdapter>> = new Map();
109222

110223
// WebSocket
111224
#pegboardWebSocket?: WebSocket;
@@ -809,6 +922,8 @@ export class Runner {
809922
}
810923

811924
#handleCommandStartActor(commandWrapper: protocol.CommandWrapper) {
925+
if (!this.#tunnel) throw new Error("missing tunnel on actor start");
926+
812927
const startCommand = commandWrapper.inner
813928
.val as protocol.CommandStartActor;
814929

@@ -850,6 +965,11 @@ export class Runner {
850965
// Send stopped state update if start failed
851966
this.forceStopActor(actorId, generation);
852967
});
968+
969+
this.#tunnel.restoreHibernatingRequests(
970+
actorId,
971+
startCommand.hibernatingRequestIds,
972+
);
853973
}
854974

855975
#handleCommandStopActor(commandWrapper: protocol.CommandWrapper) {
@@ -1427,8 +1547,10 @@ export class Runner {
14271547
}
14281548
}
14291549

1430-
sendWebsocketMessageAck(requestId: ArrayBuffer, index: number) {
1431-
this.#tunnel?.__ackWebsocketMessage(requestId, index);
1550+
sendHibernatableWebSocketMessageAck(requestId: ArrayBuffer, index: number) {
1551+
if (!this.#tunnel)
1552+
throw new Error("missing tunnel to send message ack");
1553+
this.#tunnel.sendHibernatableWebSocketMessageAck(requestId, index);
14321554
}
14331555

14341556
getServerlessInitPacket(): string | undefined {

engine/sdks/typescript/runner/src/stringify.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ export function stringifyToServerTunnelMessageKind(
4646
case "ToServerResponseAbort":
4747
return "ToServerResponseAbort";
4848
case "ToServerWebSocketOpen": {
49-
const { canHibernate, lastMsgIndex } = kind.val;
50-
return `ToServerWebSocketOpen{canHibernate: ${canHibernate}, lastMsgIndex: ${stringifyBigInt(lastMsgIndex)}}`;
49+
const { canHibernate } = kind.val;
50+
return `ToServerWebSocketOpen{canHibernate: ${canHibernate}}`;
5151
}
5252
case "ToServerWebSocketMessage": {
5353
const { data, binary } = kind.val;

0 commit comments

Comments
 (0)