11import type * as protocol from "@rivetkit/engine-runner-protocol" ;
22import type { PendingRequest } from "./tunnel" ;
33import type { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter" ;
4- import { arraysEqual } from "./utils" ;
4+ import { arraysEqual , promiseWithResolvers } from "./utils" ;
5+ import { logger } from "./log" ;
6+ import * as tunnelId from "./tunnel-id" ;
57
68export interface ActorConfig {
79 name : string ;
@@ -24,11 +26,28 @@ export class RunnerActor {
2426 requestId : protocol . RequestId ;
2527 ws : WebSocketTunnelAdapter ;
2628 } > = [ ] ;
29+ actorStartPromise : ReturnType < typeof promiseWithResolvers < void > > ;
2730
28- constructor ( actorId : string , generation : number , config : ActorConfig ) {
31+ /**
32+ * If restoreHibernatingRequests has been called. This is used to assert
33+ * that the caller is implemented correctly.
34+ **/
35+ hibernationRestored : boolean = false ;
36+
37+ constructor (
38+ actorId : string ,
39+ generation : number ,
40+ config : ActorConfig ,
41+ /**
42+ * List of hibernating requests provided by the gateway on actor start.
43+ * This represents the WebSocket connections that the gateway knows about.
44+ **/
45+ public hibernatingRequests : readonly protocol . HibernatingRequest [ ] ,
46+ ) {
2947 this . actorId = actorId ;
3048 this . generation = generation ;
3149 this . config = config ;
50+ this . actorStartPromise = promiseWithResolvers ( ) ;
3251 }
3352
3453 // Pending request methods
@@ -43,13 +62,78 @@ export class RunnerActor {
4362 ) ?. request ;
4463 }
4564
46- setPendingRequest (
65+ createPendingRequest (
66+ gatewayId : protocol . GatewayId ,
67+ requestId : protocol . RequestId ,
68+ clientMessageIndex : number ,
69+ ) {
70+ const exists =
71+ this . getPendingRequest ( gatewayId , requestId ) !== undefined ;
72+ if ( exists ) {
73+ logger ( ) ?. warn ( {
74+ msg : "attempting to set pending request twice, replacing existing" ,
75+ gatewayId : tunnelId . gatewayIdToString ( gatewayId ) ,
76+ requestId : tunnelId . requestIdToString ( requestId ) ,
77+ } ) ;
78+ // Delete existing pending request before adding the new one
79+ this . deletePendingRequest ( gatewayId , requestId ) ;
80+ }
81+ this . pendingRequests . push ( {
82+ gatewayId,
83+ requestId,
84+ request : {
85+ resolve : ( ) => { } ,
86+ reject : ( ) => { } ,
87+ actorId : this . actorId ,
88+ gatewayId : gatewayId ,
89+ requestId : requestId ,
90+ clientMessageIndex,
91+ } ,
92+ } ) ;
93+ logger ( ) ?. debug ( {
94+ msg : "added pending request" ,
95+ gatewayId : tunnelId . gatewayIdToString ( gatewayId ) ,
96+ requestId : tunnelId . requestIdToString ( requestId ) ,
97+ length : this . pendingRequests . length ,
98+ } ) ;
99+ }
100+
101+ createPendingRequestWithStreamController (
47102 gatewayId : protocol . GatewayId ,
48103 requestId : protocol . RequestId ,
49- request : PendingRequest ,
104+ clientMessageIndex : number ,
105+ streamController : ReadableStreamDefaultController < Uint8Array > ,
50106 ) {
51- this . deletePendingRequest ( gatewayId , requestId ) ;
52- this . pendingRequests . push ( { gatewayId, requestId, request } ) ;
107+ const exists =
108+ this . getPendingRequest ( gatewayId , requestId ) !== undefined ;
109+ if ( exists ) {
110+ logger ( ) ?. warn ( {
111+ msg : "attempting to set pending request twice, replacing existing" ,
112+ gatewayId : tunnelId . gatewayIdToString ( gatewayId ) ,
113+ requestId : tunnelId . requestIdToString ( requestId ) ,
114+ } ) ;
115+ // Delete existing pending request before adding the new one
116+ this . deletePendingRequest ( gatewayId , requestId ) ;
117+ }
118+ this . pendingRequests . push ( {
119+ gatewayId,
120+ requestId,
121+ request : {
122+ resolve : ( ) => { } ,
123+ reject : ( ) => { } ,
124+ actorId : this . actorId ,
125+ gatewayId : gatewayId ,
126+ requestId : requestId ,
127+ clientMessageIndex,
128+ streamController,
129+ } ,
130+ } ) ;
131+ logger ( ) ?. debug ( {
132+ msg : "added pending request with stream controller" ,
133+ gatewayId : tunnelId . gatewayIdToString ( gatewayId ) ,
134+ requestId : tunnelId . requestIdToString ( requestId ) ,
135+ length : this . pendingRequests . length ,
136+ } ) ;
53137 }
54138
55139 deletePendingRequest (
@@ -63,6 +147,12 @@ export class RunnerActor {
63147 ) ;
64148 if ( index !== - 1 ) {
65149 this . pendingRequests . splice ( index , 1 ) ;
150+ logger ( ) ?. debug ( {
151+ msg : "removed pending request" ,
152+ gatewayId : tunnelId . gatewayIdToString ( gatewayId ) ,
153+ requestId : tunnelId . requestIdToString ( requestId ) ,
154+ length : this . pendingRequests . length ,
155+ } ) ;
66156 }
67157 }
68158
@@ -83,7 +173,11 @@ export class RunnerActor {
83173 requestId : protocol . RequestId ,
84174 ws : WebSocketTunnelAdapter ,
85175 ) {
86- this . deleteWebSocket ( gatewayId , requestId ) ;
176+ const exists = this . getWebSocket ( gatewayId , requestId ) !== undefined ;
177+ if ( exists ) {
178+ logger ( ) ?. warn ( { msg : "attempting to set websocket twice" } ) ;
179+ return ;
180+ }
87181 this . webSockets . push ( { gatewayId, requestId, ws } ) ;
88182 }
89183
0 commit comments