From f8786184a808836b27ca70a2db85e6127a17662b Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Mon, 26 Jan 2026 20:21:40 -0300 Subject: [PATCH] fix: enhance stale job handling with JobTransitioner and improve test coverage --- .../src/routines/release-stale-jobs.test.ts | 93 ++++++++++++++++++- .../engine/src/routines/release-stale-jobs.ts | 15 ++- sidequest.jobs.js | 9 -- tests/integration/sidequest.jobs.js | 3 - 4 files changed, 100 insertions(+), 20 deletions(-) delete mode 100644 sidequest.jobs.js delete mode 100644 tests/integration/sidequest.jobs.js diff --git a/packages/engine/src/routines/release-stale-jobs.test.ts b/packages/engine/src/routines/release-stale-jobs.test.ts index 47fa1f90..1100cd3f 100644 --- a/packages/engine/src/routines/release-stale-jobs.test.ts +++ b/packages/engine/src/routines/release-stale-jobs.test.ts @@ -13,7 +13,90 @@ describe("release-stale-jobs.ts", () => { expect(updateJobSpy).not.toHaveBeenCalled(); }); - sidequestTest("should release stale jobs by setting state to waiting", async ({ backend }) => { + sidequestTest("should release stale claimed jobs by setting state to waiting", async ({ backend }) => { + const mockStaleJob = { + id: 1, + queue: "default", + state: "claimed", + script: "/path/to/script.js", + class: "TestJob", + args: [], + constructor_args: [], + attempt: 1, + max_attempts: 3, + claimed_at: new Date(Date.now() - 60000), + } as unknown as JobData; + + const staleJobsSpy = vi.spyOn(backend, "staleJobs").mockResolvedValue([mockStaleJob]); + const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => Promise.resolve(job as JobData)); + + await releaseStaleJobs(backend, 600_000, 60_000); + + expect(staleJobsSpy).toHaveBeenCalledOnce(); + expect(updateJobSpy).toHaveBeenCalledOnce(); + + // Claimed jobs should go back to waiting without using JobTransitioner + expect(mockStaleJob.state).toBe("waiting"); + expect(updateJobSpy).toHaveBeenCalledWith(mockStaleJob); + }); + + sidequestTest("should retry stale running jobs using JobTransitioner", async ({ backend }) => { + const mockStaleJob = { + id: 2, + queue: "high", + state: "running", + script: "/path/to/another-script.js", + class: "AnotherTestJob", + args: ["arg1", "arg2"], + constructor_args: [], + attempt: 2, + max_attempts: 5, + claimed_at: new Date(Date.now() - 120000), + } as unknown as JobData; + + const staleJobsSpy = vi.spyOn(backend, "staleJobs").mockResolvedValue([mockStaleJob]); + const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => { + return Promise.resolve(job as JobData); + }); + + await releaseStaleJobs(backend, 600_000, 60_000); + + expect(staleJobsSpy).toHaveBeenCalledOnce(); + expect(updateJobSpy).toHaveBeenCalledOnce(); + + // Running jobs should be retried via JobTransitioner, which sets state to waiting + expect(mockStaleJob.state).toBe("waiting"); + }); + + sidequestTest("should fail stale running job at max attempts", async ({ backend }) => { + const mockStaleJob = { + id: 3, + queue: "critical", + state: "running", + script: "/path/to/critical-script.js", + class: "CriticalJob", + args: [], + constructor_args: [], + attempt: 3, + max_attempts: 3, + claimed_at: new Date(Date.now() - 180000), + } as unknown as JobData; + + const staleJobsSpy = vi.spyOn(backend, "staleJobs").mockResolvedValue([mockStaleJob]); + const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => { + return Promise.resolve(job as JobData); + }); + + await releaseStaleJobs(backend, 600_000, 60_000); + + expect(staleJobsSpy).toHaveBeenCalledOnce(); + expect(updateJobSpy).toHaveBeenCalledOnce(); + + // Job at max attempts should be marked as failed, not retried + expect(mockStaleJob.state).toBe("failed"); + }); + + sidequestTest("should handle mixed stale jobs (claimed and running)", async ({ backend }) => { const mockStaleJobs = [ { id: 1, @@ -42,18 +125,18 @@ describe("release-stale-jobs.ts", () => { ] as unknown as JobData[]; const staleJobsSpy = vi.spyOn(backend, "staleJobs").mockResolvedValue(mockStaleJobs); - const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => Promise.resolve(job as JobData)); + const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => { + return Promise.resolve(job as JobData); + }); await releaseStaleJobs(backend, 600_000, 60_000); expect(staleJobsSpy).toHaveBeenCalledOnce(); expect(updateJobSpy).toHaveBeenCalledTimes(2); + // Both should end up in waiting state expect(mockStaleJobs[0].state).toBe("waiting"); expect(mockStaleJobs[1].state).toBe("waiting"); - - expect(updateJobSpy).toHaveBeenNthCalledWith(1, mockStaleJobs[0]); - expect(updateJobSpy).toHaveBeenNthCalledWith(2, mockStaleJobs[1]); }); sidequestTest("should handle single stale job", async ({ backend }) => { diff --git a/packages/engine/src/routines/release-stale-jobs.ts b/packages/engine/src/routines/release-stale-jobs.ts index 7909b0b7..cc78ee30 100644 --- a/packages/engine/src/routines/release-stale-jobs.ts +++ b/packages/engine/src/routines/release-stale-jobs.ts @@ -1,6 +1,7 @@ import { Backend } from "@sidequest/backend"; -import { logger } from "@sidequest/core"; +import { logger, RetryTransition } from "@sidequest/core"; import { inspect } from "util"; +import { JobTransitioner } from "../job"; /** * Finds and releases stale jobs, making them available for processing again. @@ -16,8 +17,16 @@ export async function releaseStaleJobs(backend: Backend, maxStaleMs: number, max logger("Engine").info(`Stale jobs found, making them available to process`); logger("Engine").debug(`Stale jobs: ${inspect(staleJobs)}`); for (const jobData of staleJobs) { - jobData.state = "waiting"; - await backend.updateJob(jobData); + if (jobData.state === "running") { + // We need to use the JobTransitioner to properly handle retries and state transitions + // This fixes the issue where the release of a stale job incremented the retry count and + // did not respect the maxRetries setting. + await JobTransitioner.apply(backend, jobData, new RetryTransition("Stale job released for retry")); + } else { + // If it's "claimed", then the attempt count was not incremented, so we can just set it back to "waiting" + jobData.state = "waiting"; + await backend.updateJob(jobData); + } } } else { logger("Engine").debug(`No stale jobs found`); diff --git a/sidequest.jobs.js b/sidequest.jobs.js deleted file mode 100644 index fb064e2c..00000000 --- a/sidequest.jobs.js +++ /dev/null @@ -1,9 +0,0 @@ -import { - EnqueueFromWithinJob, - FailingJob, - RetryJob, - SuccessJob, - TimeoutJob, -} from "./tests/integration/jobs/test-jobs.js"; - -export { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob }; diff --git a/tests/integration/sidequest.jobs.js b/tests/integration/sidequest.jobs.js deleted file mode 100644 index 68cd8c60..00000000 --- a/tests/integration/sidequest.jobs.js +++ /dev/null @@ -1,3 +0,0 @@ -import { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob } from "./jobs/test-jobs.js"; - -export { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob };