Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions packages/common/src/services/decision/createInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { User } from '@op/supabase/lib';
import { CommonError, NotFoundError, UnauthorizedError } from '../../utils';
import { assertUserByAuthId } from '../assert';
import { generateUniqueProfileSlug } from '../profile/utils';
import { createTransitionsForProcess } from './createTransitionsForProcess';
import type { InstanceData, ProcessSchema } from './types';

export interface CreateInstanceInput {
Expand Down Expand Up @@ -93,9 +92,8 @@ export const createInstance = async ({
return newInstance;
});

// Create transitions for the process phases
// This is critical - if transitions can't be created, the process won't auto-advance
await createTransitionsForProcess({ processInstance: instance });
// Note: Transitions are created when the instance is published (status changes from DRAFT to PUBLISHED)
// Draft instances don't need transitions since they won't be processed

return instance;
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@ import { decisionProcessTransitions } from '@op/db/schema';
import type { ProcessInstance } from '@op/db/schema';

import { CommonError } from '../../utils';
import type { InstanceData, PhaseConfiguration } from './types';
import type { DecisionInstanceData } from './schemas/instanceData';

export interface CreateTransitionsInput {
/**
* Creates scheduled transition records for phases with date-based advancement.
* Each transition fires when the current phase's end date arrives.
*/
export async function createTransitionsForProcess({
processInstance,
}: {
processInstance: ProcessInstance;
}

export interface CreateTransitionsResult {
}): Promise<{
transitions: Array<{
id: string;
fromStateId: string | null;
toStateId: string;
scheduledDate: Date;
}>;
}

/**
* Creates transition records for all phases in a process instance.
* Each transition represents the end of one phase and the start of the next.
*/
export async function createTransitionsForProcess({
processInstance,
}: CreateTransitionsInput): Promise<CreateTransitionsResult> {
}> {
try {
const instanceData = processInstance.instanceData as InstanceData;
// Type assertion: instanceData is `unknown` in DB to support legacy formats for viewing,
// but this function is only called for new DecisionInstanceData processes
const instanceData = processInstance.instanceData as DecisionInstanceData;
const phases = instanceData.phases;

if (!phases || phases.length === 0) {
Expand All @@ -35,39 +33,57 @@ export async function createTransitionsForProcess({
);
}

const transitionsToCreate = phases.map(
(phase: PhaseConfiguration, index: number) => {
const fromStateId = index > 0 ? phases[index - 1]?.phaseId : null;
const toStateId = phase.phaseId;
// For phases like 'results' that only have a start date (no end), use the start date
const scheduledDate = phase.startDate;
// Create transitions for phases that use date-based advancement
// A transition is created FROM a phase (when it ends) TO the next phase
const transitionsToCreate: Array<{
processInstanceId: string;
fromStateId: string;
toStateId: string;
scheduledDate: string;
}> = [];

if (!scheduledDate) {
throw new CommonError(
`Phase ${index + 1} (${toStateId}) must have either a scheduled end date or start date`,
);
}
for (let index = 0; index < phases.length - 1; index++) {
const currentPhase = phases[index]!;
const nextPhase = phases[index + 1]!;

return {
processInstanceId: processInstance.id,
fromStateId,
toStateId,
scheduledDate: new Date(scheduledDate).toISOString(),
};
},
);
// Only create transition if current phase uses date-based advancement
if (currentPhase.rules?.advancement?.method !== 'date') {
continue;
}

// Schedule transition when the current phase ends
const scheduledDate = currentPhase.endDate;

if (!scheduledDate) {
throw new CommonError(
`Phase "${currentPhase.phaseId}" must have an end date for date-based advancement (instance: ${processInstance.id})`,
);
}

// DB columns are named fromStateId/toStateId but store phase IDs
transitionsToCreate.push({
processInstanceId: processInstance.id,
fromStateId: currentPhase.phaseId,
toStateId: nextPhase.phaseId,
scheduledDate: new Date(scheduledDate).toISOString(),
});
}

if (transitionsToCreate.length === 0) {
return { transitions: [] };
}

const createdTransitions = await db
.insert(decisionProcessTransitions)
.values(transitionsToCreate)
.returning();

return {
transitions: createdTransitions.map((t) => ({
id: t.id,
fromStateId: t.fromStateId,
toStateId: t.toStateId,
scheduledDate: new Date(t.scheduledDate),
transitions: createdTransitions.map((transition) => ({
id: transition.id,
fromStateId: transition.fromStateId,
toStateId: transition.toStateId,
scheduledDate: new Date(transition.scheduledDate),
})),
};
} catch (error) {
Expand Down
6 changes: 6 additions & 0 deletions packages/common/src/services/decision/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ export type { VoteData } from '@op/db/schema';

// Types
export * from './types';
export type {
DecisionSchemaDefinition,
PhaseDefinition,
PhaseRules,
ProcessConfig,
} from './schemas/types';
93 changes: 62 additions & 31 deletions packages/common/src/services/decision/transitionMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import { db, eq, lte, sql } from '@op/db/client';
import { and, db, eq, isNull, lte, sql } from '@op/db/client';
import {
type DecisionProcess,
type DecisionProcessTransition,
type ProcessInstance,
ProcessStatus,
decisionProcessTransitions,
decisionProcesses,
processInstances,
} from '@op/db/schema';
import pMap from 'p-map';

import { CommonError } from '../../utils';
// import { processResults } from './processResults';
import type { ProcessSchema, StateDefinition } from './types';
import type { DecisionInstanceData } from './schemas/instanceData';

/** Transition with nested process instance and process relations */
type TransitionWithRelations = DecisionProcessTransition & {
processInstance: ProcessInstance & {
process: DecisionProcess;
};
};

export interface ProcessDecisionsTransitionsResult {
processed: number;
Expand All @@ -33,14 +43,30 @@ export async function processDecisionsTransitions(): Promise<ProcessDecisionsTra
};

try {
const dueTransitions = await db._query.decisionProcessTransitions.findMany({
where: (transitions, { and, isNull }) =>
// Query due transitions, filtering to only include published instances
// Draft, completed, and cancelled instances should not have their transitions processed
const dueTransitions = await db
.select({
id: decisionProcessTransitions.id,
processInstanceId: decisionProcessTransitions.processInstanceId,
fromStateId: decisionProcessTransitions.fromStateId,
toStateId: decisionProcessTransitions.toStateId,
scheduledDate: decisionProcessTransitions.scheduledDate,
completedAt: decisionProcessTransitions.completedAt,
})
.from(decisionProcessTransitions)
.innerJoin(
processInstances,
eq(decisionProcessTransitions.processInstanceId, processInstances.id),
)
.where(
and(
isNull(transitions.completedAt),
lte(transitions.scheduledDate, now),
isNull(decisionProcessTransitions.completedAt),
lte(decisionProcessTransitions.scheduledDate, now),
eq(processInstances.status, ProcessStatus.PUBLISHED),
),
orderBy: (transitions, { asc }) => [asc(transitions.scheduledDate)],
});
)
.orderBy(decisionProcessTransitions.scheduledDate);

// Group transitions by processInstanceId to avoid race conditions
// within the same process instance
Expand All @@ -57,7 +83,7 @@ export async function processDecisionsTransitions(): Promise<ProcessDecisionsTra
}
}

// Process each process instance's transitions in parallel (up to 4 processes at a time)
// Process each process instance's transitions in parallel (up to 5 processes at a time)
// but process transitions WITHIN each process sequentially to prevent race conditions per process
await pMap(
Array.from(transitionsByProcess.entries()),
Expand All @@ -84,7 +110,7 @@ export async function processDecisionsTransitions(): Promise<ProcessDecisionsTra
}
}
},
{ concurrency: 4 },
{ concurrency: 5 },
);

return result;
Expand All @@ -102,46 +128,51 @@ export async function processDecisionsTransitions(): Promise<ProcessDecisionsTra
* If transitioning to a final state (results phase), triggers results processing.
*/
async function processTransition(transitionId: string): Promise<void> {
const transition = await db._query.decisionProcessTransitions.findFirst({
where: eq(decisionProcessTransitions.id, transitionId),
});
// Fetch transition with related process instance and process in a single query
const transitionResult = await db._query.decisionProcessTransitions.findFirst(
{
where: eq(decisionProcessTransitions.id, transitionId),
with: {
processInstance: {
with: {
process: true,
},
},
},
},
);

if (!transition) {
if (!transitionResult) {
throw new CommonError(
`Transition not found: ${transitionId}. It may have been deleted or the ID is invalid.`,
);
}

// Type assertion for the nested relations (Drizzle's type inference doesn't handle nested `with` well)
const transition = transitionResult as TransitionWithRelations;

if (transition.completedAt) {
return;
}

// Get the process instance to check if we're transitioning to a final state
const processInstance = await db._query.processInstances.findFirst({
where: eq(processInstances.id, transition.processInstanceId),
});

const processInstance = transition.processInstance;
if (!processInstance) {
throw new CommonError(
`Process instance not found: ${transition.processInstanceId}`,
);
}

// Get the process schema to check the state type
const process = await db._query.decisionProcesses.findFirst({
where: eq(decisionProcesses.id, processInstance.processId),
});

const process = processInstance.process;
if (!process) {
throw new CommonError(`Process not found: ${processInstance.processId}`);
}

const processSchema = process.processSchema as ProcessSchema;
const toState = processSchema.states.find(
(state: StateDefinition) => state.id === transition.toStateId,
);

const isTransitioningToFinalState = toState?.type === 'final';
// Determine if transitioning to final state using instanceData.phases
// In the new schema format, the last phase is always the final state
const instanceData = processInstance.instanceData as DecisionInstanceData;
const phases = instanceData.phases;
const lastPhaseId = phases[phases.length - 1]?.phaseId;
const isTransitioningToFinalState = transition.toStateId === lastPhaseId;

// Update both the process instance and transition in a single transaction
// to ensure atomicity and prevent partial state updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { assertAccess, permission } from 'access-zones';

import { CommonError, NotFoundError } from '../../utils';
import { getProfileAccessUser } from '../access';
import { createTransitionsForProcess } from './createTransitionsForProcess';
import type {
DecisionInstanceData,
PhaseOverride,
Expand Down Expand Up @@ -161,14 +162,20 @@ export const updateDecisionInstance = async ({

// Determine the final status (updated or existing)
const finalStatus = status ?? existingInstance.status;
const wasPublished =
status === ProcessStatus.PUBLISHED &&
existingInstance.status === ProcessStatus.DRAFT;

// If status is DRAFT, remove all transitions
if (finalStatus === ProcessStatus.DRAFT) {
await tx
.delete(decisionProcessTransitions)
.where(eq(decisionProcessTransitions.processInstanceId, instanceId));
} else if (wasPublished) {
// When publishing a draft, create transitions for all date-based phases
await createTransitionsForProcess({ processInstance: updatedInstance });
} else if (phases && phases.length > 0) {
// If phases were updated and not DRAFT, update the corresponding transitions
// If phases were updated and already published, update the corresponding transitions
await updateTransitionsForProcess({
processInstance: updatedInstance,
tx,
Expand Down
Loading
Loading