Skip to content

Commit 4f192f3

Browse files
committed
fix(rivetkit): fix graceful runner shutdown
1 parent 34ee3a4 commit 4f192f3

File tree

3 files changed

+145
-64
lines changed

3 files changed

+145
-64
lines changed

engine/sdks/typescript/runner/src/mod.ts

Lines changed: 117 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import * as protocol from "@rivetkit/engine-runner-protocol";
22
import type { Logger } from "pino";
33
import type WebSocket from "ws";
4+
import { type ActorConfig, RunnerActor } from "./actor";
45
import { logger, setLogger } from "./log.js";
56
import { stringifyToClient, stringifyToServer } from "./stringify";
67
import { type HibernatingWebSocketMetadata, Tunnel } from "./tunnel";
@@ -10,7 +11,6 @@ import {
1011
unreachable,
1112
} from "./utils";
1213
import { importWebSocket } from "./websocket.js";
13-
import { RunnerActor, type ActorConfig } from "./actor";
1414

1515
export type { HibernatingWebSocketMetadata };
1616
export * as tunnelId from "./tunnel-id";
@@ -22,7 +22,7 @@ const RUNNER_PING_INTERVAL = 3_000;
2222

2323
/** Warn once the backlog significantly exceeds the server's ack batch size. */
2424
const EVENT_BACKLOG_WARN_THRESHOLD = 10_000;
25-
const SIGNAL_HANDLERS: (() => void)[] = [];
25+
const SIGNAL_HANDLERS: (() => void | Promise<void>)[] = [];
2626

2727
export interface RunnerConfig {
2828
logger?: Logger;
@@ -202,6 +202,7 @@ export class Runner {
202202
#nextEventIdx: bigint = 0n;
203203
#started: boolean = false;
204204
#shutdown: boolean = false;
205+
#shuttingDown: boolean = false;
205206
#reconnectAttempt: number = 0;
206207
#reconnectTimeout?: NodeJS.Timeout;
207208

@@ -260,13 +261,6 @@ export class Runner {
260261

261262
// MARK: Manage actors
262263
sleepActor(actorId: string, generation?: number) {
263-
if (this.#shutdown) {
264-
this.log?.warn({
265-
msg: "runner is shut down, cannot sleep actor",
266-
});
267-
return;
268-
}
269-
270264
const actor = this.getActor(actorId, generation);
271265
if (!actor) return;
272266

@@ -420,39 +414,50 @@ export class Runner {
420414

421415
if (!this.#config.noAutoShutdown) {
422416
if (!SIGNAL_HANDLERS.length) {
423-
process.on("SIGTERM", () => {
417+
process.on("SIGTERM", async () => {
424418
this.log?.debug("received SIGTERM");
425419

426420
for (const handler of SIGNAL_HANDLERS) {
427-
handler();
421+
await handler();
428422
}
429423

430-
process.exit(0);
424+
// TODO: Add back
425+
// process.exit(0);
431426
});
432-
process.on("SIGINT", () => {
427+
process.on("SIGINT", async () => {
433428
this.log?.debug("received SIGINT");
434429

435430
for (const handler of SIGNAL_HANDLERS) {
436-
handler();
431+
await handler();
437432
}
438433

439-
process.exit(0);
434+
// TODO: Add back
435+
// process.exit(0);
440436
});
441437

442438
this.log?.debug({
443439
msg: "added SIGTERM listeners",
444440
});
445441
}
446442

447-
SIGNAL_HANDLERS.push(() => {
443+
SIGNAL_HANDLERS.push(async () => {
448444
const weak = new WeakRef(this);
449-
weak.deref()?.shutdown(false, false);
445+
await weak.deref()?.shutdown(false, false);
450446
});
451447
}
452448
}
453449

454450
// MARK: Shutdown
455451
async shutdown(immediate: boolean, exit: boolean = false) {
452+
// Prevent concurrent shutdowns
453+
if (this.#shuttingDown) {
454+
this.log?.debug({
455+
msg: "shutdown already in progress, ignoring",
456+
});
457+
return;
458+
}
459+
this.#shuttingDown = true;
460+
456461
this.log?.info({
457462
msg: "starting shutdown",
458463
immediate,
@@ -515,8 +520,10 @@ export class Runner {
515520
readyState: pegboardWebSocket.readyState,
516521
});
517522

518-
// NOTE: We don't use #sendToServer here because that function checks if the runner is
519-
// shut down
523+
// Start stopping
524+
//
525+
// The runner workflow will send StopActor commands for all
526+
// actors
520527
this.__sendToServer({
521528
tag: "ToServerStopping",
522529
val: null,
@@ -536,7 +543,8 @@ export class Runner {
536543
});
537544
});
538545

539-
// TODO: Wait for all actors to stop before closing ws
546+
// Wait for all actors to stop before closing ws
547+
await this.#waitForActorsToStop(pegboardWebSocket);
540548

541549
this.log?.info({
542550
msg: "closing WebSocket",
@@ -571,9 +579,96 @@ export class Runner {
571579
this.#tunnel = undefined;
572580
}
573581

582+
this.#config.onShutdown();
583+
574584
if (exit) process.exit(0);
585+
}
575586

576-
this.#config.onShutdown();
587+
/**
588+
* Wait for all actors to stop before proceeding with shutdown.
589+
*
590+
* This method polls every 100ms to check if all actors have been stopped.
591+
*
592+
* It will resolve early if:
593+
* - All actors are stopped
594+
* - The WebSocket connection is closed
595+
* - The shutdown timeout is reached (120 seconds)
596+
*/
597+
async #waitForActorsToStop(ws: WebSocket): Promise<void> {
598+
const shutdownTimeout = 120_000; // 120 seconds
599+
const shutdownCheckInterval = 100; // Check every 100ms
600+
const progressLogInterval = 5_000; // Log progress every 5 seconds
601+
const shutdownStartTs = Date.now();
602+
let lastProgressLogTs = 0; // Ensure first log happens immediately
603+
604+
return new Promise<void>((resolve) => {
605+
const checkActors = () => {
606+
const now = Date.now();
607+
const elapsed = now - shutdownStartTs;
608+
const wsIsClosed = ws.readyState === 2 || ws.readyState === 3;
609+
610+
if (this.#actors.size === 0) {
611+
this.log?.info({
612+
msg: "all actors stopped",
613+
elapsed,
614+
});
615+
return true;
616+
} else if (wsIsClosed) {
617+
this.log?.warn({
618+
msg: "websocket closed before all actors stopped",
619+
remainingActors: this.#actors.size,
620+
elapsed,
621+
});
622+
return true;
623+
} else if (elapsed >= shutdownTimeout) {
624+
this.log?.warn({
625+
msg: "shutdown timeout reached, forcing close",
626+
remainingActors: this.#actors.size,
627+
elapsed,
628+
});
629+
return true;
630+
} else {
631+
// Log progress every 5 seconds
632+
if (now - lastProgressLogTs >= progressLogInterval) {
633+
this.log?.info({
634+
msg: "waiting for actors to stop",
635+
remainingActors: this.#actors.size,
636+
elapsed,
637+
});
638+
lastProgressLogTs = now;
639+
}
640+
return false;
641+
}
642+
};
643+
644+
// Check immediately first
645+
if (checkActors()) {
646+
this.log?.debug({
647+
msg: "actors check completed immediately",
648+
});
649+
resolve();
650+
return;
651+
}
652+
653+
this.log?.debug({
654+
msg: "starting actor wait interval",
655+
checkInterval: shutdownCheckInterval,
656+
});
657+
658+
const interval = setInterval(() => {
659+
this.log?.debug({
660+
msg: "actor wait interval tick",
661+
actorCount: this.#actors.size,
662+
});
663+
if (checkActors()) {
664+
this.log?.debug({
665+
msg: "actors check completed, clearing interval",
666+
});
667+
clearInterval(interval);
668+
resolve();
669+
}
670+
}, shutdownCheckInterval);
671+
});
577672
}
578673

579674
// MARK: Networking
@@ -1014,13 +1109,6 @@ export class Runner {
10141109
generation: number,
10151110
intentType: "sleep" | "stop",
10161111
) {
1017-
if (this.#shutdown) {
1018-
console.trace("send actor intent", actorId, intentType);
1019-
this.log?.warn({
1020-
msg: "Runner is shut down, cannot send actor intent",
1021-
});
1022-
return;
1023-
}
10241112
let actorIntent: protocol.ActorIntent;
10251113

10261114
if (intentType === "sleep") {
@@ -1062,12 +1150,6 @@ export class Runner {
10621150
generation: number,
10631151
stateType: "running" | "stopped",
10641152
) {
1065-
if (this.#shutdown) {
1066-
this.log?.warn({
1067-
msg: "Runner is shut down, cannot send actor state update",
1068-
});
1069-
return;
1070-
}
10711153
let actorState: protocol.ActorState;
10721154

10731155
if (stateType === "running") {
@@ -1108,13 +1190,6 @@ export class Runner {
11081190
}
11091191

11101192
#sendCommandAcknowledgment() {
1111-
if (this.#shutdown) {
1112-
this.log?.warn({
1113-
msg: "Runner is shut down, cannot send command acknowledgment",
1114-
});
1115-
return;
1116-
}
1117-
11181193
if (this.#lastCommandIdx < 0) {
11191194
// No commands received yet, nothing to acknowledge
11201195
return;
@@ -1423,11 +1498,6 @@ export class Runner {
14231498
const actor = this.getActor(actorId, generation);
14241499
if (!actor) return;
14251500

1426-
if (this.#shutdown) {
1427-
console.warn("Runner is shut down, cannot set alarm");
1428-
return;
1429-
}
1430-
14311501
const alarmEvent: protocol.EventActorSetAlarm = {
14321502
actorId,
14331503
generation: actor.generation,
@@ -1460,11 +1530,6 @@ export class Runner {
14601530
requestData: protocol.KvRequestData,
14611531
): Promise<any> {
14621532
return new Promise((resolve, reject) => {
1463-
if (this.#shutdown) {
1464-
reject(new Error("Runner is shut down"));
1465-
return;
1466-
}
1467-
14681533
const requestId = this.#nextKvRequestId++;
14691534
const isConnected =
14701535
this.#pegboardWebSocket &&
@@ -1541,14 +1606,7 @@ export class Runner {
15411606
: false;
15421607
}
15431608

1544-
__sendToServer(message: protocol.ToServer, allowShutdown: boolean = false) {
1545-
if (!allowShutdown && this.#shutdown) {
1546-
this.log?.warn({
1547-
msg: "Runner is shut down, cannot send message to server",
1548-
});
1549-
return;
1550-
}
1551-
1609+
__sendToServer(message: protocol.ToServer) {
15521610
this.log?.debug({
15531611
msg: "sending runner message",
15541612
data: stringifyToServer(message),

examples/counter/scripts/connect.ts

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,12 @@ export class ActorConnRaw {
197197
resolve,
198198
reject,
199199
});
200+
logger().debug({
201+
msg: "added action to in-flight map",
202+
actionId,
203+
actionName: opts.name,
204+
inFlightCount: this.#actionsInFlight.size,
205+
});
200206

201207
this.#sendMessage({
202208
body: {
@@ -460,9 +466,11 @@ enc
460466
} else if (response.body.tag === "ActionResponse") {
461467
// Action response OK
462468
const { id: actionId } = response.body.val;
463-
logger().trace({
469+
logger().debug({
464470
msg: "received action response",
465-
actionId,
471+
actionId: Number(actionId),
472+
inFlightCount: this.#actionsInFlight.size,
473+
inFlightIds: Array.from(this.#actionsInFlight.keys()),
466474
});
467475

468476
const inFlight = this.#takeActionInFlight(Number(actionId));
@@ -561,9 +569,27 @@ enc
561569
#takeActionInFlight(id: number): ActionInFlight {
562570
const inFlight = this.#actionsInFlight.get(id);
563571
if (!inFlight) {
572+
logger().error({
573+
msg: "action not found in in-flight map",
574+
lookupId: id,
575+
inFlightCount: this.#actionsInFlight.size,
576+
inFlightIds: Array.from(this.#actionsInFlight.keys()),
577+
inFlightActions: Array.from(
578+
this.#actionsInFlight.entries(),
579+
).map(([id, action]) => ({
580+
id,
581+
name: action.name,
582+
})),
583+
});
564584
throw new errors.InternalError(`No in flight response for ${id}`);
565585
}
566586
this.#actionsInFlight.delete(id);
587+
logger().debug({
588+
msg: "removed action from in-flight map",
589+
actionId: id,
590+
actionName: inFlight.name,
591+
inFlightCount: this.#actionsInFlight.size,
592+
});
567593
return inFlight;
568594
}
569595

0 commit comments

Comments
 (0)