Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
374 changes: 79 additions & 295 deletions PLAN.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkgs/edge-worker/deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 33 additions & 1 deletion pkgs/edge-worker/src/core/Queries.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import type postgres from 'postgres';
import type { WorkerRow } from './types.js';
import type { FlowShape, Json } from '@pgflow/dsl';

export type EnsureFlowCompiledStatus = 'compiled' | 'verified' | 'recompiled' | 'mismatch';

export interface EnsureFlowCompiledResult {
status: EnsureFlowCompiledStatus;
differences: string[];
}

export class Queries {
constructor(private readonly sql: postgres.Sql) {}
Expand Down Expand Up @@ -40,7 +48,31 @@ export class Queries {
WHERE w.worker_id = ${workerRow.worker_id}
RETURNING (w.deprecated_at IS NOT NULL) AS is_deprecated;
`;

return result || { is_deprecated: false };
}

async ensureFlowCompiled(
flowSlug: string,
shape: FlowShape,
mode: 'development' | 'production'
): Promise<EnsureFlowCompiledResult> {
// SAFETY: FlowShape is JSON-compatible by construction (only strings, numbers,
// arrays, and plain objects), but TypeScript can't prove this because FlowShape
// uses specific property names while Json uses index signatures. This cast is
// safe because we control both sides: extractFlowShape() builds the object and
// this method consumes it - no untrusted input crosses this boundary.
//
// TODO: If FlowShape ever becomes part of a public API or accepts external input,
// add a runtime assertion function (assertJsonCompatible) to validate at the boundary.
const shapeJson = this.sql.json(shape as unknown as Json);
const [result] = await this.sql<{ result: EnsureFlowCompiledResult }[]>`
SELECT pgflow.ensure_flow_compiled(
${flowSlug},
${shapeJson}::jsonb,
${mode}
) as result
`;
return result.result;
}
}
45 changes: 45 additions & 0 deletions pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ import type { ILifecycle, WorkerBootstrap, WorkerRow } from '../core/types.js';
import type { Logger } from '../platform/types.js';
import { States, WorkerState } from '../core/WorkerState.js';
import type { AnyFlow } from '@pgflow/dsl';
import { extractFlowShape } from '@pgflow/dsl';
import {
isLocalSupabase,
KNOWN_LOCAL_ANON_KEY,
KNOWN_LOCAL_SERVICE_ROLE_KEY,
} from '../shared/localDetection.js';
import { FlowShapeMismatchError } from './errors.js';

export interface FlowLifecycleConfig {
heartbeatInterval?: number;
env?: Record<string, string | undefined>;
}

/**
Expand All @@ -21,13 +29,15 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
private _workerId?: string;
private heartbeatInterval: number;
private lastHeartbeat = 0;
private env?: Record<string, string | undefined>;

constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
this.queries = queries;
this.flow = flow;
this.logger = logger;
this.workerState = new WorkerState(logger);
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
this.env = config?.env;
}

async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
Expand All @@ -36,6 +46,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
// Store workerId for supplier pattern
this._workerId = workerBootstrap.workerId;

// Compile/verify flow as part of Starting (before registering worker)
await this.ensureFlowCompiled();

// Only register worker after successful compilation
this.workerRow = await this.queries.onWorkerStarted({
queueName: this.queueName,
...workerBootstrap,
Expand All @@ -44,6 +58,37 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
this.workerState.transitionTo(States.Running);
}

private async ensureFlowCompiled(): Promise<void> {
this.logger.info(`Ensuring flow '${this.flow.slug}' is compiled...`);

const shape = extractFlowShape(this.flow);
const mode = this.detectCompilationMode();

const result = await this.queries.ensureFlowCompiled(
this.flow.slug,
shape,
mode
);

if (result.status === 'mismatch') {
throw new FlowShapeMismatchError(this.flow.slug, result.differences);
}

this.logger.info(`Flow '${this.flow.slug}' ${result.status} (mode: ${mode})`);
}

private detectCompilationMode(): 'development' | 'production' {
// Use provided env if available, otherwise fall back to global detection
if (this.env) {
const anonKey = this.env['SUPABASE_ANON_KEY'];
const serviceRoleKey = this.env['SUPABASE_SERVICE_ROLE_KEY'];
const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY ||
serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY;
return isLocal ? 'development' : 'production';
}
return isLocalSupabase() ? 'development' : 'production';
}

acknowledgeStop() {
this.workerState.transitionTo(States.Stopping);

Expand Down
3 changes: 2 additions & 1 deletion pkgs/edge-worker/src/flow/createFlowWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
const lifecycle = new FlowWorkerLifecycle<TFlow>(
queries,
flow,
createLogger('FlowWorkerLifecycle')
createLogger('FlowWorkerLifecycle'),
{ env: platformAdapter.env }
);

// Create frozen worker config ONCE for reuse across all task executions
Expand Down
18 changes: 18 additions & 0 deletions pkgs/edge-worker/src/flow/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Error thrown when flow shape in code doesn't match database schema in production mode.
* Worker should crash on this error - no recovery possible without migration.
*/
export class FlowShapeMismatchError extends Error {
constructor(
public readonly flowSlug: string,
public readonly differences: string[]
) {
super(
`Flow '${flowSlug}' shape mismatch with database.\n` +
`Run migrations or use development mode to recompile.\n` +
`Differences:\n` +
differences.map(d => ` - ${d}`).join('\n')
);
this.name = 'FlowShapeMismatchError';
}
}
Loading