Skip to content

Commit 2a24121

Browse files
committed
chore(rivetkit): buffer outbound tunnel messages (#3509)
1 parent dbcce0a commit 2a24121

File tree

2 files changed

+53
-35
lines changed

2 files changed

+53
-35
lines changed

engine/sdks/typescript/runner/src/mod.ts

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ export class Runner {
195195
#actors: Map<string, RunnerActor> = new Map();
196196

197197
// WebSocket
198-
#pegboardWebSocket?: WebSocket;
198+
__pegboardWebSocket?: WebSocket;
199199
runnerId?: string;
200200
#lastCommandIdx: number = -1;
201201
#pingLoop?: NodeJS.Timeout;
@@ -504,11 +504,8 @@ export class Runner {
504504
this.#kvRequests.clear();
505505

506506
// Close WebSocket
507-
if (
508-
this.#pegboardWebSocket &&
509-
this.#pegboardWebSocket.readyState === 1
510-
) {
511-
const pegboardWebSocket = this.#pegboardWebSocket;
507+
if (this.__webSocketReady()) {
508+
const pegboardWebSocket = this.__pegboardWebSocket;
512509
if (immediate) {
513510
// Stop immediately
514511
pegboardWebSocket.close(1000, "pegboard.runner_shutdown");
@@ -569,7 +566,7 @@ export class Runner {
569566
// the runner has already shut down
570567
this.log?.debug({
571568
msg: "no runner WebSocket to shutdown or already closed",
572-
readyState: this.#pegboardWebSocket?.readyState,
569+
readyState: this.__pegboardWebSocket?.readyState,
573570
});
574571
}
575572

@@ -695,7 +692,7 @@ export class Runner {
695692

696693
const WS = await importWebSocket();
697694
const ws = new WS(this.pegboardUrl, protocols) as any as WebSocket;
698-
this.#pegboardWebSocket = ws;
695+
this.__pegboardWebSocket = ws;
699696

700697
this.log?.info({
701698
msg: "connecting",
@@ -761,9 +758,6 @@ export class Runner {
761758
val: init,
762759
});
763760

764-
// Process unsent KV requests
765-
this.#processUnsentKvRequests();
766-
767761
// Start ping interval
768762
const pingLoop = setInterval(() => {
769763
if (ws.readyState === 1) {
@@ -836,8 +830,10 @@ export class Runner {
836830
runnerLostThreshold: this.#runnerLostThreshold,
837831
});
838832

839-
// Resend events that haven't been acknowledged
833+
// Resend pending events
834+
this.#processUnsentKvRequests();
840835
this.#resendUnacknowledgedEvents(init.lastEventIdx);
836+
this.#tunnel?.resendBufferedEvents();
841837

842838
this.#config.onConnected();
843839
} else if (message.tag === "ToClientCommands") {
@@ -1531,9 +1527,6 @@ export class Runner {
15311527
): Promise<any> {
15321528
return new Promise((resolve, reject) => {
15331529
const requestId = this.#nextKvRequestId++;
1534-
const isConnected =
1535-
this.#pegboardWebSocket &&
1536-
this.#pegboardWebSocket.readyState === 1;
15371530

15381531
// Store the request
15391532
const requestEntry = {
@@ -1547,7 +1540,7 @@ export class Runner {
15471540

15481541
this.#kvRequests.set(requestId, requestEntry);
15491542

1550-
if (isConnected) {
1543+
if (this.__webSocketReady()) {
15511544
// Send immediately
15521545
this.#sendSingleKvRequest(requestId);
15531546
}
@@ -1580,10 +1573,7 @@ export class Runner {
15801573
}
15811574

15821575
#processUnsentKvRequests() {
1583-
if (
1584-
!this.#pegboardWebSocket ||
1585-
this.#pegboardWebSocket.readyState !== 1
1586-
) {
1576+
if (!this.__webSocketReady()) {
15871577
return;
15881578
}
15891579

@@ -1600,10 +1590,14 @@ export class Runner {
16001590
}
16011591
}
16021592

1603-
__webSocketReady(): boolean {
1604-
return this.#pegboardWebSocket
1605-
? this.#pegboardWebSocket.readyState === 1
1606-
: false;
1593+
/** Asserts WebSocket exists and is ready. */
1594+
__webSocketReady(): this is this & {
1595+
__pegboardWebSocket: NonNullable<Runner["__pegboardWebSocket"]>;
1596+
} {
1597+
return (
1598+
!!this.__pegboardWebSocket &&
1599+
this.__pegboardWebSocket.readyState === 1
1600+
);
16071601
}
16081602

16091603
__sendToServer(message: protocol.ToServer) {
@@ -1613,11 +1607,8 @@ export class Runner {
16131607
});
16141608

16151609
const encoded = protocol.encodeToServer(message);
1616-
if (
1617-
this.#pegboardWebSocket &&
1618-
this.#pegboardWebSocket.readyState === 1
1619-
) {
1620-
this.#pegboardWebSocket.send(encoded);
1610+
if (this.__webSocketReady()) {
1611+
this.__pegboardWebSocket.send(encoded);
16211612
} else {
16221613
this.log?.error({
16231614
msg: "WebSocket not available or not open for sending data",
@@ -1726,9 +1717,10 @@ export class Runner {
17261717

17271718
if (eventsToResend.length === 0) return;
17281719

1729-
//this.#log?.log(
1730-
// `Resending ${eventsToResend.length} unacknowledged events from index ${Number(lastEventIdx) + 1}`,
1731-
//);
1720+
this.log?.info({
1721+
msg: "resending unacknowledged events",
1722+
fromIndex: lastEventIdx + 1n,
1723+
});
17321724

17331725
// Resend events in batches
17341726
this.__sendToServer({

engine/sdks/typescript/runner/src/tunnel.ts

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ export class Tunnel {
5757
actorId: string;
5858
}> = [];
5959

60+
/** Buffer for messages when not connected */
61+
#bufferedMessages: Array<{
62+
gatewayId: GatewayId;
63+
requestId: RequestId;
64+
messageKind: protocol.ToServerTunnelMessageKind;
65+
}> = [];
66+
6067
get log(): Logger | undefined {
6168
return this.#runner.log;
6269
}
@@ -69,6 +76,24 @@ export class Tunnel {
6976
// No-op - kept for compatibility
7077
}
7178

79+
resendBufferedEvents(): void {
80+
if (this.#bufferedMessages.length === 0) {
81+
return;
82+
}
83+
84+
this.log?.info({
85+
msg: "resending buffered tunnel messages",
86+
count: this.#bufferedMessages.length,
87+
});
88+
89+
const messages = this.#bufferedMessages;
90+
this.#bufferedMessages = [];
91+
92+
for (const { gatewayId, requestId, messageKind } of messages) {
93+
this.#sendMessage(gatewayId, requestId, messageKind);
94+
}
95+
}
96+
7297
shutdown() {
7398
// NOTE: Pegboard WS already closed at this point, cannot send
7499
// anything. All teardown logic is handled by pegboard-runner.
@@ -458,13 +483,14 @@ export class Tunnel {
458483
requestId: RequestId,
459484
messageKind: protocol.ToServerTunnelMessageKind,
460485
) {
461-
// TODO: Switch this with runner WS
486+
// Buffer message if not connected
462487
if (!this.#runner.__webSocketReady()) {
463-
this.log?.warn({
464-
msg: "cannot send tunnel message, socket not connected to engine. tunnel data dropped.",
488+
this.log?.debug({
489+
msg: "buffering tunnel message, socket not connected to engine",
465490
requestId: idToStr(requestId),
466491
message: stringifyToServerTunnelMessageKind(messageKind),
467492
});
493+
this.#bufferedMessages.push({ gatewayId, requestId, messageKind });
468494
return;
469495
}
470496

0 commit comments

Comments
 (0)