Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 24 additions & 32 deletions engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@
#actors: Map<string, RunnerActor> = new Map();

// WebSocket
#pegboardWebSocket?: WebSocket;
__pegboardWebSocket?: WebSocket;
runnerId?: string;
#lastCommandIdx: number = -1;
#pingLoop?: NodeJS.Timeout;
#nextEventIdx: bigint = 0n;

Check warning on line 202 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.
#started: boolean = false;
#shutdown: boolean = false;
#shuttingDown: boolean = false;
Expand Down Expand Up @@ -504,11 +504,8 @@
this.#kvRequests.clear();

// Close WebSocket
if (
this.#pegboardWebSocket &&
this.#pegboardWebSocket.readyState === 1
) {
const pegboardWebSocket = this.#pegboardWebSocket;
if (this.__webSocketReady()) {
const pegboardWebSocket = this.__pegboardWebSocket;
if (immediate) {
// Stop immediately
pegboardWebSocket.close(1000, "pegboard.runner_shutdown");
Expand Down Expand Up @@ -569,7 +566,7 @@
// the runner has already shut down
this.log?.debug({
msg: "no runner WebSocket to shutdown or already closed",
readyState: this.#pegboardWebSocket?.readyState,
readyState: this.__pegboardWebSocket?.readyState,
});
}

Expand Down Expand Up @@ -695,7 +692,7 @@

const WS = await importWebSocket();
const ws = new WS(this.pegboardUrl, protocols) as any as WebSocket;
this.#pegboardWebSocket = ws;
this.__pegboardWebSocket = ws;

this.log?.info({
msg: "connecting",
Expand Down Expand Up @@ -761,9 +758,6 @@
val: init,
});

// Process unsent KV requests
this.#processUnsentKvRequests();

// Start ping interval
const pingLoop = setInterval(() => {
if (ws.readyState === 1) {
Expand Down Expand Up @@ -836,8 +830,10 @@
runnerLostThreshold: this.#runnerLostThreshold,
});

// Resend events that haven't been acknowledged
// Resend pending events
this.#processUnsentKvRequests();
this.#resendUnacknowledgedEvents(init.lastEventIdx);
this.#tunnel?.resendBufferedEvents();

this.#config.onConnected();
} else if (message.tag === "ToClientCommands") {
Expand Down Expand Up @@ -1531,9 +1527,6 @@
): Promise<any> {
return new Promise((resolve, reject) => {
const requestId = this.#nextKvRequestId++;
const isConnected =
this.#pegboardWebSocket &&
this.#pegboardWebSocket.readyState === 1;

// Store the request
const requestEntry = {
Expand All @@ -1547,7 +1540,7 @@

this.#kvRequests.set(requestId, requestEntry);

if (isConnected) {
if (this.__webSocketReady()) {
// Send immediately
this.#sendSingleKvRequest(requestId);
}
Expand Down Expand Up @@ -1580,10 +1573,7 @@
}

#processUnsentKvRequests() {
if (
!this.#pegboardWebSocket ||
this.#pegboardWebSocket.readyState !== 1
) {
if (!this.__webSocketReady()) {
return;
}

Expand All @@ -1600,10 +1590,14 @@
}
}

__webSocketReady(): boolean {
return this.#pegboardWebSocket
? this.#pegboardWebSocket.readyState === 1
: false;
/** Asserts WebSocket exists and is ready. */
__webSocketReady(): this is this & {
__pegboardWebSocket: NonNullable<Runner["__pegboardWebSocket"]>;
} {
return (
!!this.__pegboardWebSocket &&
this.__pegboardWebSocket.readyState === 1
);
}

__sendToServer(message: protocol.ToServer) {
Expand All @@ -1613,11 +1607,8 @@
});

const encoded = protocol.encodeToServer(message);
if (
this.#pegboardWebSocket &&
this.#pegboardWebSocket.readyState === 1
) {
this.#pegboardWebSocket.send(encoded);
if (this.__webSocketReady()) {
this.__pegboardWebSocket.send(encoded);
} else {
this.log?.error({
msg: "WebSocket not available or not open for sending data",
Expand Down Expand Up @@ -1726,9 +1717,10 @@

if (eventsToResend.length === 0) return;

//this.#log?.log(
// `Resending ${eventsToResend.length} unacknowledged events from index ${Number(lastEventIdx) + 1}`,
//);
this.log?.info({
msg: "resending unacknowledged events",
fromIndex: lastEventIdx + 1n,
});

// Resend events in batches
this.__sendToServer({
Expand Down
32 changes: 29 additions & 3 deletions engine/sdks/typescript/runner/src/tunnel.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import type * as protocol from "@rivetkit/engine-runner-protocol";
import type {
GatewayId,
MessageId,

Check warning on line 4 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedImports

Several of these imports are unused.
RequestId,
} from "@rivetkit/engine-runner-protocol";
import type { Logger } from "pino";
import {
parse as uuidparse,
stringify as uuidstringify,
v4 as uuidv4,
} from "uuid";

Check warning on line 12 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedImports

This import is unused.
import type { Runner, RunnerActor } from "./mod";
import {
stringifyToClientTunnelMessageKind,
Expand Down Expand Up @@ -57,6 +57,13 @@
actorId: string;
}> = [];

/** Buffer for messages when not connected */
#bufferedMessages: Array<{
gatewayId: GatewayId;
requestId: RequestId;
messageKind: protocol.ToServerTunnelMessageKind;
}> = [];

get log(): Logger | undefined {
return this.#runner.log;
}
Expand All @@ -69,6 +76,24 @@
// No-op - kept for compatibility
}

resendBufferedEvents(): void {
if (this.#bufferedMessages.length === 0) {
return;
}

this.log?.info({
msg: "resending buffered tunnel messages",
count: this.#bufferedMessages.length,
});

const messages = this.#bufferedMessages;
this.#bufferedMessages = [];

for (const { gatewayId, requestId, messageKind } of messages) {
this.#sendMessage(gatewayId, requestId, messageKind);
}
}

shutdown() {
// NOTE: Pegboard WS already closed at this point, cannot send
// anything. All teardown logic is handled by pegboard-runner.
Expand Down Expand Up @@ -458,13 +483,14 @@
requestId: RequestId,
messageKind: protocol.ToServerTunnelMessageKind,
) {
// TODO: Switch this with runner WS
// Buffer message if not connected
if (!this.#runner.__webSocketReady()) {
this.log?.warn({
msg: "cannot send tunnel message, socket not connected to engine. tunnel data dropped.",
this.log?.debug({
msg: "buffering tunnel message, socket not connected to engine",
requestId: idToStr(requestId),
message: stringifyToServerTunnelMessageKind(messageKind),
});
this.#bufferedMessages.push({ gatewayId, requestId, messageKind });
return;
}

Expand Down
Loading