From 8e1638dc121349684dc2b90ffffd72f43cc45406 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 20 Nov 2025 16:48:45 -0800 Subject: [PATCH] chore(rivetkit): buffer outbound tunnel messages --- engine/sdks/typescript/runner/src/mod.ts | 56 +++++++++------------ engine/sdks/typescript/runner/src/tunnel.ts | 32 ++++++++++-- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index f91d3f70e0..9f41be6ad8 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -195,7 +195,7 @@ export class Runner { #actors: Map = new Map(); // WebSocket - #pegboardWebSocket?: WebSocket; + __pegboardWebSocket?: WebSocket; runnerId?: string; #lastCommandIdx: number = -1; #pingLoop?: NodeJS.Timeout; @@ -504,11 +504,8 @@ export class Runner { this.#kvRequests.clear(); // Close WebSocket - if ( - this.#pegboardWebSocket && - this.#pegboardWebSocket.readyState === 1 - ) { - const pegboardWebSocket = this.#pegboardWebSocket; + if (this.__webSocketReady()) { + const pegboardWebSocket = this.__pegboardWebSocket; if (immediate) { // Stop immediately pegboardWebSocket.close(1000, "pegboard.runner_shutdown"); @@ -569,7 +566,7 @@ export class Runner { // the runner has already shut down this.log?.debug({ msg: "no runner WebSocket to shutdown or already closed", - readyState: this.#pegboardWebSocket?.readyState, + readyState: this.__pegboardWebSocket?.readyState, }); } @@ -695,7 +692,7 @@ export class Runner { const WS = await importWebSocket(); const ws = new WS(this.pegboardUrl, protocols) as any as WebSocket; - this.#pegboardWebSocket = ws; + this.__pegboardWebSocket = ws; this.log?.info({ msg: "connecting", @@ -761,9 +758,6 @@ export class Runner { val: init, }); - // Process unsent KV requests - this.#processUnsentKvRequests(); - // Start ping interval const pingLoop = setInterval(() => { if (ws.readyState === 1) { @@ -836,8 +830,10 @@ export class Runner { runnerLostThreshold: this.#runnerLostThreshold, }); - // Resend events that haven't been acknowledged + // Resend pending events + this.#processUnsentKvRequests(); this.#resendUnacknowledgedEvents(init.lastEventIdx); + this.#tunnel?.resendBufferedEvents(); this.#config.onConnected(); } else if (message.tag === "ToClientCommands") { @@ -1531,9 +1527,6 @@ export class Runner { ): Promise { return new Promise((resolve, reject) => { const requestId = this.#nextKvRequestId++; - const isConnected = - this.#pegboardWebSocket && - this.#pegboardWebSocket.readyState === 1; // Store the request const requestEntry = { @@ -1547,7 +1540,7 @@ export class Runner { this.#kvRequests.set(requestId, requestEntry); - if (isConnected) { + if (this.__webSocketReady()) { // Send immediately this.#sendSingleKvRequest(requestId); } @@ -1580,10 +1573,7 @@ export class Runner { } #processUnsentKvRequests() { - if ( - !this.#pegboardWebSocket || - this.#pegboardWebSocket.readyState !== 1 - ) { + if (!this.__webSocketReady()) { return; } @@ -1600,10 +1590,14 @@ export class Runner { } } - __webSocketReady(): boolean { - return this.#pegboardWebSocket - ? this.#pegboardWebSocket.readyState === 1 - : false; + /** Asserts WebSocket exists and is ready. */ + __webSocketReady(): this is this & { + __pegboardWebSocket: NonNullable; + } { + return ( + !!this.__pegboardWebSocket && + this.__pegboardWebSocket.readyState === 1 + ); } __sendToServer(message: protocol.ToServer) { @@ -1613,11 +1607,8 @@ export class Runner { }); const encoded = protocol.encodeToServer(message); - if ( - this.#pegboardWebSocket && - this.#pegboardWebSocket.readyState === 1 - ) { - this.#pegboardWebSocket.send(encoded); + if (this.__webSocketReady()) { + this.__pegboardWebSocket.send(encoded); } else { this.log?.error({ msg: "WebSocket not available or not open for sending data", @@ -1726,9 +1717,10 @@ export class Runner { if (eventsToResend.length === 0) return; - //this.#log?.log( - // `Resending ${eventsToResend.length} unacknowledged events from index ${Number(lastEventIdx) + 1}`, - //); + this.log?.info({ + msg: "resending unacknowledged events", + fromIndex: lastEventIdx + 1n, + }); // Resend events in batches this.__sendToServer({ diff --git a/engine/sdks/typescript/runner/src/tunnel.ts b/engine/sdks/typescript/runner/src/tunnel.ts index 24c2692ed2..7c67dd3c13 100644 --- a/engine/sdks/typescript/runner/src/tunnel.ts +++ b/engine/sdks/typescript/runner/src/tunnel.ts @@ -57,6 +57,13 @@ export class Tunnel { actorId: string; }> = []; + /** Buffer for messages when not connected */ + #bufferedMessages: Array<{ + gatewayId: GatewayId; + requestId: RequestId; + messageKind: protocol.ToServerTunnelMessageKind; + }> = []; + get log(): Logger | undefined { return this.#runner.log; } @@ -69,6 +76,24 @@ export class Tunnel { // No-op - kept for compatibility } + resendBufferedEvents(): void { + if (this.#bufferedMessages.length === 0) { + return; + } + + this.log?.info({ + msg: "resending buffered tunnel messages", + count: this.#bufferedMessages.length, + }); + + const messages = this.#bufferedMessages; + this.#bufferedMessages = []; + + for (const { gatewayId, requestId, messageKind } of messages) { + this.#sendMessage(gatewayId, requestId, messageKind); + } + } + shutdown() { // NOTE: Pegboard WS already closed at this point, cannot send // anything. All teardown logic is handled by pegboard-runner. @@ -458,13 +483,14 @@ export class Tunnel { requestId: RequestId, messageKind: protocol.ToServerTunnelMessageKind, ) { - // TODO: Switch this with runner WS + // Buffer message if not connected if (!this.#runner.__webSocketReady()) { - this.log?.warn({ - msg: "cannot send tunnel message, socket not connected to engine. tunnel data dropped.", + this.log?.debug({ + msg: "buffering tunnel message, socket not connected to engine", requestId: idToStr(requestId), message: stringifyToServerTunnelMessageKind(messageKind), }); + this.#bufferedMessages.push({ gatewayId, requestId, messageKind }); return; }