diff --git a/__tests__/core/scheduler.ts b/__tests__/core/scheduler.ts index 0d6647d1..c8efac44 100644 --- a/__tests__/core/scheduler.ts +++ b/__tests__/core/scheduler.ts @@ -87,6 +87,36 @@ describe("scheduler", () => { await queue.end(); }); + test("emits error instead of unhandled rejection when poll fails", async () => { + const testScheduler = new Scheduler({ + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + }); + await testScheduler.connect(); + await testScheduler.start(); + + const errorPromise = new Promise((resolve) => { + testScheduler.on("error", (err) => resolve(err)); + }); + + // stub redis.set to simulate a closed connection during tryForLeader + const originalSet = testScheduler.connection.redis.set.bind( + testScheduler.connection.redis, + ); + const redisError = new Error("Connection is closed"); + testScheduler.connection.redis.set = async () => { + throw redisError; + }; + + // wait for pollAgainLater to fire poll() which will fail + const emittedError = await errorPromise; + expect(emittedError).toBe(redisError); + + // restore and clean up + testScheduler.connection.redis.set = originalSet; + await testScheduler.end(); + }); + test("queues can see who the leader is", async () => { await scheduler.poll(); const leader = await queue.leader(); diff --git a/__tests__/core/worker.ts b/__tests__/core/worker.ts index c63efd62..5f05fd3a 100644 --- a/__tests__/core/worker.ts +++ b/__tests__/core/worker.ts @@ -358,3 +358,39 @@ describe("worker", () => { }); }); }); + +describe("worker error handling during shutdown", () => { + test("emits error instead of unhandled rejection when ping fails", async () => { + const testWorker = new Worker( + { + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + queues: ["shutdown_test_queue"], + }, + jobs, + ); + await testWorker.connect(); + await testWorker.start(); + + const errorPromise = new Promise((resolve) => { + testWorker.on("error", (err) => resolve(err)); + }); + + // stub redis.set to simulate a closed connection during ping + const originalSet = testWorker.connection.redis.set.bind( + testWorker.connection.redis, + ); + const redisError = new Error("Connection is closed"); + testWorker.connection.redis.set = async () => { + throw redisError; + }; + + // wait for the ping interval to fire and fail + const emittedError = await errorPromise; + expect(emittedError).toBe(redisError); + + // restore and clean up + testWorker.connection.redis.set = originalSet; + await testWorker.end(); + }); +}); diff --git a/src/core/scheduler.ts b/src/core/scheduler.ts index a1cb7773..b575db43 100644 --- a/src/core/scheduler.ts +++ b/src/core/scheduler.ts @@ -159,7 +159,7 @@ export class Scheduler extends EventEmitter { private async pollAgainLater() { if (this.running === true) { this.timer = setTimeout(() => { - this.poll(); + this.poll().catch((e) => this.emit("error", e)); }, this.options.timeout); } } diff --git a/src/core/worker.ts b/src/core/worker.ts index cd6c1cc9..8b6e3506 100644 --- a/src/core/worker.ts +++ b/src/core/worker.ts @@ -148,7 +148,7 @@ export class Worker extends EventEmitter { this.started = true; this.emit("start", new Date()); await this.init(); - this.poll(); + this.poll().catch((e) => this.emit("error", e)); } } @@ -159,7 +159,9 @@ export class Worker extends EventEmitter { Math.round(new Date().getTime() / 1000), ); await this.ping(); - this.pingTimer = setInterval(this.ping.bind(this), this.options.timeout); + this.pingTimer = setInterval(() => { + this.ping().catch((e) => this.emit("error", e)); + }, this.options.timeout); } async end(): Promise { @@ -399,7 +401,7 @@ export class Worker extends EventEmitter { this.job = null; if (this.options.looping) { - this.poll(); + this.poll().catch((e) => this.emit("error", e)); } } @@ -443,7 +445,7 @@ export class Worker extends EventEmitter { this.emit("pause"); await new Promise((resolve) => { this.pollTimer = setTimeout(() => { - this.poll(); + this.poll().catch((e) => this.emit("error", e)); resolve(null); }, this.options.timeout); });