diff --git a/engine/packages/pegboard-gateway/src/keepalive_task.rs b/engine/packages/pegboard-gateway/src/keepalive_task.rs index 597892019a..2172fbdcd1 100644 --- a/engine/packages/pegboard-gateway/src/keepalive_task.rs +++ b/engine/packages/pegboard-gateway/src/keepalive_task.rs @@ -1,5 +1,6 @@ use anyhow::Result; use gas::prelude::*; +use pegboard::tunnel::id as tunnel_id; use pegboard::tunnel::id::{GatewayId, RequestId}; use rand::Rng; use std::time::Duration; @@ -12,7 +13,6 @@ use crate::shared_state::SharedState; /// next actor start. /// /// Only ran for hibernating requests. - pub async fn task( shared_state: SharedState, ctx: StandaloneCtx, @@ -30,10 +30,6 @@ pub async fn task( )); ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - // Discard the first tick since it fires immediately and we've already called this - // above - ping_interval.tick().await; - loop { tokio::select! { _ = ping_interval.tick() => {} @@ -46,6 +42,13 @@ pub async fn task( let jitter = { rand::thread_rng().gen_range(0..128) }; tokio::time::sleep(Duration::from_millis(jitter)).await; + tracing::debug!( + %actor_id, + gateway_id=%tunnel_id::gateway_id_to_string(&gateway_id), + request_id=%tunnel_id::request_id_to_string(&request_id), + "updating hws keepalive" + ); + tokio::try_join!( ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input { actor_id, diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index a431c53dbe..4af6ef1c6f 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -8,20 +8,20 @@ use hyper::{Request, Response, StatusCode}; use pegboard::tunnel::id::{self as tunnel_id, RequestId}; use rivet_error::*; use rivet_guard_core::{ - WebSocketHandle, custom_serve::{CustomServeTrait, HibernationResult}, errors::{ServiceUnavailable, WebSocketServiceUnavailable}, - proxy_service::{ResponseBody, is_ws_hibernate}, + proxy_service::{is_ws_hibernate, ResponseBody}, request_context::RequestContext, websocket_handle::WebSocketReceiver, + WebSocketHandle, }; use rivet_runner_protocol as protocol; use rivet_util::serde::HashableMap; use std::{sync::Arc, time::Duration}; -use tokio::sync::{Mutex, watch}; +use tokio::sync::{watch, Mutex}; use tokio_tungstenite::tungstenite::{ + protocol::frame::{coding::CloseCode, CloseFrame}, Message, - protocol::frame::{CloseFrame, coding::CloseCode}, }; use crate::shared_state::{InFlightRequestHandle, SharedState}; @@ -578,16 +578,6 @@ impl CustomServeTrait for PegboardGateway { client_ws: WebSocketHandle, request_id: RequestId, ) -> Result { - // Insert hibernating request entry before checking for pending messages - // This ensures the entry exists even if we immediately rewake the actor - self.ctx - .op(pegboard::ops::actor::hibernating_request::upsert::Input { - actor_id: self.actor_id, - gateway_id: self.shared_state.gateway_id(), - request_id, - }) - .await?; - // Immediately rewake if we have pending messages if self .shared_state diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index 57e1fd4cea..6c9d1bd395 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -2,8 +2,8 @@ use anyhow::Result; use gas::prelude::*; use pegboard::tunnel::id::{self as tunnel_id, GatewayId, RequestId}; use rivet_guard_core::errors::WebSocketServiceTimeout; -use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned}; -use scc::{HashMap, hash_map::Entry}; +use rivet_runner_protocol::{self as protocol, versioned, PROTOCOL_VERSION}; +use scc::{hash_map::Entry, HashMap}; use std::{ ops::Deref, sync::Arc, @@ -13,7 +13,7 @@ use tokio::sync::{mpsc, watch}; use universalpubsub::{NextOutput, PubSub, PublishOpts, Subscriber}; use vbare::OwnedVersionedData; -use crate::{WebsocketPendingLimitReached, metrics}; +use crate::{metrics, WebsocketPendingLimitReached}; const GC_INTERVAL: Duration = Duration::from_secs(15); const TUNNEL_PING_TIMEOUT: i64 = util::duration::seconds(30); @@ -512,7 +512,13 @@ impl SharedState { } } - if hs.last_ping.elapsed() > hibernation_timeout { + let hs_elapsed = hs.last_ping.elapsed(); + tracing::debug!( + hs_elapsed=%hs_elapsed.as_secs_f64(), + timeout=%hibernation_timeout.as_secs_f64(), + "checking hibernating state elapsed time" + ); + if hs_elapsed> hibernation_timeout { break 'reason Some(MsgGcReason::HibernationTimeout); } } else if req.msg_tx.is_closed() { diff --git a/engine/sdks/schemas/runner-protocol/v3.bare b/engine/sdks/schemas/runner-protocol/v3.bare index 207b01c828..e1c1cd369c 100644 --- a/engine/sdks/schemas/runner-protocol/v3.bare +++ b/engine/sdks/schemas/runner-protocol/v3.bare @@ -204,7 +204,7 @@ type CommandWrapper struct { # Message ID -type MessageIdParts struct { +type MessageId struct { # Globally unique ID gatewayId: GatewayId # Unique ID to the gateway @@ -213,8 +213,6 @@ type MessageIdParts struct { messageIndex: MessageIndex } -type MessageId data[10] - # Ack (deprecated, older protocols that have gc cycles to check for tunnel ack) type DeprecatedTunnelAck void diff --git a/engine/sdks/typescript/runner-protocol/src/index.ts b/engine/sdks/typescript/runner-protocol/src/index.ts index ffa2222ce5..941d1eea03 100644 --- a/engine/sdks/typescript/runner-protocol/src/index.ts +++ b/engine/sdks/typescript/runner-protocol/src/index.ts @@ -967,7 +967,7 @@ export function writeCommandWrapper(bc: bare.ByteCursor, x: CommandWrapper): voi writeCommand(bc, x.inner) } -export type MessageIdParts = { +export type MessageId = { /** * Globally unique ID */ @@ -982,7 +982,7 @@ export type MessageIdParts = { readonly messageIndex: MessageIndex } -export function readMessageIdParts(bc: bare.ByteCursor): MessageIdParts { +export function readMessageId(bc: bare.ByteCursor): MessageId { return { gatewayId: readGatewayId(bc), requestId: readRequestId(bc), @@ -990,42 +990,12 @@ export function readMessageIdParts(bc: bare.ByteCursor): MessageIdParts { } } -export function writeMessageIdParts(bc: bare.ByteCursor, x: MessageIdParts): void { +export function writeMessageId(bc: bare.ByteCursor, x: MessageId): void { writeGatewayId(bc, x.gatewayId) writeRequestId(bc, x.requestId) writeMessageIndex(bc, x.messageIndex) } -export function encodeMessageIdParts(x: MessageIdParts, config?: Partial): Uint8Array { - const fullConfig = config != null ? bare.Config(config) : DEFAULT_CONFIG - const bc = new bare.ByteCursor( - new Uint8Array(fullConfig.initialBufferLength), - fullConfig, - ) - writeMessageIdParts(bc, x) - return new Uint8Array(bc.view.buffer, bc.view.byteOffset, bc.offset) -} - -export function decodeMessageIdParts(bytes: Uint8Array): MessageIdParts { - const bc = new bare.ByteCursor(bytes, DEFAULT_CONFIG) - const result = readMessageIdParts(bc) - if (bc.offset < bc.view.byteLength) { - throw new bare.BareError(bc.offset, "remaining bytes") - } - return result -} - -export type MessageId = ArrayBuffer - -export function readMessageId(bc: bare.ByteCursor): MessageId { - return bare.readFixedData(bc, 10) -} - -export function writeMessageId(bc: bare.ByteCursor, x: MessageId): void { - assert(x.byteLength === 10) - bare.writeFixedData(bc, x) -} - export type DeprecatedTunnelAck = null function read9(bc: bare.ByteCursor): ReadonlyMap { diff --git a/engine/sdks/typescript/runner/src/actor.ts b/engine/sdks/typescript/runner/src/actor.ts index 272a69b63f..a2016f61f2 100644 --- a/engine/sdks/typescript/runner/src/actor.ts +++ b/engine/sdks/typescript/runner/src/actor.ts @@ -1,9 +1,8 @@ import type * as protocol from "@rivetkit/engine-runner-protocol"; +import { logger } from "./log"; import type { PendingRequest } from "./tunnel"; +import { arraysEqual, idToStr, promiseWithResolvers } from "./utils"; import type { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter"; -import { arraysEqual, promiseWithResolvers } from "./utils"; -import { logger } from "./log"; -import * as tunnelId from "./tunnel-id"; export interface ActorConfig { name: string; @@ -72,8 +71,8 @@ export class RunnerActor { if (exists) { logger()?.warn({ msg: "attempting to set pending request twice, replacing existing", - gatewayId: tunnelId.gatewayIdToString(gatewayId), - requestId: tunnelId.requestIdToString(requestId), + gatewayId: idToStr(gatewayId), + requestId: idToStr(requestId), }); // Delete existing pending request before adding the new one this.deletePendingRequest(gatewayId, requestId); @@ -92,8 +91,8 @@ export class RunnerActor { }); logger()?.debug({ msg: "added pending request", - gatewayId: tunnelId.gatewayIdToString(gatewayId), - requestId: tunnelId.requestIdToString(requestId), + gatewayId: idToStr(gatewayId), + requestId: idToStr(requestId), length: this.pendingRequests.length, }); } @@ -109,8 +108,8 @@ export class RunnerActor { if (exists) { logger()?.warn({ msg: "attempting to set pending request twice, replacing existing", - gatewayId: tunnelId.gatewayIdToString(gatewayId), - requestId: tunnelId.requestIdToString(requestId), + gatewayId: idToStr(gatewayId), + requestId: idToStr(requestId), }); // Delete existing pending request before adding the new one this.deletePendingRequest(gatewayId, requestId); @@ -130,8 +129,8 @@ export class RunnerActor { }); logger()?.debug({ msg: "added pending request with stream controller", - gatewayId: tunnelId.gatewayIdToString(gatewayId), - requestId: tunnelId.requestIdToString(requestId), + gatewayId: idToStr(gatewayId), + requestId: idToStr(requestId), length: this.pendingRequests.length, }); } @@ -149,8 +148,8 @@ export class RunnerActor { this.pendingRequests.splice(index, 1); logger()?.debug({ msg: "removed pending request", - gatewayId: tunnelId.gatewayIdToString(gatewayId), - requestId: tunnelId.requestIdToString(requestId), + gatewayId: idToStr(gatewayId), + requestId: idToStr(requestId), length: this.pendingRequests.length, }); } diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index 22e0b3b9ac..f91d3f70e0 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -13,8 +13,8 @@ import { import { importWebSocket } from "./websocket.js"; export type { HibernatingWebSocketMetadata }; -export * as tunnelId from "./tunnel-id"; export { RunnerActor, type ActorConfig }; +export { idToStr } from "./utils"; const KV_EXPIRE: number = 30_000; const PROTOCOL_VERSION: number = 3; diff --git a/engine/sdks/typescript/runner/src/stringify.ts b/engine/sdks/typescript/runner/src/stringify.ts index fc9722f4b5..3001108b77 100644 --- a/engine/sdks/typescript/runner/src/stringify.ts +++ b/engine/sdks/typescript/runner/src/stringify.ts @@ -1,4 +1,5 @@ import type * as protocol from "@rivetkit/engine-runner-protocol"; +import { idToStr } from "./utils"; /** * Helper function to stringify ArrayBuffer for logging @@ -24,6 +25,13 @@ function stringifyMap(map: ReadonlyMap): string { return `Map(${map.size}){${entries}}`; } +/** + * Helper function to stringify MessageId for logging + */ +function stringifyMessageId(messageId: protocol.MessageId): string { + return `MessageId{gatewayId: ${idToStr(messageId.gatewayId)}, requestId: ${idToStr(messageId.requestId)}, messageIndex: ${messageId.messageIndex}}`; +} + /** * Stringify ToServerTunnelMessageKind for logging * Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified @@ -111,15 +119,17 @@ export function stringifyToClientTunnelMessageKind( export function stringifyCommand(command: protocol.Command): string { switch (command.tag) { case "CommandStartActor": { - const { actorId, generation, config, hibernatingRequests } = command.val; + const { actorId, generation, config, hibernatingRequests } = + command.val; const keyStr = config.key === null ? "null" : `"${config.key}"`; const inputStr = config.input === null ? "null" : stringifyArrayBuffer(config.input); - const hibernatingRequestsStr = hibernatingRequests.length > 0 - ? `[${hibernatingRequests.map((hr) => `{gatewayId: ${stringifyArrayBuffer(hr.gatewayId)}, requestId: ${stringifyArrayBuffer(hr.requestId)}}`).join(", ")}]` - : "[]"; + const hibernatingRequestsStr = + hibernatingRequests.length > 0 + ? `[${hibernatingRequests.map((hr) => `{gatewayId: ${idToStr(hr.gatewayId)}, requestId: ${idToStr(hr.requestId)}}`).join(", ")}]` + : "[]"; return `CommandStartActor{actorId: "${actorId}", generation: ${generation}, config: {name: "${config.name}", key: ${keyStr}, createTs: ${stringifyBigInt(config.createTs)}, input: ${inputStr}}, hibernatingRequests: ${hibernatingRequestsStr}}`; } case "CommandStopActor": { @@ -193,8 +203,14 @@ export function stringifyEventWrapper(wrapper: protocol.EventWrapper): string { export function stringifyToServer(message: protocol.ToServer): string { switch (message.tag) { case "ToServerInit": { - const { name, version, totalSlots, lastCommandIdx, prepopulateActorNames, metadata } = - message.val; + const { + name, + version, + totalSlots, + lastCommandIdx, + prepopulateActorNames, + metadata, + } = message.val; const lastCommandIdxStr = lastCommandIdx === null ? "null" @@ -227,8 +243,7 @@ export function stringifyToServer(message: protocol.ToServer): string { } case "ToServerTunnelMessage": { const { messageId, messageKind } = message.val; - const messageIdStr = stringifyArrayBuffer(messageId); - return `ToServerTunnelMessage{messageId: ${messageIdStr}, messageKind: ${stringifyToServerTunnelMessageKind(messageKind)}}`; + return `ToServerTunnelMessage{messageId: ${stringifyMessageId(messageId)}, messageKind: ${stringifyToServerTunnelMessageKind(messageKind)}}`; } } } @@ -261,8 +276,7 @@ export function stringifyToClient(message: protocol.ToClient): string { } case "ToClientTunnelMessage": { const { messageId, messageKind } = message.val; - const messageIdStr = stringifyArrayBuffer(messageId); - return `ToClientTunnelMessage{messageId: ${messageIdStr}, messageKind: ${stringifyToClientTunnelMessageKind(messageKind)}}`; + return `ToClientTunnelMessage{messageId: ${stringifyMessageId(messageId)}, messageKind: ${stringifyToClientTunnelMessageKind(messageKind)}}`; } } } diff --git a/engine/sdks/typescript/runner/src/tunnel-id.ts b/engine/sdks/typescript/runner/src/tunnel-id.ts deleted file mode 100644 index 8474e4e4cb..0000000000 --- a/engine/sdks/typescript/runner/src/tunnel-id.ts +++ /dev/null @@ -1,104 +0,0 @@ -import * as protocol from "@rivetkit/engine-runner-protocol"; - -// Type aliases for the message ID components -export type GatewayId = ArrayBuffer; -export type RequestId = ArrayBuffer; -export type MessageIndex = number; -export type MessageId = ArrayBuffer; - -/** - * Build a MessageId from its components - */ -export function buildMessageId( - gatewayId: GatewayId, - requestId: RequestId, - messageIndex: MessageIndex, -): MessageId { - if (gatewayId.byteLength !== 4) { - throw new Error( - `invalid gateway id length: expected 4 bytes, got ${gatewayId.byteLength}`, - ); - } - if (requestId.byteLength !== 4) { - throw new Error( - `invalid request id length: expected 4 bytes, got ${requestId.byteLength}`, - ); - } - if (messageIndex < 0 || messageIndex > 0xffff) { - throw new Error( - `invalid message index: must be u16, got ${messageIndex}`, - ); - } - - const parts: protocol.MessageIdParts = { - gatewayId, - requestId, - messageIndex, - }; - - const encoded = protocol.encodeMessageIdParts(parts); - - if (encoded.byteLength !== 10) { - throw new Error( - `message id serialization produced wrong size: expected 10 bytes, got ${encoded.byteLength}`, - ); - } - - // Create a new ArrayBuffer from the Uint8Array - const messageId = new ArrayBuffer(10); - new Uint8Array(messageId).set(encoded); - return messageId; -} - -/** - * Parse a MessageId into its components - */ -export function parseMessageId(messageId: MessageId): protocol.MessageIdParts { - if (messageId.byteLength !== 10) { - throw new Error( - `invalid message id length: expected 10 bytes, got ${messageId.byteLength}`, - ); - } - const uint8Array = new Uint8Array(messageId); - return protocol.decodeMessageIdParts(uint8Array); -} - -/** - * Convert a GatewayId to a base64 string - */ -export function gatewayIdToString(gatewayId: GatewayId): string { - const uint8Array = new Uint8Array(gatewayId); - return bufferToBase64(uint8Array); -} - -/** - * Convert a RequestId to a base64 string - */ -export function requestIdToString(requestId: RequestId): string { - const uint8Array = new Uint8Array(requestId); - return bufferToBase64(uint8Array); -} - -/** - * Convert a MessageId to a base64 string - */ -export function messageIdToString(messageId: MessageId): string { - const uint8Array = new Uint8Array(messageId); - return bufferToBase64(uint8Array); -} - -// Helper functions for base64 encoding/decoding - -function bufferToBase64(buffer: Uint8Array): string { - // Use Node.js Buffer if available, otherwise use browser btoa - if (typeof Buffer !== "undefined") { - return Buffer.from(buffer).toString("base64"); - } else { - // Browser environment - let binary = ""; - for (let i = 0; i < buffer.byteLength; i++) { - binary += String.fromCharCode(buffer[i]); - } - return btoa(binary); - } -} diff --git a/engine/sdks/typescript/runner/src/tunnel.ts b/engine/sdks/typescript/runner/src/tunnel.ts index 93f8db5c31..24c2692ed2 100644 --- a/engine/sdks/typescript/runner/src/tunnel.ts +++ b/engine/sdks/typescript/runner/src/tunnel.ts @@ -1,8 +1,8 @@ import type * as protocol from "@rivetkit/engine-runner-protocol"; import type { + GatewayId, MessageId, RequestId, - GatewayId, } from "@rivetkit/engine-runner-protocol"; import type { Logger } from "pino"; import { @@ -15,8 +15,7 @@ import { stringifyToClientTunnelMessageKind, stringifyToServerTunnelMessageKind, } from "./stringify"; -import * as tunnelId from "./tunnel-id"; -import { arraysEqual, unreachable } from "./utils"; +import { arraysEqual, idToStr, unreachable } from "./utils"; import { HIBERNATABLE_SYMBOL, WebSocketTunnelAdapter, @@ -132,7 +131,7 @@ export class Tunnel { let connectedButNotLoadedCount = 0; let restoredCount = 0; for (const { gatewayId, requestId } of actor.hibernatingRequests) { - const requestIdStr = tunnelId.requestIdToString(requestId); + const requestIdStr = idToStr(requestId); const meta = metaEntries.find( (entry) => arraysEqual(entry.gatewayId, gatewayId) && @@ -224,7 +223,7 @@ export class Tunnel { // Process loaded but not connected (stale) - remove them let loadedButNotConnectedCount = 0; for (const meta of metaEntries) { - const requestIdStr = tunnelId.requestIdToString(meta.requestId); + const requestIdStr = idToStr(meta.requestId); const isConnected = actor.hibernatingRequests.some( (req) => arraysEqual(req.gatewayId, meta.gatewayId) && @@ -426,7 +425,7 @@ export class Tunnel { if (!entry) { this.log?.warn({ msg: "missing requestToActor entry", - requestId: tunnelId.requestIdToString(requestId), + requestId: idToStr(requestId), }); return undefined; } @@ -435,7 +434,7 @@ export class Tunnel { if (!actor) { this.log?.warn({ msg: "missing actor for requestToActor lookup", - requestId: tunnelId.requestIdToString(requestId), + requestId: idToStr(requestId), actorId: entry.actorId, }); return undefined; @@ -463,7 +462,7 @@ export class Tunnel { if (!this.#runner.__webSocketReady()) { this.log?.warn({ msg: "cannot send tunnel message, socket not connected to engine. tunnel data dropped.", - requestId: tunnelId.requestIdToString(requestId), + requestId: idToStr(requestId), message: stringifyToServerTunnelMessageKind(messageKind), }); return; @@ -473,8 +472,8 @@ export class Tunnel { // // We don't have to wait for the actor to start since we're not calling // any callbacks on the actor - const gatewayIdStr = tunnelId.gatewayIdToString(gatewayId); - const requestIdStr = tunnelId.requestIdToString(requestId); + const gatewayIdStr = idToStr(gatewayId); + const requestIdStr = idToStr(requestId); const actor = this.getRequestActor(gatewayId, requestId); if (!actor) { this.log?.warn({ @@ -502,12 +501,12 @@ export class Tunnel { } // Build message ID from gatewayId + requestId + messageIndex - const messageId = tunnelId.buildMessageId( + const messageId: protocol.MessageId = { gatewayId, requestId, - clientMessageIndex, - ); - const messageIdStr = tunnelId.messageIdToString(messageId); + messageIndex: clientMessageIndex, + }; + const messageIdStr = `${idToStr(messageId.gatewayId)}-${idToStr(messageId.requestId)}-${messageId.messageIndex}`; this.log?.debug({ msg: "sending tunnel msg", @@ -593,19 +592,15 @@ export class Tunnel { async handleTunnelMessage(message: protocol.ToClientTunnelMessage) { // Parse the gateway ID, request ID, and message index from the messageId - const messageIdParts = tunnelId.parseMessageId(message.messageId); - const gatewayId = messageIdParts.gatewayId; - const requestId = messageIdParts.requestId; + const { gatewayId, requestId, messageIndex } = message.messageId; - const gatewayIdStr = tunnelId.gatewayIdToString(gatewayId); - const requestIdStr = tunnelId.requestIdToString(requestId); - const messageIdStr = tunnelId.messageIdToString(message.messageId); + const gatewayIdStr = idToStr(gatewayId); + const requestIdStr = idToStr(requestId); this.log?.debug({ msg: "receive tunnel msg", - messageId: messageIdStr, gatewayId: gatewayIdStr, requestId: requestIdStr, - messageIndex: messageIdParts.messageIndex, + messageIndex: message.messageId.messageIndex, message: stringifyToClientTunnelMessageKind(message.messageKind), }); @@ -638,7 +633,7 @@ export class Tunnel { await this.#handleWebSocketMessage( gatewayId, requestId, - messageIdParts.messageIndex, + messageIndex, message.messageKind.val, ); break; @@ -664,7 +659,7 @@ export class Tunnel { req: protocol.ToClientRequestStart, ) { // Track this request for the actor - const requestIdStr = tunnelId.requestIdToString(requestId); + const requestIdStr = idToStr(requestId); const actor = await this.#runner.getAndWaitForActor(req.actorId); if (!actor) { this.log?.warn({ @@ -909,7 +904,7 @@ export class Tunnel { // // Sending a ToServerWebSocketClose will terminate the WebSocket early. - const requestIdStr = tunnelId.requestIdToString(requestId); + const requestIdStr = idToStr(requestId); // Validate actor exists const actor = await this.#runner.getAndWaitForActor(open.actorId); @@ -1058,7 +1053,7 @@ export class Tunnel { requestId: ArrayBuffer, clientMessageIndex: number, ) { - const requestIdStr = tunnelId.requestIdToString(requestId); + const requestIdStr = idToStr(requestId); this.log?.debug({ msg: "ack ws msg", diff --git a/engine/sdks/typescript/runner/src/utils.ts b/engine/sdks/typescript/runner/src/utils.ts index 6cd0cdd114..066e8554f0 100644 --- a/engine/sdks/typescript/runner/src/utils.ts +++ b/engine/sdks/typescript/runner/src/utils.ts @@ -150,3 +150,10 @@ export function promiseWithResolvers(): { }); return { promise, resolve, reject }; } + +export function idToStr(id: ArrayBuffer): string { + const bytes = new Uint8Array(id); + return Array.from(bytes) + .map((byte) => byte.toString(16).padStart(2, "0")) + .join(""); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts index 2136a66844..1cacf86331 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts @@ -1,17 +1,12 @@ -import * as cbor from "cbor-x"; +import { idToStr } from "@rivetkit/engine-runner"; import onChange from "on-change"; import { isCborSerializable, stringifyError } from "@/common/utils"; -import type * as persistSchema from "@/schemas/actor-persist/mod"; import { ACTOR_VERSIONED, CONN_VERSIONED, } from "@/schemas/actor-persist/versioned"; -import { - bufferToArrayBuffer, - promiseWithResolvers, - SinglePromiseQueue, -} from "@/utils"; -import { type AnyConn, CONN_STATE_MANAGER_SYMBOL, Conn } from "../conn/mod"; +import { promiseWithResolvers, SinglePromiseQueue } from "@/utils"; +import { type AnyConn, CONN_STATE_MANAGER_SYMBOL } from "../conn/mod"; import { convertConnToBarePersistedConn } from "../conn/persisted"; import type { ActorDriver } from "../driver"; import * as errors from "../errors"; @@ -19,7 +14,6 @@ import { isConnStatePath, isStatePath } from "../utils"; import { KEYS, makeConnKey } from "./kv"; import type { ActorInstance } from "./mod"; import { convertActorToBarePersisted, type PersistedActor } from "./persisted"; -import { tunnelId } from "@rivetkit/engine-runner"; export interface SaveStateOptions { /** @@ -431,12 +425,8 @@ export class StateManager { this.#actor.rLog.info({ msg: "persisting connection", connId, - gatewayId: tunnelId.gatewayIdToString( - hibernatableDataRaw.requestId, - ), - requestId: tunnelId.requestIdToString( - hibernatableDataRaw.requestId, - ), + gatewayId: idToStr(hibernatableDataRaw.requestId), + requestId: idToStr(hibernatableDataRaw.requestId), serverMessageIndex: hibernatableDataRaw.serverMessageIndex, clientMessageIndex: diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index b06d41e533..b72be61c38 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -3,7 +3,7 @@ import type { RunnerConfig as EngineRunnerConfig, HibernatingWebSocketMetadata, } from "@rivetkit/engine-runner"; -import { Runner, tunnelId } from "@rivetkit/engine-runner"; +import { idToStr, Runner } from "@rivetkit/engine-runner"; import * as cbor from "cbor-x"; import type { Context as HonoContext } from "hono"; import { streamSSE } from "hono/streaming"; @@ -730,8 +730,8 @@ export class EngineActorDriver implements ActorDriver { msg: "event listeners attached to restored websocket", actorId, connId: conn?.id, - gatewayId: tunnelId.gatewayIdToString(gatewayIdBuf), - requestId: tunnelId.requestIdToString(requestIdBuf), + gatewayId: idToStr(gatewayIdBuf), + requestId: idToStr(requestIdBuf), websocketType: websocket?.constructor?.name, hasMessageListener: !!websocket.addEventListener, }); @@ -778,8 +778,8 @@ export class EngineActorDriver implements ActorDriver { // Determine configuration for new WS logger().debug({ msg: "no existing hibernatable websocket found", - gatewayId: tunnelId.gatewayIdToString(gatewayId), - requestId: tunnelId.requestIdToString(requestId), + gatewayId: idToStr(gatewayId), + requestId: idToStr(requestId), }); if (path === PATH_CONNECT) { return true;