From dac49571a578ecb977e44c2a7641c15d2ca69fcf Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Sun, 27 Jul 2025 13:22:19 +0200 Subject: [PATCH 1/2] Reshaped and simplified Workflow definition --- src/packages/emmett/src/index.ts | 1 + src/packages/emmett/src/typing/index.ts | 1 - src/packages/emmett/src/typing/workflow.ts | 108 ------ src/packages/emmett/src/workflows/index.ts | 2 + src/packages/emmett/src/workflows/workflow.ts | 39 +++ .../workflow.unit.spec.ts | 320 +++++++++--------- .../emmett/src/workflows/workflowHandler.ts | 0 .../emmett/src/workflows/workflowProcessor.ts | 78 +++++ 8 files changed, 281 insertions(+), 268 deletions(-) delete mode 100644 src/packages/emmett/src/typing/workflow.ts create mode 100644 src/packages/emmett/src/workflows/index.ts create mode 100644 src/packages/emmett/src/workflows/workflow.ts rename src/packages/emmett/src/{typing => workflows}/workflow.unit.spec.ts (65%) create mode 100644 src/packages/emmett/src/workflows/workflowHandler.ts create mode 100644 src/packages/emmett/src/workflows/workflowProcessor.ts diff --git a/src/packages/emmett/src/index.ts b/src/packages/emmett/src/index.ts index 45dcae02..d88c4fc3 100644 --- a/src/packages/emmett/src/index.ts +++ b/src/packages/emmett/src/index.ts @@ -14,3 +14,4 @@ export * from './testing'; export * from './typing'; export * from './utils'; export * from './validation'; +export * from './workflows'; diff --git a/src/packages/emmett/src/typing/index.ts b/src/packages/emmett/src/typing/index.ts index 2ea5fce1..86bc77e2 100644 --- a/src/packages/emmett/src/typing/index.ts +++ b/src/packages/emmett/src/typing/index.ts @@ -6,7 +6,6 @@ export * from './message'; export * from './messageHandling'; export * from './decider'; -export * from './workflow'; export type Brand = K & { readonly __brand: T }; export type Flavour = K & { readonly __brand?: T }; diff --git a/src/packages/emmett/src/typing/workflow.ts b/src/packages/emmett/src/typing/workflow.ts deleted file mode 100644 index c35aea0e..00000000 --- a/src/packages/emmett/src/typing/workflow.ts +++ /dev/null @@ -1,108 +0,0 @@ -import type { AnyCommand } from './command'; -import type { AnyEvent } from './event'; - -/// Inspired by https://blog.bittacklr.be/the-workflow-pattern.html - -export type Workflow< - Input extends AnyEvent | AnyCommand, - State, - Output extends AnyEvent | AnyCommand, -> = { - decide: (command: Input, state: State) => WorkflowOutput[]; - evolve: (currentState: State, event: WorkflowEvent) => State; - initialState: () => State; -}; - -export type WorkflowEvent = Extract< - Output, - { kind?: 'Event' } ->; - -export type WorkflowCommand = Extract< - Output, - { kind?: 'Command' } ->; - -export type WorkflowOutput = - | { action: 'Reply'; message: TOutput } - | { action: 'Send'; message: WorkflowCommand } - | { action: 'Publish'; message: WorkflowEvent } - | { - action: 'Schedule'; - message: TOutput; - when: { afterInMs: number } | { at: Date }; - } - | { action: 'Complete' } - | { action: 'Accept' } - | { action: 'Ignore'; reason: string } - | { action: 'Error'; reason: string }; - -export const reply = ( - message: TOutput, -): WorkflowOutput => { - return { - action: 'Reply', - message, - }; -}; - -export const send = ( - message: WorkflowCommand, -): WorkflowOutput => { - return { - action: 'Send', - message, - }; -}; - -export const publish = ( - message: WorkflowEvent, -): WorkflowOutput => { - return { - action: 'Publish', - message, - }; -}; - -export const schedule = ( - message: TOutput, - when: { afterInMs: number } | { at: Date }, -): WorkflowOutput => { - return { - action: 'Schedule', - message, - when, - }; -}; - -export const complete = < - TOutput extends AnyEvent | AnyCommand, ->(): WorkflowOutput => { - return { - action: 'Complete', - }; -}; - -export const ignore = ( - reason: string, -): WorkflowOutput => { - return { - action: 'Ignore', - reason, - }; -}; - -export const error = ( - reason: string, -): WorkflowOutput => { - return { - action: 'Error', - reason, - }; -}; - -export const accept = < - TOutput extends AnyEvent | AnyCommand, ->(): WorkflowOutput => { - return { action: 'Accept' }; -}; diff --git a/src/packages/emmett/src/workflows/index.ts b/src/packages/emmett/src/workflows/index.ts new file mode 100644 index 00000000..71c4e253 --- /dev/null +++ b/src/packages/emmett/src/workflows/index.ts @@ -0,0 +1,2 @@ +export * from './workflow'; +export * from './workflowProcessor'; diff --git a/src/packages/emmett/src/workflows/workflow.ts b/src/packages/emmett/src/workflows/workflow.ts new file mode 100644 index 00000000..b411992f --- /dev/null +++ b/src/packages/emmett/src/workflows/workflow.ts @@ -0,0 +1,39 @@ +import type { EmmettError } from '../errors'; +import type { AnyCommand } from '../typing/command'; +import type { AnyEvent } from '../typing/event'; + +/// Inspired by https://blog.bittacklr.be/the-workflow-pattern.html + +export type Workflow< + Input extends AnyEvent | AnyCommand, + State, + Output extends AnyEvent | AnyCommand, +> = { + name?: string; + decide: (command: Input, state: State) => WorkflowOutput; + evolve: (currentState: State, event: WorkflowEvent) => State; + initialState: () => State; +}; + +export type WorkflowEvent = Extract< + Output, + { kind?: 'Event' } +>; + +export type WorkflowCommand = Extract< + Output, + { kind?: 'Command' } +>; + +export type WorkflowOutput = + Output | Output[]; + +export const Workflow = < + Input extends AnyEvent | AnyCommand, + State, + Output extends AnyEvent | AnyCommand, +>( + workflow: Workflow, +): Workflow => { + return workflow; +}; diff --git a/src/packages/emmett/src/typing/workflow.unit.spec.ts b/src/packages/emmett/src/workflows/workflow.unit.spec.ts similarity index 65% rename from src/packages/emmett/src/typing/workflow.unit.spec.ts rename to src/packages/emmett/src/workflows/workflow.unit.spec.ts index 5adc938e..761234e1 100644 --- a/src/packages/emmett/src/typing/workflow.unit.spec.ts +++ b/src/packages/emmett/src/workflows/workflow.unit.spec.ts @@ -1,16 +1,11 @@ -import type { Command } from './command'; -import type { Event } from './event'; +import { type Command } from '../typing/command'; +import type { Event } from '../typing/event'; import { - accept, - complete, - error, - ignore, - publish, - send, type Workflow, type WorkflowEvent, type WorkflowOutput, -} from './workflow'; +} from '../workflows/workflow'; +import { workflowProcessor } from './workflowProcessor'; export type CheckOut = Command< 'CheckOut', @@ -107,7 +102,7 @@ export type GroupCheckoutTimedOut = Event< >; //////////////////////////////////////////// -////////// Entity +////////// State /////////////////////////////////////////// export type GroupCheckout = @@ -131,7 +126,7 @@ export enum GuestStayStatus { } //////////////////////////////////////////// -////////// Workflow Definition +////////// Workflow Inputs & Outputs /////////////////////////////////////////// export type GroupCheckoutInput = @@ -147,118 +142,6 @@ export type GroupCheckoutOutput = | GroupCheckoutFailed | GroupCheckoutTimedOut; -export enum IgnoredReason { - GroupCheckoutAlreadyInitiated = 'GroupCheckoutAlreadyInitiated', - GuestCheckoutWasNotPartOfGroupCheckout = 'GuestCheckoutWasNotPartOfGroupCheckout', - GuestCheckoutAlreadyFinished = 'GuestCheckoutAlreadyFinished', - GroupCheckoutAlreadyFinished = 'GroupCheckoutAlreadyFinished', - GroupCheckoutDoesNotExist = 'GroupCheckoutDoesNotExist', -} - -export type ErrorReason = 'UnknownInputType'; - -//////////////////////////////////////////// -////////// Decide -/////////////////////////////////////////// - -export const decide = ( - input: GroupCheckoutInput, - state: GroupCheckout, -): WorkflowOutput[] => { - const { type, data } = input; - - switch (type) { - case 'InitiateGroupCheckout': { - if (state.status !== 'NotExisting') - return [ignore(IgnoredReason.GroupCheckoutAlreadyInitiated)]; - - const checkoutGuestStays = data.guestStayAccountIds.map((id) => { - return send({ - type: 'CheckOut', - data: { - guestStayAccountId: id, - groupCheckoutId: data.groupCheckoutId, - }, - metadata: { - now: data.now, - }, - }); - }); - - return [ - ...checkoutGuestStays, - publish({ - type: 'GroupCheckoutInitiated', - data: { - groupCheckoutId: data.groupCheckoutId, - guestStayAccountIds: data.guestStayAccountIds, - initiatedAt: data.now, - clerkId: data.clerkId, - }, - }), - ]; - } - case 'GuestCheckedOut': - case 'GuestCheckoutFailed': { - if (!data.groupCheckoutId) - return [ignore(IgnoredReason.GuestCheckoutWasNotPartOfGroupCheckout)]; - - if (state.status === 'NotExisting') - return [ignore(IgnoredReason.GroupCheckoutDoesNotExist)]; - - if (state.status === 'Finished') - return [ignore(IgnoredReason.GuestCheckoutAlreadyFinished)]; - - const { guestStayAccountId, groupCheckoutId } = data; - - const guestCheckoutStatus = - state.guestStayAccountIds.get(guestStayAccountId); - - if (isAlreadyClosed(guestCheckoutStatus)) - return [ignore(IgnoredReason.GuestCheckoutAlreadyFinished)]; - - const guestStayAccountIds = state.guestStayAccountIds.set( - guestStayAccountId, - type === 'GuestCheckedOut' - ? GuestStayStatus.Completed - : GuestStayStatus.Failed, - ); - - const now = - type === 'GuestCheckedOut' ? data.checkedOutAt : data.failedAt; - - return areAnyOngoingCheckouts(guestStayAccountIds) - ? [accept()] - : [ - publish(finished(groupCheckoutId, state.guestStayAccountIds, now)), - complete(), - ]; - } - case 'TimeoutGroupCheckout': { - if (state.status === 'NotExisting') - return [ignore(IgnoredReason.GroupCheckoutDoesNotExist)]; - - if (state.status === 'Finished') - return [ignore(IgnoredReason.GroupCheckoutAlreadyFinished)]; - - return [ - publish( - timedOut( - data.groupCheckoutId, - state.guestStayAccountIds, - data.timeOutAt, - ), - ), - complete(), - ]; - } - default: { - const _notExistingEventType: never = type; - return [error('UnknownInputType')]; - } - } -}; - //////////////////////////////////////////// ////////// Evolve /////////////////////////////////////////// @@ -312,6 +195,34 @@ export const evolve = ( } }; +//////////////////////////////////////////// +////////// Decide +/////////////////////////////////////////// + +export const decide = ( + input: GroupCheckoutInput, + state: GroupCheckout, +): WorkflowOutput => { + const { type } = input; + + switch (type) { + case 'InitiateGroupCheckout': { + return initiateGroupCheckout(input, state); + } + case 'GuestCheckedOut': + case 'GuestCheckoutFailed': { + return onCheckoutFinished(input, state); + } + case 'TimeoutGroupCheckout': { + return timedOut(input, state); + } + } +}; + +//////////////////////////////////////////// +////////// Workflow Definition +//////////////////////////////////////////// + export const GroupCheckoutWorkflow: Workflow< GroupCheckoutInput, GroupCheckout, @@ -322,25 +233,119 @@ export const GroupCheckoutWorkflow: Workflow< initialState, }; -export const isAlreadyClosed = (status: GuestStayStatus | undefined) => - status === GuestStayStatus.Completed || status === GuestStayStatus.Failed; +//////////////////////////////////////////// +////////// Workflow Processor +//////////////////////////////////////////// -const areAnyOngoingCheckouts = ( - guestStayAccounts: Map, -) => [...guestStayAccounts.values()].some((status) => !isAlreadyClosed(status)); +export const groupCheckoutWorkflowProcessor = workflowProcessor({ + processorId: 'GroupCheckoutWorkflow', + workflow: GroupCheckoutWorkflow, + inputs: { + commands: ['InitiateGroupCheckout', 'TimeoutGroupCheckout'], + events: ['GuestCheckedOut', 'GuestCheckoutFailed'], + }, + outputs: { + commands: ['CheckOut'], + events: [ + 'GroupCheckoutCompleted', + 'GroupCheckoutFailed', + 'GroupCheckoutTimedOut', + ], + }, +}); -const areAllCompleted = (guestStayAccounts: Map) => - [...guestStayAccounts.values()].some( - (status) => status === GuestStayStatus.Completed, +//////////////////////////////////////////// +////////// Logic +/////////////////////////////////////////// + +const initiateGroupCheckout = ( + { data }: InitiateGroupCheckout, + state: GroupCheckout, +): [GroupCheckoutInitiated, ...CheckOut[]] | [] => { + if (state.status !== 'NotExisting') return []; + + const checkoutGuestStays: CheckOut[] = data.guestStayAccountIds.map((id) => ({ + type: 'CheckOut', + data: { + guestStayAccountId: id, + groupCheckoutId: data.groupCheckoutId, + }, + metadata: { + now: data.now, + }, + })); + + return [ + { + type: 'GroupCheckoutInitiated', + data: { + groupCheckoutId: data.groupCheckoutId, + guestStayAccountIds: data.guestStayAccountIds, + initiatedAt: data.now, + clerkId: data.clerkId, + }, + }, + ...checkoutGuestStays, + ]; +}; + +const onCheckoutFinished = ( + { type, data }: GuestCheckedOut | GuestCheckoutFailed, + state: GroupCheckout, +): GroupCheckoutCompleted | GroupCheckoutFailed | [] => { + if ( + !data.groupCheckoutId || + state.status === 'NotExisting' || + state.status === 'Finished' + ) + return []; + + const { guestStayAccountId, groupCheckoutId } = data; + + const guestCheckoutStatus = state.guestStayAccountIds.get(guestStayAccountId); + + if (isAlreadyClosed(guestCheckoutStatus)) return []; + + const guestStayAccountIds = state.guestStayAccountIds.set( + guestStayAccountId, + type === 'GuestCheckedOut' + ? GuestStayStatus.Completed + : GuestStayStatus.Failed, ); -const checkoutsWith = ( - guestStayAccounts: Map, - status: GuestStayStatus, -): string[] => - [...guestStayAccounts.entries()] - .filter((s) => s[1] === status) - .map((s) => s[0]); + const now = type === 'GuestCheckedOut' ? data.checkedOutAt : data.failedAt; + + return areAnyOngoingCheckouts(guestStayAccountIds) + ? [] + : finished(groupCheckoutId, state.guestStayAccountIds, now); +}; + +const timedOut = ( + command: TimeoutGroupCheckout, + state: GroupCheckout, +): GroupCheckoutTimedOut | [] => { + if (state.status === 'NotExisting' || state.status === 'Finished') return []; + + return { + type: 'GroupCheckoutTimedOut', + data: { + groupCheckoutId: command.data.groupCheckoutId, + incompleteCheckouts: checkoutsWith( + state.guestStayAccountIds, + GuestStayStatus.Pending, + ), + completedCheckouts: checkoutsWith( + state.guestStayAccountIds, + GuestStayStatus.Completed, + ), + failedCheckouts: checkoutsWith( + state.guestStayAccountIds, + GuestStayStatus.Failed, + ), + timedOutAt: command.data.timeOutAt, + }, + }; +}; const finished = ( groupCheckoutId: string, @@ -373,25 +378,22 @@ const finished = ( }; }; -const timedOut = ( - groupCheckoutId: string, +export const isAlreadyClosed = (status: GuestStayStatus | undefined) => + status === GuestStayStatus.Completed || status === GuestStayStatus.Failed; + +const areAnyOngoingCheckouts = ( guestStayAccounts: Map, - now: Date, -): GroupCheckoutTimedOut => { - return { - type: 'GroupCheckoutTimedOut', - data: { - groupCheckoutId, - incompleteCheckouts: checkoutsWith( - guestStayAccounts, - GuestStayStatus.Pending, - ), - completedCheckouts: checkoutsWith( - guestStayAccounts, - GuestStayStatus.Completed, - ), - failedCheckouts: checkoutsWith(guestStayAccounts, GuestStayStatus.Failed), - timedOutAt: now, - }, - }; -}; +) => [...guestStayAccounts.values()].some((status) => !isAlreadyClosed(status)); + +const areAllCompleted = (guestStayAccounts: Map) => + [...guestStayAccounts.values()].some( + (status) => status === GuestStayStatus.Completed, + ); + +const checkoutsWith = ( + guestStayAccounts: Map, + status: GuestStayStatus, +): string[] => + [...guestStayAccounts.entries()] + .filter((s) => s[1] === status) + .map((s) => s[0]); diff --git a/src/packages/emmett/src/workflows/workflowHandler.ts b/src/packages/emmett/src/workflows/workflowHandler.ts new file mode 100644 index 00000000..e69de29b diff --git a/src/packages/emmett/src/workflows/workflowProcessor.ts b/src/packages/emmett/src/workflows/workflowProcessor.ts new file mode 100644 index 00000000..8ecc7e34 --- /dev/null +++ b/src/packages/emmett/src/workflows/workflowProcessor.ts @@ -0,0 +1,78 @@ +import { + MessageProcessor, + MessageProcessorType, + reactor, + type BaseMessageProcessorOptions, +} from '../processors'; +import type { + AnyCommand, + AnyEvent, + AnyReadEventMetadata, + AnyRecordedMessageMetadata, + CanHandle, + DefaultRecord, + GlobalPositionTypeOfRecordedMessageMetadata, + MessageTypeOf, + RecordedMessage, +} from '../typing'; +import type { Workflow, WorkflowCommand, WorkflowEvent } from './workflow'; + +export type WorkflowOptions< + Input extends AnyEvent | AnyCommand, + State, + Output extends AnyEvent | AnyCommand, + MessageMetadataType extends AnyReadEventMetadata = AnyReadEventMetadata, + HandlerContext extends DefaultRecord = DefaultRecord, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +> = Omit< + BaseMessageProcessorOptions< + Input, + MessageMetadataType, + HandlerContext, + CheckpointType + >, + 'type' | 'canHandle' +> & { + workflow: Workflow; + inputs: { + commands: CanHandle>; + events: CanHandle>; + }; + outputs: { + commands: MessageTypeOf>[]; + events: MessageTypeOf>[]; + }; +}; + +export const workflowProcessor = < + Input extends AnyEvent | AnyCommand, + State, + Output extends AnyEvent | AnyCommand, + MetaDataType extends AnyRecordedMessageMetadata = AnyRecordedMessageMetadata, + HandlerContext extends DefaultRecord = DefaultRecord, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +>( + options: WorkflowOptions< + Input, + State, + Output, + MetaDataType, + HandlerContext, + CheckpointType + >, +): MessageProcessor => { + const { workflow: _workflow, ...rest } = options; + + return reactor({ + ...rest, + canHandle: [...options.inputs.commands, ...options.inputs.events], + type: MessageProcessorType.PROJECTOR, + eachMessage: async ( + _message: RecordedMessage, + _context: HandlerContext, + ) => { + // if (!options.input.includes(message.type)) return; + // await projection.handle([message], context); + }, + }); +}; From 2e020acfea06b3e7e1e8d486db35889488315cb5 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Sun, 27 Jul 2025 17:39:08 +0200 Subject: [PATCH 2/2] Added getWorkflowId to route messages to specific workflow --- src/packages/emmett/src/workflows/workflow.unit.spec.ts | 5 +++-- src/packages/emmett/src/workflows/workflowProcessor.ts | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/packages/emmett/src/workflows/workflow.unit.spec.ts b/src/packages/emmett/src/workflows/workflow.unit.spec.ts index 761234e1..7ea7a27f 100644 --- a/src/packages/emmett/src/workflows/workflow.unit.spec.ts +++ b/src/packages/emmett/src/workflows/workflow.unit.spec.ts @@ -211,7 +211,7 @@ export const decide = ( } case 'GuestCheckedOut': case 'GuestCheckoutFailed': { - return onCheckoutFinished(input, state); + return completeGroupCheckout(input, state); } case 'TimeoutGroupCheckout': { return timedOut(input, state); @@ -240,6 +240,7 @@ export const GroupCheckoutWorkflow: Workflow< export const groupCheckoutWorkflowProcessor = workflowProcessor({ processorId: 'GroupCheckoutWorkflow', workflow: GroupCheckoutWorkflow, + getWorkflowId: (input) => input.data.groupCheckoutId ?? null, inputs: { commands: ['InitiateGroupCheckout', 'TimeoutGroupCheckout'], events: ['GuestCheckedOut', 'GuestCheckoutFailed'], @@ -289,7 +290,7 @@ const initiateGroupCheckout = ( ]; }; -const onCheckoutFinished = ( +const completeGroupCheckout = ( { type, data }: GuestCheckedOut | GuestCheckoutFailed, state: GroupCheckout, ): GroupCheckoutCompleted | GroupCheckoutFailed | [] => { diff --git a/src/packages/emmett/src/workflows/workflowProcessor.ts b/src/packages/emmett/src/workflows/workflowProcessor.ts index 8ecc7e34..449311fd 100644 --- a/src/packages/emmett/src/workflows/workflowProcessor.ts +++ b/src/packages/emmett/src/workflows/workflowProcessor.ts @@ -34,6 +34,9 @@ export type WorkflowOptions< 'type' | 'canHandle' > & { workflow: Workflow; + getWorkflowId: ( + input: RecordedMessage, + ) => string | null; inputs: { commands: CanHandle>; events: CanHandle>;