Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/conn/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,21 @@ export enum DriverReadyState {
}

export interface ConnDriver {
/** The type of driver. Used for debug purposes only. */
type: string;

/**
* Unique request ID provided by the underlying provider. If none is
* available for this conn driver, a random UUID is generated.
**/
requestId: string;

/** ArrayBuffer version of requestId if relevant. */
requestIdBuf: ArrayBuffer | undefined;

/**
* If the connection can be hibernated. If true, this will allow the actor to go to sleep while the connection is still active.
**/
hibernatable: boolean;

sendMessage?(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export type ConnHttpState = Record<never, never>;

export function createHttpSocket(): ConnDriver {
return {
type: "http",
requestId: crypto.randomUUID(),
requestIdBuf: undefined,
hibernatable: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import { DriverReadyState } from "../driver";
* Unlike the standard HTTP driver, this provides connection lifecycle management
* for tracking the HTTP request through the actor's onRequest handler.
*/
export function createRawHttpSocket(): ConnDriver {
export function createRawRequestSocket(): ConnDriver {
return {
type: "raw-request",
requestId: crypto.randomUUID(),
requestIdBuf: undefined,
hibernatable: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export function createRawWebSocketSocket(
closePromise: Promise<void>,
): ConnDriver {
return {
type: "raw-websocket",
requestId,
requestIdBuf,
hibernatable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export function createWebSocketSocket(
closePromise: Promise<void>,
): ConnDriver {
return {
type: "websocket",
requestId,
requestIdBuf,
hibernatable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { ActorConfig, InitContext } from "../config";
import type { ConnDriver } from "../conn/driver";
import { createHttpSocket } from "../conn/drivers/http";
import {
CONN_DRIVER_SYMBOL,
CONN_PERSIST_SYMBOL,
CONN_SEND_MESSAGE_SYMBOL,
CONN_STATE_ENABLED_SYMBOL,
Expand Down Expand Up @@ -138,6 +139,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
return Array.from(
this.#connectionManager.connections.entries(),
).map(([id, conn]) => ({
type: conn[CONN_DRIVER_SYMBOL]?.type,
id,
params: conn.params as any,
state: conn[CONN_STATE_ENABLED_SYMBOL]
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-typescript/packages/rivetkit/src/actor/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ export {
createActorRouter,
} from "./router";
export {
handleRawWebSocketHandler,
handleRawWebSocket as handleRawWebSocketHandler,
handleWebSocketConnect,
} from "./router-endpoints";
52 changes: 26 additions & 26 deletions rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
promiseWithResolvers,
} from "@/utils";
import { createHttpSocket } from "./conn/drivers/http";
import { createRawHttpSocket } from "./conn/drivers/raw-http";
import { createRawRequestSocket } from "./conn/drivers/raw-request";
import { createRawWebSocketSocket } from "./conn/drivers/raw-websocket";
import { createWebSocketSocket } from "./conn/drivers/websocket";
import type { ActorDriver } from "./driver";
Expand Down Expand Up @@ -377,7 +377,31 @@ export async function handleAction(
});
}

export async function handleRawWebSocketHandler(
export async function handleRawRequest(
req: Request,
actorDriver: ActorDriver,
actorId: string,
): Promise<Response> {
const actor = await actorDriver.loadActor(actorId);

// Track connection outside of scope for cleanup
let createdConn: AnyConn | undefined;

try {
const conn = await actor.createConn(createRawRequestSocket(), {}, req);

createdConn = conn;

return await actor.handleRawRequest(req, {});
} finally {
// Clean up the connection after the request completes
if (createdConn) {
actor.connDisconnected(createdConn, true);
}
}
}

export async function handleRawWebSocket(
req: Request | undefined,
path: string,
actorDriver: ActorDriver,
Expand Down Expand Up @@ -548,30 +572,6 @@ export function getRequestConnParams(req: HonoRequest): unknown {
}
}

export async function handleRawHttpHandler(
req: Request,
actorDriver: ActorDriver,
actorId: string,
): Promise<Response> {
const actor = await actorDriver.loadActor(actorId);

// Track connection outside of scope for cleanup
let createdConn: AnyConn | undefined;

try {
const conn = await actor.createConn(createRawHttpSocket(), {}, req);

createdConn = conn;

return await actor.handleRawRequest(req, {});
} finally {
// Clean up the connection after the request completes
if (createdConn) {
actor.connDisconnected(createdConn, true);
}
}
}

/**
* Truncase the PATH_WEBSOCKET_PREFIX path prefix in order to pass a clean
* path to the onWebSocket handler.
Expand Down
10 changes: 4 additions & 6 deletions rivetkit-typescript/packages/rivetkit/src/actor/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import {
type ConnectWebSocketOutput,
type ConnsMessageOpts,
handleAction,
handleRawHttpHandler,
handleRawWebSocketHandler,
handleRawRequest,
handleRawWebSocket,
handleWebSocketConnect,
} from "@/actor/router-endpoints";
import {
Expand Down Expand Up @@ -171,7 +171,6 @@ export function createActorRouter(
);
});

// Raw HTTP endpoints - /request/*
router.all("/request/*", async (c) => {
// TODO: This is not a clean way of doing this since `/http/` might exist mid-path
// Strip the /http prefix from the URL to get the original path
Expand All @@ -193,14 +192,13 @@ export function createActorRouter(
to: correctedRequest.url,
});

return await handleRawHttpHandler(
return await handleRawRequest(
correctedRequest,
actorDriver,
c.env.actorId,
);
});

// Raw WebSocket endpoint - /websocket/*
router.get(`${PATH_WEBSOCKET_PREFIX}*`, async (c) => {
const upgradeWebSocket = runConfig.getUpgradeWebSocket?.();
if (upgradeWebSocket) {
Expand All @@ -216,7 +214,7 @@ export function createActorRouter(
pathWithQuery,
});

return await handleRawWebSocketHandler(
return await handleRawWebSocket(
c.req.raw,
pathWithQuery,
actorDriver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { deserializeActorKey } from "@/actor/keys";
import { EncodingSchema } from "@/actor/protocol/serde";
import { type ActorRouter, createActorRouter } from "@/actor/router";
import {
handleRawWebSocketHandler,
handleRawWebSocket,
handleWebSocketConnect,
truncateRawWebSocketPathPrefix,
} from "@/actor/router-endpoints";
Expand Down Expand Up @@ -582,7 +582,7 @@ export class EngineActorDriver implements ActorDriver {
requestIdBuf,
);
} else if (url.pathname.startsWith(PATH_WEBSOCKET_PREFIX)) {
wsHandlerPromise = handleRawWebSocketHandler(
wsHandlerPromise = handleRawWebSocket(
request,
url.pathname + url.search,
this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import invariant from "invariant";
import { generateConnRequestId } from "@/actor/conn/mod";
import { type ActorRouter, createActorRouter } from "@/actor/router";
import {
handleRawWebSocketHandler,
handleRawWebSocket,
handleWebSocketConnect,
} from "@/actor/router-endpoints";
import { createClientWithDriver } from "@/client/client";
Expand Down Expand Up @@ -182,7 +182,7 @@ export class FileSystemManagerDriver implements ManagerDriver {
) {
// Handle websocket proxy
// Use the full path with query parameters
const wsHandler = await handleRawWebSocketHandler(
const wsHandler = await handleRawWebSocket(
undefined,
path,
this.#actorDriver,
Expand Down Expand Up @@ -239,7 +239,7 @@ export class FileSystemManagerDriver implements ManagerDriver {
) {
// Handle websocket proxy
// Use the full path with query parameters
const wsHandler = await handleRawWebSocketHandler(
const wsHandler = await handleRawWebSocket(
c.req.raw,
path,
this.#actorDriver,
Expand Down
Loading