From 7776183c1c40b056531f2feb6dc20f6fe4dad2f2 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Sun, 8 Feb 2026 19:27:27 -0800 Subject: [PATCH 1/2] Fix fire-and-forget async calls causing unhandled promise rejections during shutdown Add .catch() handlers to all fire-and-forget async calls in Worker and Scheduler that were previously discarding Promise return values from timer callbacks. This prevents unhandled rejections when a shared Redis connection is closed during shutdown. Fixes #1260. Co-Authored-By: Claude Opus 4.6 --- src/core/scheduler.ts | 2 +- src/core/worker.ts | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/core/scheduler.ts b/src/core/scheduler.ts index a1cb7773..b575db43 100644 --- a/src/core/scheduler.ts +++ b/src/core/scheduler.ts @@ -159,7 +159,7 @@ export class Scheduler extends EventEmitter { private async pollAgainLater() { if (this.running === true) { this.timer = setTimeout(() => { - this.poll(); + this.poll().catch((e) => this.emit("error", e)); }, this.options.timeout); } } diff --git a/src/core/worker.ts b/src/core/worker.ts index cd6c1cc9..8b6e3506 100644 --- a/src/core/worker.ts +++ b/src/core/worker.ts @@ -148,7 +148,7 @@ export class Worker extends EventEmitter { this.started = true; this.emit("start", new Date()); await this.init(); - this.poll(); + this.poll().catch((e) => this.emit("error", e)); } } @@ -159,7 +159,9 @@ export class Worker extends EventEmitter { Math.round(new Date().getTime() / 1000), ); await this.ping(); - this.pingTimer = setInterval(this.ping.bind(this), this.options.timeout); + this.pingTimer = setInterval(() => { + this.ping().catch((e) => this.emit("error", e)); + }, this.options.timeout); } async end(): Promise { @@ -399,7 +401,7 @@ export class Worker extends EventEmitter { this.job = null; if (this.options.looping) { - this.poll(); + this.poll().catch((e) => this.emit("error", e)); } } @@ -443,7 +445,7 @@ export class Worker extends EventEmitter { this.emit("pause"); await new Promise((resolve) => { this.pollTimer = setTimeout(() => { - this.poll(); + this.poll().catch((e) => this.emit("error", e)); resolve(null); }, this.options.timeout); }); From 4dc8950864fe0b02cc0f1d94b389a90a0713623f Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Sun, 8 Feb 2026 19:41:13 -0800 Subject: [PATCH 2/2] Add tests for fire-and-forget error handling during shutdown Verify that Worker and Scheduler emit errors via the "error" event instead of causing unhandled promise rejections when Redis commands fail during ping/poll timer callbacks. Co-Authored-By: Claude Opus 4.6 --- __tests__/core/scheduler.ts | 30 ++++++++++++++++++++++++++++++ __tests__/core/worker.ts | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/__tests__/core/scheduler.ts b/__tests__/core/scheduler.ts index 0d6647d1..c8efac44 100644 --- a/__tests__/core/scheduler.ts +++ b/__tests__/core/scheduler.ts @@ -87,6 +87,36 @@ describe("scheduler", () => { await queue.end(); }); + test("emits error instead of unhandled rejection when poll fails", async () => { + const testScheduler = new Scheduler({ + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + }); + await testScheduler.connect(); + await testScheduler.start(); + + const errorPromise = new Promise((resolve) => { + testScheduler.on("error", (err) => resolve(err)); + }); + + // stub redis.set to simulate a closed connection during tryForLeader + const originalSet = testScheduler.connection.redis.set.bind( + testScheduler.connection.redis, + ); + const redisError = new Error("Connection is closed"); + testScheduler.connection.redis.set = async () => { + throw redisError; + }; + + // wait for pollAgainLater to fire poll() which will fail + const emittedError = await errorPromise; + expect(emittedError).toBe(redisError); + + // restore and clean up + testScheduler.connection.redis.set = originalSet; + await testScheduler.end(); + }); + test("queues can see who the leader is", async () => { await scheduler.poll(); const leader = await queue.leader(); diff --git a/__tests__/core/worker.ts b/__tests__/core/worker.ts index c63efd62..5f05fd3a 100644 --- a/__tests__/core/worker.ts +++ b/__tests__/core/worker.ts @@ -358,3 +358,39 @@ describe("worker", () => { }); }); }); + +describe("worker error handling during shutdown", () => { + test("emits error instead of unhandled rejection when ping fails", async () => { + const testWorker = new Worker( + { + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + queues: ["shutdown_test_queue"], + }, + jobs, + ); + await testWorker.connect(); + await testWorker.start(); + + const errorPromise = new Promise((resolve) => { + testWorker.on("error", (err) => resolve(err)); + }); + + // stub redis.set to simulate a closed connection during ping + const originalSet = testWorker.connection.redis.set.bind( + testWorker.connection.redis, + ); + const redisError = new Error("Connection is closed"); + testWorker.connection.redis.set = async () => { + throw redisError; + }; + + // wait for the ping interval to fire and fail + const emittedError = await errorPromise; + expect(emittedError).toBe(redisError); + + // restore and clean up + testWorker.connection.redis.set = originalSet; + await testWorker.end(); + }); +});