@@ -3,9 +3,17 @@ import type { ILifecycle, WorkerBootstrap, WorkerRow } from '../core/types.js';
33import type { Logger } from '../platform/types.js' ;
44import { States , WorkerState } from '../core/WorkerState.js' ;
55import type { AnyFlow } from '@pgflow/dsl' ;
6+ import { extractFlowShape } from '@pgflow/dsl' ;
7+ import {
8+ isLocalSupabase ,
9+ KNOWN_LOCAL_ANON_KEY ,
10+ KNOWN_LOCAL_SERVICE_ROLE_KEY ,
11+ } from '../shared/localDetection.js' ;
12+ import { FlowShapeMismatchError } from './errors.js' ;
613
714export interface FlowLifecycleConfig {
815 heartbeatInterval ?: number ;
16+ env ?: Record < string , string | undefined > ;
917}
1018
1119/**
@@ -21,13 +29,15 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
2129 private _workerId ?: string ;
2230 private heartbeatInterval : number ;
2331 private lastHeartbeat = 0 ;
32+ private env ?: Record < string , string | undefined > ;
2433
2534 constructor ( queries : Queries , flow : TFlow , logger : Logger , config ?: FlowLifecycleConfig ) {
2635 this . queries = queries ;
2736 this . flow = flow ;
2837 this . logger = logger ;
2938 this . workerState = new WorkerState ( logger ) ;
3039 this . heartbeatInterval = config ?. heartbeatInterval ?? 5000 ;
40+ this . env = config ?. env ;
3141 }
3242
3343 async acknowledgeStart ( workerBootstrap : WorkerBootstrap ) : Promise < void > {
@@ -36,6 +46,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
3646 // Store workerId for supplier pattern
3747 this . _workerId = workerBootstrap . workerId ;
3848
49+ // Compile/verify flow as part of Starting (before registering worker)
50+ await this . ensureFlowCompiled ( ) ;
51+
52+ // Only register worker after successful compilation
3953 this . workerRow = await this . queries . onWorkerStarted ( {
4054 queueName : this . queueName ,
4155 ...workerBootstrap ,
@@ -44,6 +58,37 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
4458 this . workerState . transitionTo ( States . Running ) ;
4559 }
4660
61+ private async ensureFlowCompiled ( ) : Promise < void > {
62+ this . logger . info ( `Ensuring flow '${ this . flow . slug } ' is compiled...` ) ;
63+
64+ const shape = extractFlowShape ( this . flow ) ;
65+ const mode = this . detectCompilationMode ( ) ;
66+
67+ const result = await this . queries . ensureFlowCompiled (
68+ this . flow . slug ,
69+ shape ,
70+ mode
71+ ) ;
72+
73+ if ( result . status === 'mismatch' ) {
74+ throw new FlowShapeMismatchError ( this . flow . slug , result . differences ) ;
75+ }
76+
77+ this . logger . info ( `Flow '${ this . flow . slug } ' ${ result . status } (mode: ${ mode } )` ) ;
78+ }
79+
80+ private detectCompilationMode ( ) : 'development' | 'production' {
81+ // Use provided env if available, otherwise fall back to global detection
82+ if ( this . env ) {
83+ const anonKey = this . env [ 'SUPABASE_ANON_KEY' ] ;
84+ const serviceRoleKey = this . env [ 'SUPABASE_SERVICE_ROLE_KEY' ] ;
85+ const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY ||
86+ serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY ;
87+ return isLocal ? 'development' : 'production' ;
88+ }
89+ return isLocalSupabase ( ) ? 'development' : 'production' ;
90+ }
91+
4792 acknowledgeStop ( ) {
4893 this . workerState . transitionTo ( States . Stopping ) ;
4994
0 commit comments