Skip to content

Commit e4cea89

Browse files
committed
add ensureCompiledOnStartup config flag to opt-out of auto-compilation
1 parent bcc4f76 commit e4cea89

File tree

6 files changed

+332
-3
lines changed

6 files changed

+332
-3
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@pgflow/core': patch
3+
'@pgflow/dsl': patch
4+
'@pgflow/edge-worker': patch
5+
---
6+
7+
Add automatic flow compilation at worker startup. Workers now call ensure_flow_compiled to verify flows are up-to-date. In development, mismatched flows are recompiled automatically. In production, mismatches cause errors. Use ensureCompiledOnStartup: false to opt-out.

pkgs/edge-worker/src/core/workerConfigTypes.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,14 @@ export type ResolvedQueueWorkerConfig = Required<Omit<QueueWorkerConfig, 'retryD
144144
* Configuration for the flow worker with two-phase polling
145145
*/
146146
export type FlowWorkerConfig = {
147+
/**
148+
* Whether to verify/compile flow at worker startup.
149+
* When true (default), worker calls pgflow.ensure_flow_compiled() before polling.
150+
* Set to false to skip compilation check (useful if flows are pre-compiled via CLI).
151+
* @default true
152+
*/
153+
ensureCompiledOnStartup?: boolean;
154+
147155
/**
148156
* How many tasks are processed at the same time
149157
* @default 10
@@ -201,7 +209,7 @@ export type FlowWorkerConfig = {
201209
/**
202210
* Resolved flow configuration with all defaults applied
203211
*/
204-
export type ResolvedFlowWorkerConfig = Required<Omit<FlowWorkerConfig, 'connectionString' | 'env'>> & {
212+
export type ResolvedFlowWorkerConfig = Required<Omit<FlowWorkerConfig, 'connectionString' | 'env' | 'ensureCompiledOnStartup'>> & {
205213
connectionString: string | undefined;
206214
env: Record<string, string | undefined>;
207215
};

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { FlowShapeMismatchError } from './errors.js';
99
export interface FlowLifecycleConfig {
1010
heartbeatInterval?: number;
1111
isLocalEnvironment?: boolean;
12+
ensureCompiledOnStartup?: boolean;
1213
}
1314

1415
/**
@@ -25,6 +26,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
2526
private heartbeatInterval: number;
2627
private lastHeartbeat = 0;
2728
private isLocalEnvironment: boolean;
29+
private ensureCompiledOnStartup: boolean;
2830

2931
constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
3032
this.queries = queries;
@@ -33,6 +35,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
3335
this.workerState = new WorkerState(logger);
3436
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
3537
this.isLocalEnvironment = config?.isLocalEnvironment ?? false;
38+
this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? true;
3639
}
3740

3841
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
@@ -42,7 +45,11 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
4245
this._workerId = workerBootstrap.workerId;
4346

4447
// Compile/verify flow as part of Starting (before registering worker)
45-
await this.ensureFlowCompiled();
48+
if (this.ensureCompiledOnStartup) {
49+
await this.ensureFlowCompiled();
50+
} else {
51+
this.logger.info(`Skipping compilation check for flow '${this.flow.slug}' (ensureCompiledOnStartup=false)`);
52+
}
4653

4754
// Only register worker after successful compilation
4855
this.workerRow = await this.queries.onWorkerStarted({

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
9090
queries,
9191
flow,
9292
createLogger('FlowWorkerLifecycle'),
93-
{ isLocalEnvironment: platformAdapter.isLocalEnvironment }
93+
{
94+
isLocalEnvironment: platformAdapter.isLocalEnvironment,
95+
ensureCompiledOnStartup: config.ensureCompiledOnStartup ?? true
96+
}
9497
);
9598

9699
// Create frozen worker config ONCE for reuse across all task executions

pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,3 +307,138 @@ Deno.test(
307307
}
308308
})
309309
);
310+
311+
// Tests for ensureCompiledOnStartup config option
312+
313+
Deno.test(
314+
'skips compilation when ensureCompiledOnStartup is false',
315+
withPgNoTransaction(async (sql) => {
316+
await sql`select pgflow_tests.reset_db();`;
317+
318+
// Verify flow does NOT exist
319+
const [flowBefore] = await sql`
320+
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
321+
`;
322+
assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup');
323+
324+
// Create worker with ensureCompiledOnStartup: false
325+
const worker = createFlowWorker(
326+
TestCompilationFlow,
327+
{
328+
sql,
329+
ensureCompiledOnStartup: false, // SKIP compilation
330+
maxConcurrent: 1,
331+
batchSize: 10,
332+
maxPollSeconds: 1,
333+
pollIntervalMs: 200,
334+
},
335+
createLogger,
336+
createPlatformAdapterWithLocalEnv(sql, false)
337+
);
338+
339+
try {
340+
worker.startOnlyOnce({
341+
edgeFunctionName: 'test_compilation',
342+
workerId: crypto.randomUUID(),
343+
});
344+
await delay(100);
345+
346+
// Flow should NOT have been created (compilation was skipped)
347+
const [flowAfter] = await sql`
348+
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
349+
`;
350+
assertEquals(flowAfter, undefined, 'Flow should NOT be created when compilation skipped');
351+
} finally {
352+
await worker.stop();
353+
}
354+
})
355+
);
356+
357+
Deno.test(
358+
'compiles flow when ensureCompiledOnStartup is explicitly true',
359+
withPgNoTransaction(async (sql) => {
360+
await sql`select pgflow_tests.reset_db();`;
361+
362+
// Verify flow does NOT exist
363+
const [flowBefore] = await sql`
364+
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
365+
`;
366+
assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup');
367+
368+
// Create worker with ensureCompiledOnStartup: true (explicit)
369+
const worker = createFlowWorker(
370+
TestCompilationFlow,
371+
{
372+
sql,
373+
ensureCompiledOnStartup: true, // EXPLICIT true
374+
maxConcurrent: 1,
375+
batchSize: 10,
376+
maxPollSeconds: 1,
377+
pollIntervalMs: 200,
378+
},
379+
createLogger,
380+
createPlatformAdapterWithLocalEnv(sql, false)
381+
);
382+
383+
try {
384+
worker.startOnlyOnce({
385+
edgeFunctionName: 'test_compilation',
386+
workerId: crypto.randomUUID(),
387+
});
388+
await delay(100);
389+
390+
// Flow SHOULD have been created
391+
const [flowAfter] = await sql`
392+
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
393+
`;
394+
assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should be created when ensureCompiledOnStartup is true');
395+
} finally {
396+
await worker.stop();
397+
}
398+
})
399+
);
400+
401+
Deno.test(
402+
'worker still registers and polls when ensureCompiledOnStartup is false',
403+
withPgNoTransaction(async (sql) => {
404+
await sql`select pgflow_tests.reset_db();`;
405+
406+
// Pre-compile the flow manually (simulating pre-compiled via CLI)
407+
await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
408+
await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;
409+
410+
const workerId = crypto.randomUUID();
411+
412+
// Create worker with ensureCompiledOnStartup: false
413+
const worker = createFlowWorker(
414+
TestCompilationFlow,
415+
{
416+
sql,
417+
ensureCompiledOnStartup: false, // Skip compilation check
418+
maxConcurrent: 1,
419+
batchSize: 10,
420+
maxPollSeconds: 1,
421+
pollIntervalMs: 200,
422+
},
423+
createLogger,
424+
createPlatformAdapterWithLocalEnv(sql, false)
425+
);
426+
427+
try {
428+
worker.startOnlyOnce({
429+
edgeFunctionName: 'test_compilation',
430+
workerId,
431+
});
432+
await delay(100);
433+
434+
// Worker should have registered (check workers table for this specific worker)
435+
const workers = await sql`
436+
SELECT * FROM pgflow.workers WHERE worker_id = ${workerId}
437+
`;
438+
assertEquals(workers.length, 1, 'Worker should be registered even when skipping compilation');
439+
assertEquals(workers[0].queue_name, 'test_compilation_flow', 'Worker should be registered for the correct queue');
440+
} finally {
441+
await worker.stop();
442+
}
443+
})
444+
);
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import { assertEquals } from '@std/assert';
2+
import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts';
3+
import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts';
4+
import type { WorkerRow } from '../../src/core/types.ts';
5+
import { Flow, type FlowShape } from '@pgflow/dsl';
6+
import type { Logger } from '../../src/platform/types.ts';
7+
import type { postgres } from '../sql.ts';
8+
9+
// Mock Queries
10+
class MockQueries extends Queries {
11+
public ensureFlowCompiledCallCount = 0;
12+
13+
constructor() {
14+
// Pass null as sql since we'll override all methods
15+
super(null as unknown as postgres.Sql);
16+
}
17+
18+
override onWorkerStarted(params: { workerId: string; edgeFunctionName: string; queueName: string }): Promise<WorkerRow> {
19+
return Promise.resolve({
20+
worker_id: params.workerId,
21+
queue_name: params.queueName,
22+
function_name: params.edgeFunctionName,
23+
started_at: new Date().toISOString(),
24+
deprecated_at: null,
25+
last_heartbeat_at: new Date().toISOString(),
26+
});
27+
}
28+
29+
override sendHeartbeat(_workerRow: WorkerRow): Promise<{ is_deprecated: boolean }> {
30+
return Promise.resolve({ is_deprecated: false });
31+
}
32+
33+
override ensureFlowCompiled(
34+
_flowSlug: string,
35+
_shape: FlowShape,
36+
_mode: 'development' | 'production'
37+
): Promise<EnsureFlowCompiledResult> {
38+
this.ensureFlowCompiledCallCount++;
39+
return Promise.resolve({ status: 'verified', differences: [] });
40+
}
41+
}
42+
43+
// Real Flow for testing - using the DSL to create a valid flow
44+
const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' })
45+
.step({ slug: 'step1' }, (input) => input.run.value);
46+
47+
const createMockFlow = () => TestFlow;
48+
49+
const createLogger = (): Logger => ({
50+
debug: () => {},
51+
info: () => {},
52+
error: () => {},
53+
warn: () => {},
54+
});
55+
56+
Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled when ensureCompiledOnStartup is true', async () => {
57+
const mockQueries = new MockQueries();
58+
const mockFlow = createMockFlow();
59+
const logger = createLogger();
60+
61+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {
62+
ensureCompiledOnStartup: true
63+
});
64+
65+
await lifecycle.acknowledgeStart({
66+
workerId: 'test-worker-id',
67+
edgeFunctionName: 'test-function',
68+
});
69+
70+
assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called once');
71+
});
72+
73+
Deno.test('FlowWorkerLifecycle - skips ensureFlowCompiled when ensureCompiledOnStartup is false', async () => {
74+
const mockQueries = new MockQueries();
75+
const mockFlow = createMockFlow();
76+
const logger = createLogger();
77+
78+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {
79+
ensureCompiledOnStartup: false
80+
});
81+
82+
await lifecycle.acknowledgeStart({
83+
workerId: 'test-worker-id',
84+
edgeFunctionName: 'test-function',
85+
});
86+
87+
assertEquals(mockQueries.ensureFlowCompiledCallCount, 0, 'ensureFlowCompiled should NOT be called');
88+
});
89+
90+
Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (no config)', async () => {
91+
const mockQueries = new MockQueries();
92+
const mockFlow = createMockFlow();
93+
const logger = createLogger();
94+
95+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger);
96+
97+
await lifecycle.acknowledgeStart({
98+
workerId: 'test-worker-id',
99+
edgeFunctionName: 'test-function',
100+
});
101+
102+
assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called by default');
103+
});
104+
105+
Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (empty config)', async () => {
106+
const mockQueries = new MockQueries();
107+
const mockFlow = createMockFlow();
108+
const logger = createLogger();
109+
110+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {});
111+
112+
await lifecycle.acknowledgeStart({
113+
workerId: 'test-worker-id',
114+
edgeFunctionName: 'test-function',
115+
});
116+
117+
assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called with empty config');
118+
});
119+
120+
Deno.test('FlowWorkerLifecycle - logs skip message when ensureCompiledOnStartup is false', async () => {
121+
const logs: string[] = [];
122+
const testLogger: Logger = {
123+
debug: () => {},
124+
info: (msg: string) => logs.push(msg),
125+
error: () => {},
126+
warn: () => {},
127+
};
128+
129+
const mockQueries = new MockQueries();
130+
const mockFlow = createMockFlow();
131+
132+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, {
133+
ensureCompiledOnStartup: false
134+
});
135+
136+
await lifecycle.acknowledgeStart({
137+
workerId: 'test-worker-id',
138+
edgeFunctionName: 'test-function',
139+
});
140+
141+
const skipLog = logs.find(log => log.includes('Skipping compilation'));
142+
assertEquals(skipLog !== undefined, true, 'Should log skip message');
143+
assertEquals(skipLog?.includes('ensureCompiledOnStartup=false'), true, 'Skip message should mention the config flag');
144+
});
145+
146+
Deno.test('FlowWorkerLifecycle - does not log skip message when ensureCompiledOnStartup is true', async () => {
147+
const logs: string[] = [];
148+
const testLogger: Logger = {
149+
debug: () => {},
150+
info: (msg: string) => logs.push(msg),
151+
error: () => {},
152+
warn: () => {},
153+
};
154+
155+
const mockQueries = new MockQueries();
156+
const mockFlow = createMockFlow();
157+
158+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, {
159+
ensureCompiledOnStartup: true
160+
});
161+
162+
await lifecycle.acknowledgeStart({
163+
workerId: 'test-worker-id',
164+
edgeFunctionName: 'test-function',
165+
});
166+
167+
const skipLog = logs.find(log => log.includes('Skipping compilation'));
168+
assertEquals(skipLog, undefined, 'Should NOT log skip message when compilation is enabled');
169+
});

0 commit comments

Comments
 (0)