11import * as protocol from "@rivetkit/engine-runner-protocol" ;
22import type { Logger } from "pino" ;
33import type WebSocket from "ws" ;
4+ import { type ActorConfig , RunnerActor } from "./actor" ;
45import { logger , setLogger } from "./log.js" ;
56import { stringifyToClient , stringifyToServer } from "./stringify" ;
67import { type HibernatingWebSocketMetadata , Tunnel } from "./tunnel" ;
@@ -10,7 +11,6 @@ import {
1011 unreachable ,
1112} from "./utils" ;
1213import { importWebSocket } from "./websocket.js" ;
13- import { RunnerActor , type ActorConfig } from "./actor" ;
1414
1515export type { HibernatingWebSocketMetadata } ;
1616export * as tunnelId from "./tunnel-id" ;
@@ -22,7 +22,7 @@ const RUNNER_PING_INTERVAL = 3_000;
2222
2323/** Warn once the backlog significantly exceeds the server's ack batch size. */
2424const EVENT_BACKLOG_WARN_THRESHOLD = 10_000 ;
25- const SIGNAL_HANDLERS : ( ( ) => void ) [ ] = [ ] ;
25+ const SIGNAL_HANDLERS : ( ( ) => void | Promise < void > ) [ ] = [ ] ;
2626
2727export interface RunnerConfig {
2828 logger ?: Logger ;
@@ -202,6 +202,7 @@ export class Runner {
202202 #nextEventIdx: bigint = 0n ;
203203 #started: boolean = false ;
204204 #shutdown: boolean = false ;
205+ #shuttingDown: boolean = false ;
205206 #reconnectAttempt: number = 0 ;
206207 #reconnectTimeout?: NodeJS . Timeout ;
207208
@@ -260,13 +261,6 @@ export class Runner {
260261
261262 // MARK: Manage actors
262263 sleepActor ( actorId : string , generation ?: number ) {
263- if ( this . #shutdown) {
264- this . log ?. warn ( {
265- msg : "runner is shut down, cannot sleep actor" ,
266- } ) ;
267- return ;
268- }
269-
270264 const actor = this . getActor ( actorId , generation ) ;
271265 if ( ! actor ) return ;
272266
@@ -420,39 +414,50 @@ export class Runner {
420414
421415 if ( ! this . #config. noAutoShutdown ) {
422416 if ( ! SIGNAL_HANDLERS . length ) {
423- process . on ( "SIGTERM" , ( ) => {
417+ process . on ( "SIGTERM" , async ( ) => {
424418 this . log ?. debug ( "received SIGTERM" ) ;
425419
426420 for ( const handler of SIGNAL_HANDLERS ) {
427- handler ( ) ;
421+ await handler ( ) ;
428422 }
429423
430- process . exit ( 0 ) ;
424+ // TODO: Add back
425+ // process.exit(0);
431426 } ) ;
432- process . on ( "SIGINT" , ( ) => {
427+ process . on ( "SIGINT" , async ( ) => {
433428 this . log ?. debug ( "received SIGINT" ) ;
434429
435430 for ( const handler of SIGNAL_HANDLERS ) {
436- handler ( ) ;
431+ await handler ( ) ;
437432 }
438433
439- process . exit ( 0 ) ;
434+ // TODO: Add back
435+ // process.exit(0);
440436 } ) ;
441437
442438 this . log ?. debug ( {
443439 msg : "added SIGTERM listeners" ,
444440 } ) ;
445441 }
446442
447- SIGNAL_HANDLERS . push ( ( ) => {
443+ SIGNAL_HANDLERS . push ( async ( ) => {
448444 const weak = new WeakRef ( this ) ;
449- weak . deref ( ) ?. shutdown ( false , false ) ;
445+ await weak . deref ( ) ?. shutdown ( false , false ) ;
450446 } ) ;
451447 }
452448 }
453449
454450 // MARK: Shutdown
455451 async shutdown ( immediate : boolean , exit : boolean = false ) {
452+ // Prevent concurrent shutdowns
453+ if ( this . #shuttingDown) {
454+ this . log ?. debug ( {
455+ msg : "shutdown already in progress, ignoring" ,
456+ } ) ;
457+ return ;
458+ }
459+ this . #shuttingDown = true ;
460+
456461 this . log ?. info ( {
457462 msg : "starting shutdown" ,
458463 immediate,
@@ -515,8 +520,10 @@ export class Runner {
515520 readyState : pegboardWebSocket . readyState ,
516521 } ) ;
517522
518- // NOTE: We don't use #sendToServer here because that function checks if the runner is
519- // shut down
523+ // Start stopping
524+ //
525+ // The runner workflow will send StopActor commands for all
526+ // actors
520527 this . __sendToServer ( {
521528 tag : "ToServerStopping" ,
522529 val : null ,
@@ -536,7 +543,8 @@ export class Runner {
536543 } ) ;
537544 } ) ;
538545
539- // TODO: Wait for all actors to stop before closing ws
546+ // Wait for all actors to stop before closing ws
547+ await this . #waitForActorsToStop( pegboardWebSocket ) ;
540548
541549 this . log ?. info ( {
542550 msg : "closing WebSocket" ,
@@ -571,9 +579,96 @@ export class Runner {
571579 this . #tunnel = undefined ;
572580 }
573581
582+ this . #config. onShutdown ( ) ;
583+
574584 if ( exit ) process . exit ( 0 ) ;
585+ }
575586
576- this . #config. onShutdown ( ) ;
/**
 * Holds up shutdown until the runner's actors have stopped, or until
 * waiting any longer is pointless.
 *
 * The actor map is checked once immediately and then on a 100ms interval.
 * The returned promise resolves as soon as one of these holds:
 * - no actors remain in `#actors`
 * - the WebSocket reports readyState 2 or 3 (closing / closed)
 * - 120 seconds have elapsed since the wait began
 *
 * While still waiting, a progress message is logged at most once every
 * 5 seconds.
 *
 * @param ws - WebSocket whose `readyState` is inspected on every poll.
 * @returns A promise that resolves once waiting is over.
 */
async #waitForActorsToStop(ws: WebSocket): Promise<void> {
	const maxWaitMs = 120_000; // Give up after 120 seconds
	const pollMs = 100; // Poll cadence
	const progressEveryMs = 5_000; // Minimum gap between progress logs
	const startedAt = Date.now();
	// Starting at zero makes the very first unfinished poll log progress.
	let lastProgressAt = 0;

	// Runs a single poll and reports whether the wait should end.
	const isDone = (): boolean => {
		const now = Date.now();
		const elapsed = now - startedAt;
		const socketGone = ws.readyState === 2 || ws.readyState === 3;

		if (this.#actors.size === 0) {
			this.log?.info({
				msg: "all actors stopped",
				elapsed,
			});
			return true;
		}

		if (socketGone) {
			this.log?.warn({
				msg: "websocket closed before all actors stopped",
				remainingActors: this.#actors.size,
				elapsed,
			});
			return true;
		}

		if (elapsed >= maxWaitMs) {
			this.log?.warn({
				msg: "shutdown timeout reached, forcing close",
				remainingActors: this.#actors.size,
				elapsed,
			});
			return true;
		}

		// Still waiting: emit a throttled progress message.
		if (now - lastProgressAt >= progressEveryMs) {
			this.log?.info({
				msg: "waiting for actors to stop",
				remainingActors: this.#actors.size,
				elapsed,
			});
			lastProgressAt = now;
		}
		return false;
	};

	return new Promise<void>((resolve) => {
		// Fast path: nothing to wait for, so no timer is ever created.
		if (isDone()) {
			this.log?.debug({
				msg: "actors check completed immediately",
			});
			resolve();
			return;
		}

		this.log?.debug({
			msg: "starting actor wait interval",
			checkInterval: pollMs,
		});

		const timer = setInterval(() => {
			this.log?.debug({
				msg: "actor wait interval tick",
				actorCount: this.#actors.size,
			});

			if (!isDone()) return;

			this.log?.debug({
				msg: "actors check completed, clearing interval",
			});
			clearInterval(timer);
			resolve();
		}, pollMs);
	});
}
578673
579674 // MARK: Networking
@@ -1014,13 +1109,6 @@ export class Runner {
10141109 generation : number ,
10151110 intentType : "sleep" | "stop" ,
10161111 ) {
1017- if ( this . #shutdown) {
1018- console . trace ( "send actor intent" , actorId , intentType ) ;
1019- this . log ?. warn ( {
1020- msg : "Runner is shut down, cannot send actor intent" ,
1021- } ) ;
1022- return ;
1023- }
10241112 let actorIntent : protocol . ActorIntent ;
10251113
10261114 if ( intentType === "sleep" ) {
@@ -1062,12 +1150,6 @@ export class Runner {
10621150 generation : number ,
10631151 stateType : "running" | "stopped" ,
10641152 ) {
1065- if ( this . #shutdown) {
1066- this . log ?. warn ( {
1067- msg : "Runner is shut down, cannot send actor state update" ,
1068- } ) ;
1069- return ;
1070- }
10711153 let actorState : protocol . ActorState ;
10721154
10731155 if ( stateType === "running" ) {
@@ -1108,13 +1190,6 @@ export class Runner {
11081190 }
11091191
11101192 #sendCommandAcknowledgment( ) {
1111- if ( this . #shutdown) {
1112- this . log ?. warn ( {
1113- msg : "Runner is shut down, cannot send command acknowledgment" ,
1114- } ) ;
1115- return ;
1116- }
1117-
11181193 if ( this . #lastCommandIdx < 0 ) {
11191194 // No commands received yet, nothing to acknowledge
11201195 return ;
@@ -1423,11 +1498,6 @@ export class Runner {
14231498 const actor = this . getActor ( actorId , generation ) ;
14241499 if ( ! actor ) return ;
14251500
1426- if ( this . #shutdown) {
1427- console . warn ( "Runner is shut down, cannot set alarm" ) ;
1428- return ;
1429- }
1430-
14311501 const alarmEvent : protocol . EventActorSetAlarm = {
14321502 actorId,
14331503 generation : actor . generation ,
@@ -1460,11 +1530,6 @@ export class Runner {
14601530 requestData : protocol . KvRequestData ,
14611531 ) : Promise < any > {
14621532 return new Promise ( ( resolve , reject ) => {
1463- if ( this . #shutdown) {
1464- reject ( new Error ( "Runner is shut down" ) ) ;
1465- return ;
1466- }
1467-
14681533 const requestId = this . #nextKvRequestId++ ;
14691534 const isConnected =
14701535 this . #pegboardWebSocket &&
@@ -1541,14 +1606,7 @@ export class Runner {
15411606 : false ;
15421607 }
15431608
1544- __sendToServer ( message : protocol . ToServer , allowShutdown : boolean = false ) {
1545- if ( ! allowShutdown && this . #shutdown) {
1546- this . log ?. warn ( {
1547- msg : "Runner is shut down, cannot send message to server" ,
1548- } ) ;
1549- return ;
1550- }
1551-
1609+ __sendToServer ( message : protocol . ToServer ) {
15521610 this . log ?. debug ( {
15531611 msg : "sending runner message" ,
15541612 data : stringifyToServer ( message ) ,
0 commit comments