Skip to content

Commit c5ef186

Browse files
committed
add flow compilation at worker startup with SQL direct call
1 parent 170589a commit c5ef186

File tree

8 files changed

+448
-299
lines changed

8 files changed

+448
-299
lines changed

PLAN.md

Lines changed: 79 additions & 295 deletions
Large diffs are not rendered by default.

pkgs/edge-worker/deno.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkgs/edge-worker/src/core/Queries.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
import type postgres from 'postgres';
22
import type { WorkerRow } from './types.js';
3+
import type { FlowShape, Json } from '@pgflow/dsl';
4+
5+
export type EnsureFlowCompiledStatus = 'compiled' | 'verified' | 'recompiled' | 'mismatch';
6+
7+
export interface EnsureFlowCompiledResult {
8+
status: EnsureFlowCompiledStatus;
9+
differences: string[];
10+
}
311

412
export class Queries {
513
constructor(private readonly sql: postgres.Sql) {}
@@ -40,7 +48,31 @@ export class Queries {
4048
WHERE w.worker_id = ${workerRow.worker_id}
4149
RETURNING (w.deprecated_at IS NOT NULL) AS is_deprecated;
4250
`;
43-
51+
4452
return result || { is_deprecated: false };
4553
}
54+
55+
async ensureFlowCompiled(
56+
flowSlug: string,
57+
shape: FlowShape,
58+
mode: 'development' | 'production'
59+
): Promise<EnsureFlowCompiledResult> {
60+
// SAFETY: FlowShape is JSON-compatible by construction (only strings, numbers,
61+
// arrays, and plain objects), but TypeScript can't prove this because FlowShape
62+
// uses specific property names while Json uses index signatures. This cast is
63+
// safe because we control both sides: extractFlowShape() builds the object and
64+
// this method consumes it - no untrusted input crosses this boundary.
65+
//
66+
// TODO: If FlowShape ever becomes part of a public API or accepts external input,
67+
// add a runtime assertion function (assertJsonCompatible) to validate at the boundary.
68+
const shapeJson = this.sql.json(shape as unknown as Json);
69+
const [result] = await this.sql<{ result: EnsureFlowCompiledResult }[]>`
70+
SELECT pgflow.ensure_flow_compiled(
71+
${flowSlug},
72+
${shapeJson}::jsonb,
73+
${mode}
74+
) as result
75+
`;
76+
return result.result;
77+
}
4678
}

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@ import type { ILifecycle, WorkerBootstrap, WorkerRow } from '../core/types.js';
33
import type { Logger } from '../platform/types.js';
44
import { States, WorkerState } from '../core/WorkerState.js';
55
import type { AnyFlow } from '@pgflow/dsl';
6+
import { extractFlowShape } from '@pgflow/dsl';
7+
import {
8+
isLocalSupabase,
9+
KNOWN_LOCAL_ANON_KEY,
10+
KNOWN_LOCAL_SERVICE_ROLE_KEY,
11+
} from '../shared/localDetection.js';
12+
import { FlowShapeMismatchError } from './errors.js';
613

714
export interface FlowLifecycleConfig {
815
heartbeatInterval?: number;
16+
env?: Record<string, string | undefined>;
917
}
1018

1119
/**
@@ -21,13 +29,15 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
2129
private _workerId?: string;
2230
private heartbeatInterval: number;
2331
private lastHeartbeat = 0;
32+
private env?: Record<string, string | undefined>;
2433

2534
constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
2635
this.queries = queries;
2736
this.flow = flow;
2837
this.logger = logger;
2938
this.workerState = new WorkerState(logger);
3039
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
40+
this.env = config?.env;
3141
}
3242

3343
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
@@ -36,6 +46,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
3646
// Store workerId for supplier pattern
3747
this._workerId = workerBootstrap.workerId;
3848

49+
// Compile/verify flow as part of Starting (before registering worker)
50+
await this.ensureFlowCompiled();
51+
52+
// Only register worker after successful compilation
3953
this.workerRow = await this.queries.onWorkerStarted({
4054
queueName: this.queueName,
4155
...workerBootstrap,
@@ -44,6 +58,37 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
4458
this.workerState.transitionTo(States.Running);
4559
}
4660

61+
private async ensureFlowCompiled(): Promise<void> {
62+
this.logger.info(`Ensuring flow '${this.flow.slug}' is compiled...`);
63+
64+
const shape = extractFlowShape(this.flow);
65+
const mode = this.detectCompilationMode();
66+
67+
const result = await this.queries.ensureFlowCompiled(
68+
this.flow.slug,
69+
shape,
70+
mode
71+
);
72+
73+
if (result.status === 'mismatch') {
74+
throw new FlowShapeMismatchError(this.flow.slug, result.differences);
75+
}
76+
77+
this.logger.info(`Flow '${this.flow.slug}' ${result.status} (mode: ${mode})`);
78+
}
79+
80+
private detectCompilationMode(): 'development' | 'production' {
81+
// Use provided env if available, otherwise fall back to global detection
82+
if (this.env) {
83+
const anonKey = this.env['SUPABASE_ANON_KEY'];
84+
const serviceRoleKey = this.env['SUPABASE_SERVICE_ROLE_KEY'];
85+
const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY ||
86+
serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY;
87+
return isLocal ? 'development' : 'production';
88+
}
89+
return isLocalSupabase() ? 'development' : 'production';
90+
}
91+
4792
acknowledgeStop() {
4893
this.workerState.transitionTo(States.Stopping);
4994

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
8989
const lifecycle = new FlowWorkerLifecycle<TFlow>(
9090
queries,
9191
flow,
92-
createLogger('FlowWorkerLifecycle')
92+
createLogger('FlowWorkerLifecycle'),
93+
{ env: platformAdapter.env }
9394
);
9495

9596
// Create frozen worker config ONCE for reuse across all task executions
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/**
2+
* Error thrown when flow shape in code doesn't match database schema in production mode.
3+
* Worker should crash on this error - no recovery possible without migration.
4+
*/
5+
export class FlowShapeMismatchError extends Error {
6+
constructor(
7+
public readonly flowSlug: string,
8+
public readonly differences: string[]
9+
) {
10+
super(
11+
`Flow '${flowSlug}' shape mismatch with database.\n` +
12+
`Run migrations or use development mode to recompile.\n` +
13+
`Differences:\n` +
14+
differences.map(d => ` - ${d}`).join('\n')
15+
);
16+
this.name = 'FlowShapeMismatchError';
17+
}
18+
}

0 commit comments

Comments
 (0)