Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions __tests__/core/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,36 @@ describe("scheduler", () => {
await queue.end();
});

test("emits error instead of unhandled rejection when poll fails", async () => {
const testScheduler = new Scheduler({
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
});
await testScheduler.connect();
await testScheduler.start();

const errorPromise = new Promise<Error>((resolve) => {
testScheduler.on("error", (err) => resolve(err));
});

// stub redis.set to simulate a closed connection during tryForLeader
const originalSet = testScheduler.connection.redis.set.bind(
testScheduler.connection.redis,
);
const redisError = new Error("Connection is closed");
testScheduler.connection.redis.set = async () => {
throw redisError;
};

// wait for pollAgainLater to fire poll() which will fail
const emittedError = await errorPromise;
expect(emittedError).toBe(redisError);

// restore and clean up
testScheduler.connection.redis.set = originalSet;
await testScheduler.end();
});

test("queues can see who the leader is", async () => {
await scheduler.poll();
const leader = await queue.leader();
Expand Down
36 changes: 36 additions & 0 deletions __tests__/core/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,39 @@ describe("worker", () => {
});
});
});

describe("worker error handling during shutdown", () => {
test("emits error instead of unhandled rejection when ping fails", async () => {
const testWorker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: ["shutdown_test_queue"],
},
jobs,
);
await testWorker.connect();
await testWorker.start();

const errorPromise = new Promise<Error>((resolve) => {
testWorker.on("error", (err) => resolve(err));
});

// stub redis.set to simulate a closed connection during ping
const originalSet = testWorker.connection.redis.set.bind(
testWorker.connection.redis,
);
const redisError = new Error("Connection is closed");
testWorker.connection.redis.set = async () => {
throw redisError;
};

// wait for the ping interval to fire and fail
const emittedError = await errorPromise;
expect(emittedError).toBe(redisError);

// restore and clean up
testWorker.connection.redis.set = originalSet;
await testWorker.end();
});
});
2 changes: 1 addition & 1 deletion src/core/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export class Scheduler extends EventEmitter {
private async pollAgainLater() {
if (this.running === true) {
this.timer = setTimeout(() => {
this.poll();
this.poll().catch((e) => this.emit("error", e));
}, this.options.timeout);
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/core/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ export class Worker extends EventEmitter {
this.started = true;
this.emit("start", new Date());
await this.init();
this.poll();
this.poll().catch((e) => this.emit("error", e));
}
}

Expand All @@ -159,7 +159,9 @@ export class Worker extends EventEmitter {
Math.round(new Date().getTime() / 1000),
);
await this.ping();
this.pingTimer = setInterval(this.ping.bind(this), this.options.timeout);
this.pingTimer = setInterval(() => {
this.ping().catch((e) => this.emit("error", e));
}, this.options.timeout);
}

async end(): Promise<void> {
Expand Down Expand Up @@ -399,7 +401,7 @@ export class Worker extends EventEmitter {
this.job = null;

if (this.options.looping) {
this.poll();
this.poll().catch((e) => this.emit("error", e));
}
}

Expand Down Expand Up @@ -443,7 +445,7 @@ export class Worker extends EventEmitter {
this.emit("pause");
await new Promise((resolve) => {
this.pollTimer = setTimeout(() => {
this.poll();
this.poll().catch((e) => this.emit("error", e));
resolve(null);
}, this.options.timeout);
});
Expand Down