Skip to content
Open
30 changes: 30 additions & 0 deletions openapi3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,21 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/taskNotFoundResponse'
'409':
description: >-
task was claimed by another worker.
This occurs when multiple workers attempt to dequeue the same task simultaneously.
The client should retry the dequeue operation to get a different task.
content:
application/json:
schema:
allOf:
- $ref: '#/components/schemas/baseErrorResponse'
- type: object
properties:
code:
enum:
- TASK_STATUS_UPDATE_FAILED
'500':
description: Internal server error or invalid state transition
content:
Expand Down Expand Up @@ -1820,6 +1835,21 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/taskNotFoundResponse'
'409':
description: >-
task status was modified by another request.
This occurs when multiple workers attempt to update the same task simultaneously.
The current state of the task has changed since it was retrieved.
content:
application/json:
schema:
allOf:
- $ref: '#/components/schemas/baseErrorResponse'
- type: object
properties:
code:
enum:
- TASK_STATUS_UPDATE_FAILED
'500':
description: Internal server error
content:
Expand Down
7 changes: 6 additions & 1 deletion src/api/v1/tasks/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,11 @@ export class TaskControllerV1 {
} catch (err) {
if (err instanceof TaskNotFoundError) {
(err as HttpError).status = httpStatus.NOT_FOUND;
} else if (err instanceof TaskStatusUpdateFailedError) {
// Race condition: resource was modified by another request
(err as HttpError).status = httpStatus.CONFLICT;
} else if (badRequestErrors.some((e) => err instanceof e)) {
(err as HttpError).status = httpStatus.BAD_REQUEST;
this.logger.error({ msg: `Task status update failed: invalid status transition`, status: req.body.status, err });
}

return next(err);
Expand All @@ -134,6 +136,9 @@ export class TaskControllerV1 {
} catch (err) {
if (err instanceof TaskNotFoundError) {
(err as HttpError).status = httpStatus.NOT_FOUND;
} else if (err instanceof TaskStatusUpdateFailedError) {
// Race condition: another worker already dequeued this task
(err as HttpError).status = httpStatus.CONFLICT;
} else if (internalErrors.some((e) => err instanceof e)) {
(err as HttpError).status = httpStatus.INTERNAL_SERVER_ERROR;
}
Expand Down
1 change: 1 addition & 0 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type SuccessMessagesObj = {
export const SERVICE_NAME = readPackageJsonSync().name ?? 'unknown_service';
export const DEFAULT_SERVER_PORT = 80;
export const DB_CONNECTION_TIMEOUT = 5000;
export const TX_TIMEOUT_MS = 15000;
export const NODE_VERSION = process.versions.node;

export const IGNORED_OUTGOING_TRACE_ROUTES = [/^.*\/v1\/metrics.*$/];
Expand Down
8 changes: 7 additions & 1 deletion src/db/createConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { hostname } from 'node:os';
import { commonDbFullV1Type } from '@map-colonies/schemas';
import type { PoolConfig } from 'pg';
import { PrismaPg } from '@prisma/adapter-pg';
import { TX_TIMEOUT_MS } from '@src/common/constants';
import { PrismaClient } from '../db/prisma/generated/client';

interface SchemaExistsResult {
Expand Down Expand Up @@ -37,7 +38,12 @@ export const createConnectionOptions = (dbConfig: DbConfig): PoolConfig => {
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export function createPrismaClient(poolConfig: PoolConfig, schema: string) {
const adapter = new PrismaPg(poolConfig, { schema });
const prisma = new PrismaClient({ adapter }).$extends({
const prisma = new PrismaClient({
adapter,
transactionOptions: {
timeout: TX_TIMEOUT_MS,
},
}).$extends({
query: {
// eslint-disable-next-line @typescript-eslint/promise-function-async
$allOperations({ args, query }) {
Expand Down
24 changes: 24 additions & 0 deletions src/openapi.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,18 @@ export interface operations {
'application/json': components['schemas']['taskNotFoundResponse'];
};
};
/** @description Race condition detected: task was claimed by another worker. This occurs when multiple workers attempt to dequeue the same task simultaneously. The client should retry the dequeue operation to get a different task. */
409: {
headers: {
[name: string]: unknown;
};
content: {
'application/json': components['schemas']['baseErrorResponse'] & {
/** @enum {unknown} */
code?: 'TASK_STATUS_UPDATE_FAILED';
};
};
};
/** @description Internal server error or invalid state transition */
500: {
headers: {
Expand Down Expand Up @@ -2065,6 +2077,18 @@ export interface operations {
'application/json': components['schemas']['taskNotFoundResponse'];
};
};
/** @description Race condition detected: task status was modified by another request. This occurs when multiple workers attempt to update the same task simultaneously. The current state of the task has changed since it was retrieved. */
409: {
headers: {
[name: string]: unknown;
};
content: {
'application/json': components['schemas']['baseErrorResponse'] & {
/** @enum {unknown} */
code?: 'TASK_STATUS_UPDATE_FAILED';
};
};
};
/** @description Internal server error */
500: {
headers: {
Expand Down
43 changes: 31 additions & 12 deletions src/stages/models/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { INFRA_CONVENTIONS } from '@map-colonies/semantic-conventions';
import type { PrismaClient } from '@prismaClient';
import { JobOperationStatus, Prisma, StageOperationStatus } from '@prismaClient';
import { JobManager } from '@src/jobs/models/manager';
import { SERVICES, XSTATE_DONE_STATE } from '@common/constants';
import { SERVICES, TX_TIMEOUT_MS, XSTATE_DONE_STATE } from '@common/constants';
import { resolveTraceContext } from '@src/common/utils/tracingHelpers';
import { jobStateMachine } from '@src/jobs/models/jobStateMachine';
import { illegalStatusTransitionErrorMessage, prismaKnownErrors } from '@src/common/errors';
Expand Down Expand Up @@ -242,9 +242,12 @@ export class StageManager {
});

if (!tx) {
return this.prisma.$transaction(async (newTx) => {
await this.executeUpdateStatus(stageId, status, newTx);
});
return this.prisma.$transaction(
async (newTx) => {
await this.executeUpdateStatus(stageId, status, newTx);
},
{ timeout: TX_TIMEOUT_MS }
);
}

await this.executeUpdateStatus(stageId, status, tx);
Expand Down Expand Up @@ -304,24 +307,40 @@ export class StageManager {

// update stage status if it was initialized by first task
// and the stage is not already in progress
// Race condition protection: Only transition if stage is PENDING
// Multiple concurrent tasks may trigger this check simultaneously
if (updatedSummary.inProgress > 0 && stage.status === StageOperationStatus.PENDING) {
await this.updateStatus(stageId, StageOperationStatus.IN_PROGRESS, tx);
trace.getActiveSpan()?.addEvent('Stage set to IN_PROGRESS because first task started', { stageId });
}
}

@withSpanAsyncV4
private async executeUpdateStatus(stageId: string, status: StageOperationStatus, tx: PrismaTransaction): Promise<void> {
private async executeUpdateStatus(stageId: string, targetStatus: StageOperationStatus, tx: PrismaTransaction): Promise<void> {
const stage = await this.getStageEntityById(stageId, { includeJob: true, tx });

if (!stage) {
throw new StageNotFoundError(stagesErrorMessages.stageNotFound);
}

// Idempotent status update: if already in target status, no-op
// This prevents errors during race conditions where multiple workers
// try to set the same status (e.g., multiple tasks setting stage to IN_PROGRESS)
/* v8 ignore next 4 -- @preserve */
if (stage.status === targetStatus) {
this.logger.debug({
msg: 'Stage already in target status, skipping transition',
stageId,
targetStatus,
});
return;
}

//#region validate status transition rules
const previousStageOrder = stage.order - 1;

// can't move to PENDING if previous stage is not COMPLETED
if (status === StageOperationStatus.PENDING && previousStageOrder > 0) {
if (targetStatus === StageOperationStatus.PENDING && previousStageOrder > 0) {
const previousStage = await tx.stage.findFirst({
where: {
jobId: stage.jobId,
Expand All @@ -334,12 +353,12 @@ export class StageManager {
}
}

const nextStatusChange = OperationStatusMapper[status];
const nextStatusChange = OperationStatusMapper[targetStatus];
const updateActor = createActor(stageStateMachine, { snapshot: stage.xstate }).start();
const isValidStatus = updateActor.getSnapshot().can({ type: nextStatusChange });

if (!isValidStatus) {
throw new IllegalStageStatusTransitionError(illegalStatusTransitionErrorMessage(stage.status, status));
throw new IllegalStageStatusTransitionError(illegalStatusTransitionErrorMessage(stage.status, targetStatus));
}
//#endregion
updateActor.send({ type: nextStatusChange });
Expand All @@ -350,7 +369,7 @@ export class StageManager {
id: stageId,
},
data: {
status,
status: targetStatus,
xstate: newPersistedSnapshot,
},
};
Expand All @@ -360,7 +379,7 @@ export class StageManager {
//#region update related entities
// Update job completion when a stage is completed
// If the stage is marked as completed, and there is a next stage in the job, update the next stage status to PENDING
if (status === StageOperationStatus.COMPLETED) {
if (targetStatus === StageOperationStatus.COMPLETED) {
const nextStageOrder = stage.order + 1;
const nextStage = await tx.stage.findFirst({
where: {
Expand All @@ -386,11 +405,11 @@ export class StageManager {
}
}

if (status === StageOperationStatus.IN_PROGRESS && stage.job.status === JobOperationStatus.PENDING) {
if (targetStatus === StageOperationStatus.IN_PROGRESS && stage.job.status === JobOperationStatus.PENDING) {
// Update job status to IN_PROGRESS
await this.jobManager.updateStatus(stage.job.id, JobOperationStatus.IN_PROGRESS, tx);
trace.getActiveSpan()?.addEvent('Job status set to IN_PROGRESS because first stage is being processed', { jobId: stage.jobId });
} else if (status === StageOperationStatus.FAILED) {
} else if (targetStatus === StageOperationStatus.FAILED) {
// Update job status to FAILED
await this.jobManager.updateStatus(stage.jobId, JobOperationStatus.FAILED, tx);
trace.getActiveSpan()?.addEvent('Job set to FAILED because its stage failed', { jobId: stage.jobId });
Expand Down
53 changes: 53 additions & 0 deletions src/tasks/DAL/taskRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { inject, Lifecycle, scoped } from 'tsyringe';
import { type Logger } from '@map-colonies/js-logger';
import { PrismaClient, Task } from '@prismaClient';
import { SERVICES } from '@src/common/constants';
import type { PrismaTransaction } from '@src/db/types';
import type { TaskPrismaObject } from '../models/models';

@scoped(Lifecycle.ContainerScoped)
export class TaskRepository {
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.PRISMA) private readonly prisma: PrismaClient
) {}

/**
* Finds and locks the next available high-priority task for processing.
* * Uses a row-level lock with `SKIP LOCKED` to allow multiple concurrent
* workers to claim different tasks without blocking each other.
* * @param stageType - The stage category to pull tasks from.
* @param tx - The current database transaction.
* @returns The locked task or null if no eligible tasks are found.
*/
public async findAndLockTaskForDequeue(stageType: string, tx: PrismaTransaction): Promise<TaskPrismaObject | null> {
this.logger.debug({ msg: 'Finding task for dequeue', stageType });

const tasks = await tx.$queryRaw<Task[]>`
SELECT t.*
FROM "job_manager"."task" t
INNER JOIN "job_manager"."stage" s ON t."stage_id" = s.id
INNER JOIN "job_manager"."job" j ON s."job_id" = j.id
WHERE s.type = ${stageType}
AND t.status IN ('Pending', 'Retried')
AND s.status IN ('Pending', 'In-Progress')
AND j.status IN ('Pending', 'In-Progress')
ORDER BY j.priority ASC
LIMIT 1
FOR UPDATE OF t SKIP LOCKED
`;

if (tasks.length === 0) {
return null;
}

// Note: $queryRaw returns raw database values, not Prisma-mapped values
// We need to re-fetch the task using Prisma to get properly mapped enum values
const rawTask = tasks[0]!;
const task = await tx.task.findUnique({
where: { id: rawTask.id },
});
Comment on lines +44 to +49
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its bad practice to query again as it increases the load on the database. prisma recommends using TypedSql in their docs.
https://www.prisma.io/docs/orm/prisma-client/using-raw-sql/typedsql


return task;
}
}
Loading
Loading