Skip to content

Commit 0b84bb0

Browse files
committed
add ensureCompiledOnStartup config flag to opt-out of auto-compilation (#499)
### TL;DR Add automatic flow compilation at worker startup with the ability to opt-out. ### What changed? - Added a new `ensureCompiledOnStartup` configuration option to `FlowWorkerConfig` (defaults to `true`) - When enabled (default), workers call `pgflow.ensure_flow_compiled()` at startup to verify flows are up-to-date - In development mode, mismatched flows are automatically recompiled - In production mode, mismatches cause errors - Added comprehensive tests for the new configuration option ### How to test? 1. Create a flow worker with default settings to see automatic compilation: ```typescript const worker = createFlowWorker(MyFlow, { /* config */ }); ``` 2. Opt-out of automatic compilation: ```typescript const worker = createFlowWorker(MyFlow, { ensureCompiledOnStartup: false, // other config }); ``` 3. Run tests to verify behavior: ``` deno test pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts deno test pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts ``` ### Why make this change? This change improves the developer experience by ensuring flows are properly compiled at worker startup. It helps catch flow definition mismatches early, automatically recompiling in development environments while failing fast in production. The opt-out option provides flexibility for environments where flows are pre-compiled via CLI or other means.
1 parent cf8811b commit 0b84bb0

File tree

6 files changed

+332
-3
lines changed

6 files changed

+332
-3
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@pgflow/core': patch
3+
'@pgflow/dsl': patch
4+
'@pgflow/edge-worker': patch
5+
---
6+
7+
Add automatic flow compilation at worker startup. Workers now call ensure_flow_compiled to verify flows are up-to-date. In development, mismatched flows are recompiled automatically. In production, mismatches cause errors. Use ensureCompiledOnStartup: false to opt-out.

pkgs/edge-worker/src/core/workerConfigTypes.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,14 @@ export type ResolvedQueueWorkerConfig = Required<Omit<QueueWorkerConfig, 'retryD
144144
* Configuration for the flow worker with two-phase polling
145145
*/
146146
export type FlowWorkerConfig = {
147+
/**
148+
* Whether to verify/compile flow at worker startup.
149+
* When true (default), worker calls pgflow.ensure_flow_compiled() before polling.
150+
* Set to false to skip compilation check (useful if flows are pre-compiled via CLI).
151+
* @default true
152+
*/
153+
ensureCompiledOnStartup?: boolean;
154+
147155
/**
148156
* How many tasks are processed at the same time
149157
* @default 10
@@ -201,7 +209,7 @@ export type FlowWorkerConfig = {
201209
/**
202210
* Resolved flow configuration with all defaults applied
203211
*/
204-
export type ResolvedFlowWorkerConfig = Required<Omit<FlowWorkerConfig, 'connectionString' | 'env'>> & {
212+
export type ResolvedFlowWorkerConfig = Required<Omit<FlowWorkerConfig, 'connectionString' | 'env' | 'ensureCompiledOnStartup'>> & {
205213
connectionString: string | undefined;
206214
env: Record<string, string | undefined>;
207215
};

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { FlowShapeMismatchError } from './errors.js';
99
export interface FlowLifecycleConfig {
1010
heartbeatInterval?: number;
1111
isLocalEnvironment?: boolean;
12+
ensureCompiledOnStartup?: boolean;
1213
}
1314

1415
/**
@@ -25,6 +26,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
2526
private heartbeatInterval: number;
2627
private lastHeartbeat = 0;
2728
private isLocalEnvironment: boolean;
29+
private ensureCompiledOnStartup: boolean;
2830

2931
constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
3032
this.queries = queries;
@@ -33,6 +35,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
3335
this.workerState = new WorkerState(logger);
3436
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
3537
this.isLocalEnvironment = config?.isLocalEnvironment ?? false;
38+
this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? true;
3639
}
3740

3841
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
@@ -42,7 +45,11 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
4245
this._workerId = workerBootstrap.workerId;
4346

4447
// Compile/verify flow as part of Starting (before registering worker)
45-
await this.ensureFlowCompiled();
48+
if (this.ensureCompiledOnStartup) {
49+
await this.ensureFlowCompiled();
50+
} else {
51+
this.logger.info(`Skipping compilation check for flow '${this.flow.slug}' (ensureCompiledOnStartup=false)`);
52+
}
4653

4754
// Only register worker after successful compilation
4855
this.workerRow = await this.queries.onWorkerStarted({

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
9090
queries,
9191
flow,
9292
createLogger('FlowWorkerLifecycle'),
93-
{ isLocalEnvironment: platformAdapter.isLocalEnvironment }
93+
{
94+
isLocalEnvironment: platformAdapter.isLocalEnvironment,
95+
ensureCompiledOnStartup: config.ensureCompiledOnStartup ?? true
96+
}
9497
);
9598

9699
// Create frozen worker config ONCE for reuse across all task executions

pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,3 +307,138 @@ Deno.test(
307307
}
308308
})
309309
);
310+
311+
// Tests for ensureCompiledOnStartup config option
312+
313+
Deno.test(
314+
'skips compilation when ensureCompiledOnStartup is false',
315+
withPgNoTransaction(async (sql) => {
316+
await sql`select pgflow_tests.reset_db();`;
317+
318+
// Verify flow does NOT exist
319+
const [flowBefore] = await sql`
320+
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
321+
`;
322+
assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup');
323+
324+
// Create worker with ensureCompiledOnStartup: false
325+
const worker = createFlowWorker(
326+
TestCompilationFlow,
327+
{
328+
sql,
329+
ensureCompiledOnStartup: false, // SKIP compilation
330+
maxConcurrent: 1,
331+
batchSize: 10,
332+
maxPollSeconds: 1,
333+
pollIntervalMs: 200,
334+
},
335+
createLogger,
336+
createPlatformAdapterWithLocalEnv(sql, false)
337+
);
338+
339+
try {
340+
worker.startOnlyOnce({
341+
edgeFunctionName: 'test_compilation',
342+
workerId: crypto.randomUUID(),
343+
});
344+
await delay(100);
345+
346+
// Flow should NOT have been created (compilation was skipped)
347+
const [flowAfter] = await sql`
348+
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
349+
`;
350+
assertEquals(flowAfter, undefined, 'Flow should NOT be created when compilation skipped');
351+
} finally {
352+
await worker.stop();
353+
}
354+
})
355+
);
356+
357+
Deno.test(
358+
'compiles flow when ensureCompiledOnStartup is explicitly true',
359+
withPgNoTransaction(async (sql) => {
360+
await sql`select pgflow_tests.reset_db();`;
361+
362+
// Verify flow does NOT exist
363+
const [flowBefore] = await sql`
364+
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
365+
`;
366+
assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup');
367+
368+
// Create worker with ensureCompiledOnStartup: true (explicit)
369+
const worker = createFlowWorker(
370+
TestCompilationFlow,
371+
{
372+
sql,
373+
ensureCompiledOnStartup: true, // EXPLICIT true
374+
maxConcurrent: 1,
375+
batchSize: 10,
376+
maxPollSeconds: 1,
377+
pollIntervalMs: 200,
378+
},
379+
createLogger,
380+
createPlatformAdapterWithLocalEnv(sql, false)
381+
);
382+
383+
try {
384+
worker.startOnlyOnce({
385+
edgeFunctionName: 'test_compilation',
386+
workerId: crypto.randomUUID(),
387+
});
388+
await delay(100);
389+
390+
// Flow SHOULD have been created
391+
const [flowAfter] = await sql`
392+
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
393+
`;
394+
assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should be created when ensureCompiledOnStartup is true');
395+
} finally {
396+
await worker.stop();
397+
}
398+
})
399+
);
400+
401+
Deno.test(
402+
'worker still registers and polls when ensureCompiledOnStartup is false',
403+
withPgNoTransaction(async (sql) => {
404+
await sql`select pgflow_tests.reset_db();`;
405+
406+
// Pre-compile the flow manually (simulating pre-compiled via CLI)
407+
await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
408+
await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;
409+
410+
const workerId = crypto.randomUUID();
411+
412+
// Create worker with ensureCompiledOnStartup: false
413+
const worker = createFlowWorker(
414+
TestCompilationFlow,
415+
{
416+
sql,
417+
ensureCompiledOnStartup: false, // Skip compilation check
418+
maxConcurrent: 1,
419+
batchSize: 10,
420+
maxPollSeconds: 1,
421+
pollIntervalMs: 200,
422+
},
423+
createLogger,
424+
createPlatformAdapterWithLocalEnv(sql, false)
425+
);
426+
427+
try {
428+
worker.startOnlyOnce({
429+
edgeFunctionName: 'test_compilation',
430+
workerId,
431+
});
432+
await delay(100);
433+
434+
// Worker should have registered (check workers table for this specific worker)
435+
const workers = await sql`
436+
SELECT * FROM pgflow.workers WHERE worker_id = ${workerId}
437+
`;
438+
assertEquals(workers.length, 1, 'Worker should be registered even when skipping compilation');
439+
assertEquals(workers[0].queue_name, 'test_compilation_flow', 'Worker should be registered for the correct queue');
440+
} finally {
441+
await worker.stop();
442+
}
443+
})
444+
);
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import { assertEquals } from '@std/assert';
2+
import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts';
3+
import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts';
4+
import type { WorkerRow } from '../../src/core/types.ts';
5+
import { Flow, type FlowShape } from '@pgflow/dsl';
6+
import type { Logger } from '../../src/platform/types.ts';
7+
import type { postgres } from '../sql.ts';
8+
9+
// Mock Queries
10+
class MockQueries extends Queries {
11+
public ensureFlowCompiledCallCount = 0;
12+
13+
constructor() {
14+
// Pass null as sql since we'll override all methods
15+
super(null as unknown as postgres.Sql);
16+
}
17+
18+
override onWorkerStarted(params: { workerId: string; edgeFunctionName: string; queueName: string }): Promise<WorkerRow> {
19+
return Promise.resolve({
20+
worker_id: params.workerId,
21+
queue_name: params.queueName,
22+
function_name: params.edgeFunctionName,
23+
started_at: new Date().toISOString(),
24+
deprecated_at: null,
25+
last_heartbeat_at: new Date().toISOString(),
26+
});
27+
}
28+
29+
override sendHeartbeat(_workerRow: WorkerRow): Promise<{ is_deprecated: boolean }> {
30+
return Promise.resolve({ is_deprecated: false });
31+
}
32+
33+
override ensureFlowCompiled(
34+
_flowSlug: string,
35+
_shape: FlowShape,
36+
_mode: 'development' | 'production'
37+
): Promise<EnsureFlowCompiledResult> {
38+
this.ensureFlowCompiledCallCount++;
39+
return Promise.resolve({ status: 'verified', differences: [] });
40+
}
41+
}
42+
43+
// Real Flow for testing - using the DSL to create a valid flow
44+
const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' })
45+
.step({ slug: 'step1' }, (input) => input.run.value);
46+
47+
const createMockFlow = () => TestFlow;
48+
49+
const createLogger = (): Logger => ({
50+
debug: () => {},
51+
info: () => {},
52+
error: () => {},
53+
warn: () => {},
54+
});
55+
56+
Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled when ensureCompiledOnStartup is true', async () => {
57+
const mockQueries = new MockQueries();
58+
const mockFlow = createMockFlow();
59+
const logger = createLogger();
60+
61+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {
62+
ensureCompiledOnStartup: true
63+
});
64+
65+
await lifecycle.acknowledgeStart({
66+
workerId: 'test-worker-id',
67+
edgeFunctionName: 'test-function',
68+
});
69+
70+
assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called once');
71+
});
72+
73+
Deno.test('FlowWorkerLifecycle - skips ensureFlowCompiled when ensureCompiledOnStartup is false', async () => {
74+
const mockQueries = new MockQueries();
75+
const mockFlow = createMockFlow();
76+
const logger = createLogger();
77+
78+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {
79+
ensureCompiledOnStartup: false
80+
});
81+
82+
await lifecycle.acknowledgeStart({
83+
workerId: 'test-worker-id',
84+
edgeFunctionName: 'test-function',
85+
});
86+
87+
assertEquals(mockQueries.ensureFlowCompiledCallCount, 0, 'ensureFlowCompiled should NOT be called');
88+
});
89+
90+
Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (no config)', async () => {
91+
const mockQueries = new MockQueries();
92+
const mockFlow = createMockFlow();
93+
const logger = createLogger();
94+
95+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger);
96+
97+
await lifecycle.acknowledgeStart({
98+
workerId: 'test-worker-id',
99+
edgeFunctionName: 'test-function',
100+
});
101+
102+
assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called by default');
103+
});
104+
105+
Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (empty config)', async () => {
106+
const mockQueries = new MockQueries();
107+
const mockFlow = createMockFlow();
108+
const logger = createLogger();
109+
110+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {});
111+
112+
await lifecycle.acknowledgeStart({
113+
workerId: 'test-worker-id',
114+
edgeFunctionName: 'test-function',
115+
});
116+
117+
assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called with empty config');
118+
});
119+
120+
Deno.test('FlowWorkerLifecycle - logs skip message when ensureCompiledOnStartup is false', async () => {
121+
const logs: string[] = [];
122+
const testLogger: Logger = {
123+
debug: () => {},
124+
info: (msg: string) => logs.push(msg),
125+
error: () => {},
126+
warn: () => {},
127+
};
128+
129+
const mockQueries = new MockQueries();
130+
const mockFlow = createMockFlow();
131+
132+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, {
133+
ensureCompiledOnStartup: false
134+
});
135+
136+
await lifecycle.acknowledgeStart({
137+
workerId: 'test-worker-id',
138+
edgeFunctionName: 'test-function',
139+
});
140+
141+
const skipLog = logs.find(log => log.includes('Skipping compilation'));
142+
assertEquals(skipLog !== undefined, true, 'Should log skip message');
143+
assertEquals(skipLog?.includes('ensureCompiledOnStartup=false'), true, 'Skip message should mention the config flag');
144+
});
145+
146+
Deno.test('FlowWorkerLifecycle - does not log skip message when ensureCompiledOnStartup is true', async () => {
147+
const logs: string[] = [];
148+
const testLogger: Logger = {
149+
debug: () => {},
150+
info: (msg: string) => logs.push(msg),
151+
error: () => {},
152+
warn: () => {},
153+
};
154+
155+
const mockQueries = new MockQueries();
156+
const mockFlow = createMockFlow();
157+
158+
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, {
159+
ensureCompiledOnStartup: true
160+
});
161+
162+
await lifecycle.acknowledgeStart({
163+
workerId: 'test-worker-id',
164+
edgeFunctionName: 'test-function',
165+
});
166+
167+
const skipLog = logs.find(log => log.includes('Skipping compilation'));
168+
assertEquals(skipLog, undefined, 'Should NOT log skip message when compilation is enabled');
169+
});

0 commit comments

Comments
 (0)