From c5ef1867eeb23595fc69a16cbb08f03f1b150d2b Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 1 Dec 2025 06:32:13 +0100 Subject: [PATCH] add flow compilation at worker startup with SQL direct call --- PLAN.md | 374 ++++-------------- pkgs/edge-worker/deno.lock | 1 + pkgs/edge-worker/src/core/Queries.ts | 34 +- .../src/flow/FlowWorkerLifecycle.ts | 45 +++ pkgs/edge-worker/src/flow/createFlowWorker.ts | 3 +- pkgs/edge-worker/src/flow/errors.ts | 18 + .../flow/compilationAtStartup.test.ts | 261 ++++++++++++ .../FlowWorkerLifecycle.deprecation.test.ts | 11 +- 8 files changed, 448 insertions(+), 299 deletions(-) create mode 100644 pkgs/edge-worker/src/flow/errors.ts create mode 100644 pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts diff --git a/PLAN.md b/PLAN.md index db9a5ec82..45a9f3353 100644 --- a/PLAN.md +++ b/PLAN.md @@ -1,4 +1,4 @@ -# PLAN: Auto-Compilation via ControlPlane +# PLAN: Auto-Compilation at Worker Startup ## Goals 1. **No manual `pgflow compile`** - eliminate CLI compilation step @@ -11,12 +11,12 @@ | Decision | Choice | Rationale | |----------|--------|-----------| | Shape Storage | Compute on-the-fly | No DB migration, shows exact differences | -| Compilation Path | Worker → ControlPlane HTTP → SQL function | Central auth + transactional logic | +| Compilation Path | Worker → SQL function directly | Simpler, no HTTP overhead | | Prod Missing Flow | Auto-compile | Enables first-time deployment | | Prod Shape Mismatch | Fail fast | Prevents accidental overwrites | | Dev Behavior | Always recompile | Seamless iteration | | Strict Mode | Deferred (YAGNI) | Can be added later, CI/CD achieves same | -| Local Detection | Known local Supabase keys in ControlPlane | Cryptographic certainty, ControlPlane decides (not Worker) | +| Local Detection | Known local Supabase keys in Worker | Cryptographic certainty, PlatformAdapter decides | ## Compilation Modes @@ -33,10 +33,7 @@ **CRITICAL:** Development mode allows 
`delete_flow_and_data()` which destroys ALL flow data. False positive detection (thinking we're local when actually in production) would be catastrophic. We use a **default-to-production** approach with cryptographic certainty for local detection. -**IMPORTANT:** Detection happens in **ControlPlane**, not Worker. This ensures: -1. Worker can't spoof the mode - ControlPlane decides based on its own environment -2. Detection logic lives where the destructive action happens -3. Single source of truth for security-critical decision +**Detection happens in Worker via PlatformAdapter.** The `isLocalEnvironment` property checks for known local Supabase keys - this is cryptographically safe since production keys are unique per-project. ### Known Local Supabase Keys @@ -55,9 +52,9 @@ SERVICE_ROLE_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZ - The JWT payload contains `"iss": "supabase-demo"` for local vs `"iss": "supabase"` with project ref for production - It is **impossible** for a production Supabase project to accidentally have these keys -### Shared Detection Module: `pkgs/edge-worker/src/shared/localDetection.ts` +### Detection Module: `pkgs/edge-worker/src/shared/localDetection.ts` -Extracted to a shared module for reuse by ControlPlane (and optionally Worker for logging): +Shared module used by PlatformAdapter to detect local environment: ```typescript /** @@ -77,7 +74,7 @@ export const KNOWN_LOCAL_SERVICE_ROLE_KEY = * SAFETY: Returns false (production) unless keys EXACTLY match known local values. * This is cryptographically safe - production keys are unique per-project. * - * Used by ControlPlane to determine compilation mode. + * Used by PlatformAdapter.isLocalEnvironment to determine compilation mode. 
*/ export function isLocalSupabase(): boolean { const anonKey = Deno.env.get('SUPABASE_ANON_KEY'); @@ -105,99 +102,28 @@ If a user has overwritten their local `JWT_SECRET` in `config.toml`, detection w --- -## Authentication - -Workers authenticate with ControlPlane using the Supabase service role key (zero-config): - -``` -Env var: SUPABASE_SERVICE_ROLE_KEY (automatically available in Edge Functions) -Header: apikey: -``` - -**Setup:** -- No setup required - both ControlPlane and Worker Edge Functions automatically have access to `SUPABASE_SERVICE_ROLE_KEY` via `Deno.env` -- Workers include `apikey` header in compilation requests -- ControlPlane verifies `apikey` header matches `SUPABASE_SERVICE_ROLE_KEY` env var - -**ControlPlane Verification:** -```typescript -function verifyAuth(request: Request): boolean { - const serviceRoleKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY'); - if (!serviceRoleKey) return false; // Not configured - reject all - const apikey = request.headers.get('apikey'); - return apikey === serviceRoleKey; -} -``` - ---- - ## Architecture Overview -**Worker → ControlPlane HTTP → SQL Function** +**Worker → SQL Function Directly** ``` Worker.start(MyFlow) │ ├── extractFlowShape(flow) → shape │ - └── POST /flows/:slug/ensure-compiled - │ Body: { shape } <-- NO mode field - │ Headers: { apikey: SUPABASE_SERVICE_ROLE_KEY } + ├── Detect environment (PlatformAdapter.isLocalEnvironment) + │ └── mode = isLocal ? 'development' : 'production' + │ + └── Direct SQL call: pgflow.ensure_flow_compiled(slug, shape, mode) │ - └── ControlPlane - │ - ├── 1. Verify auth (apikey === SUPABASE_SERVICE_ROLE_KEY) - │ └── If invalid: 401 "Unauthorized" - │ - ├── 2. Validate request body (shape) - │ └── If invalid: 400 "Bad Request" - │ - ├── 3. Detect environment (isLocalSupabase()) <-- ControlPlane decides - │ └── mode = isLocal ? 'development' : 'production' - │ - └── 4. 
Call SQL function with detected mode - │ - └── sql`SELECT pgflow.ensure_flow_compiled($1, $2, $3)` - │ - └── SQL Function (TRANSACTIONAL) - ├── Acquire advisory lock - ├── Query current shape from flows/steps/deps - ├── Compare incoming shape vs DB shape - ├── If match: return 'verified' - ├── If missing (any mode): compile, return 'compiled' - ├── If different AND mode='development': recompile - ├── If different AND mode='production': return 'mismatch' - └── Return { status, differences[] } -``` - -### HTTP Response Codes - -| Scenario | HTTP Status | Response | -|----------|-------------|----------| -| Invalid/missing apikey | 401 | `{ error: "Unauthorized" }` | -| Invalid request body | 400 | `{ error: "Bad Request" }` | -| SQL not configured | 404 | `{ error: "Not Found" }` | -| Flow compiled/verified/recompiled | 200 | `{ status, differences }` | -| Shape mismatch (production) | 409 | `{ status: "mismatch", differences }` | -| Database error | 500 | `{ error: "Database Error" }` | - ---- - -## Implementation Overview - -### Single ControlPlane Endpoint - -``` -POST /flows/:slug/ensure-compiled - Headers: { apikey: SUPABASE_SERVICE_ROLE_KEY } - Body: { - shape: FlowShape - } - Response: { - status: 'compiled' | 'verified' | 'recompiled' | 'mismatch', - differences?: string[], - mode: 'development' | 'production' // Detected by ControlPlane - } + └── SQL Function (TRANSACTIONAL) + ├── Check if flow exists + ├── If missing: compile, return 'compiled' + ├── If exists: compare shapes + ├── If match: return 'verified' + ├── If different AND mode='development': recompile + ├── If different AND mode='production': return 'mismatch' + └── Return { status, differences[] } ``` ### Worker Startup Flow @@ -209,7 +135,10 @@ Worker.start(MyFlow) extractFlowShape(flow) --> FlowShape | v -POST /ensure-compiled { shape } <-- No mode, ControlPlane detects it +mode = PlatformAdapter.isLocalEnvironment ? 
'development' : 'production' + | + v +sql`SELECT pgflow.ensure_flow_compiled(slug, shape, mode)` | v [status === 'mismatch'?] ----yes----> throw FlowShapeMismatchError(differences) @@ -377,70 +306,7 @@ END IF; --- -## Phase 3: ControlPlane Endpoint (Edge-Worker Package) - -### Modify: `pkgs/edge-worker/src/control-plane/server.ts` - -Add single endpoint that handles authentication, validation, environment detection, and SQL call: - -```typescript -import { isLocalSupabase } from '../shared/localDetection.ts'; - -// POST /flows/:slug/ensure-compiled -async function handleEnsureCompiled( - request: Request, - flowSlug: string, - options?: ControlPlaneOptions -): Promise { - // 1. Check SQL is configured - if (!options?.sql) { - return jsonResponse({ error: 'Not Found', message: '...' }, 404); - } - - // 2. Verify authentication - if (!verifyAuth(request)) { - return jsonResponse({ error: 'Unauthorized', message: '...' }, 401); - } - - // 3. Parse and validate request body (shape only, no mode) - const { shape } = await parseAndValidateBody(request); - - // 4. Detect environment - ControlPlane decides, Worker can't spoof - const mode = isLocalSupabase() ? 'development' : 'production'; - - // 5. Call SQL function with detected mode - const [result] = await options.sql` - SELECT pgflow.ensure_flow_compiled( - ${flowSlug}, - ${JSON.stringify(shape)}::jsonb, - ${mode} - ) as result - `; - - // Include detected mode in response for transparency - const response = { ...result.result, mode }; - return jsonResponse(response, result.result.status === 'mismatch' ? 
409 : 200); -} - -function verifyAuth(request: Request): boolean { - const serviceRoleKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY'); - if (!serviceRoleKey) return false; - const apikey = request.headers.get('apikey'); - return apikey === serviceRoleKey; -} -``` - -**HTTP Status Codes:** -- `200` - compiled, verified, or recompiled -- `400 Bad Request` - invalid JSON or missing shape -- `401 Unauthorized` - invalid or missing apikey -- `404 Not Found` - SQL not configured -- `409 Conflict` - shape mismatch in production mode -- `500 Internal Server Error` - database error - ---- - -## Phase 3.5: Include Options in FlowShape +## Phase 3: Include Options in FlowShape Options (maxAttempts, baseDelay, timeout, startDelay) must be included in FlowShape for proper flow creation, while remaining excluded from shape comparison (options can be tuned at runtime without recompilation). @@ -480,23 +346,9 @@ This prevents drift - defaults are defined in ONE place (inside the function), n ## Phase 4: Worker Configuration (Edge-Worker Package) -### Modify: `pkgs/edge-worker/src/core/workerConfigTypes.ts` +Worker configuration is simplified - mode detection happens via `PlatformAdapter.isLocalEnvironment` which checks for known local Supabase keys. -Worker configuration is simplified - no mode detection needed since ControlPlane handles it: - -```typescript -export interface FlowWorkerConfig { - // ... existing fields ... - - /** - * ControlPlane URL for compilation endpoints - * @default derived from SUPABASE_URL + '/functions/v1/pgflow' - */ - controlPlaneUrl?: string; -} -``` - -**Note:** `compilationMode` config removed from Worker. ControlPlane detects environment and decides mode. This is more secure - Worker can't accidentally or maliciously request development mode in production. +**Note:** No `compilationMode` config needed. Mode is auto-detected from environment. No `controlPlaneUrl` needed since compilation uses direct SQL calls. 
--- @@ -504,40 +356,34 @@ export interface FlowWorkerConfig { ### Modify: `pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts` -Add compilation verification before `acknowledgeStart()`. Worker just sends shape - ControlPlane detects mode: +Add compilation verification before `acknowledgeStart()`. Worker detects mode and calls SQL directly: ```typescript -async verifyOrCompileFlow(): Promise { +private async ensureFlowCompiled(): Promise { + const mode = this.isLocalEnvironment ? 'development' : 'production'; + this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`); + const shape = extractFlowShape(this.flow); - const response = await fetch( - `${this.controlPlaneUrl}/flows/${this.flow.slug}/ensure-compiled`, - { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'apikey': Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!, - }, - body: JSON.stringify({ shape }), // No mode - ControlPlane detects it - } + const result = await this.queries.ensureFlowCompiled( + this.flow.slug, + shape, + mode ); - const result = await response.json(); - if (result.status === 'mismatch') { throw new FlowShapeMismatchError(this.flow.slug, result.differences); } - // Log compilation result (mode comes from ControlPlane response) - console.log(`[pgflow] Flow '${this.flow.slug}' ${result.status} (mode: ${result.mode})`); + this.logger.info(`Flow '${this.flow.slug}' ${result.status}`); } ``` -**Note:** No `detectCompilationMode()` method needed in Worker - ControlPlane handles all detection logic. This simplifies the Worker and improves security. +**Note:** Mode detection via `PlatformAdapter.isLocalEnvironment` - checks for known local Supabase keys. ### Modify: `pkgs/edge-worker/src/flow/createFlowWorker.ts` -Call verification before starting: +Pass `isLocalEnvironment` from PlatformAdapter to FlowWorkerLifecycle: ```typescript export async function createFlowWorker(...) { @@ -556,18 +402,18 @@ export async function createFlowWorker(...) 
{ | Package | File | Changes | |---------|------|---------| -| **DSL** | `pkgs/dsl/src/flow-shape.ts` | NEW - `FlowShape` interface + `extractFlowShape()` | -| **DSL** | `pkgs/dsl/src/index.ts` | Export new types/functions | -| **Core** | `pkgs/core/schemas/0100_function_ensure_flow_compiled.sql` | NEW - Main SQL function | -| **Core** | `pkgs/core/schemas/0100_function_get_flow_shape.sql` | NEW - Helper to query shape from DB | -| **Core** | `pkgs/core/schemas/0100_function_compare_flow_shapes.sql` | NEW - Shape comparison logic | -| **Core** | `pkgs/core/schemas/0100_function_compile_flow_from_shape.sql` | NEW - Compile from JSONB | -| **Core** | `pkgs/core/schemas/0100_function_delete_flow_and_data.sql` | PROMOTE from tests - Full flow deletion | -| **Edge** | `pkgs/edge-worker/src/shared/localDetection.ts` | NEW - Known local keys + `isLocalSupabase()` (shared module) | -| **Edge** | `pkgs/edge-worker/src/control-plane/server.ts` | Add POST `/ensure-compiled` endpoint + import `isLocalSupabase()` | -| **Edge** | `pkgs/edge-worker/src/core/workerConfigTypes.ts` | Add `controlPlaneUrl` config (no `compilationMode`) | -| **Edge** | `pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts` | Add `verifyOrCompileFlow()` (simplified, no mode detection) | -| **Edge** | `pkgs/edge-worker/src/flow/createFlowWorker.ts` | Call verification at startup | +| **DSL** | `pkgs/dsl/src/flow-shape.ts` | ✅ `FlowShape` interface + `extractFlowShape()` | +| **DSL** | `pkgs/dsl/src/index.ts` | ✅ Export new types/functions | +| **Core** | `pkgs/core/schemas/0100_function_ensure_flow_compiled.sql` | ✅ Main SQL function | +| **Core** | `pkgs/core/schemas/0100_function_get_flow_shape.sql` | ✅ Helper to query shape from DB | +| **Core** | `pkgs/core/schemas/0100_function_compare_flow_shapes.sql` | ✅ Shape comparison logic | +| **Core** | `pkgs/core/schemas/0100_function_create_flow_from_shape.sql` | ✅ Compile from JSONB | +| **Core** | `pkgs/core/schemas/0100_function_delete_flow_and_data.sql` | 
✅ Full flow deletion | +| **Edge** | `pkgs/edge-worker/src/shared/localDetection.ts` | ✅ Known local keys + `isLocalSupabase()` | +| **Edge** | `pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts` | ✅ `isLocalEnvironment` property | +| **Edge** | `pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts` | ✅ `ensureFlowCompiled()` with direct SQL | +| **Edge** | `pkgs/edge-worker/src/flow/createFlowWorker.ts` | ✅ Pass isLocalEnvironment to lifecycle | +| **Edge** | `pkgs/edge-worker/src/core/Queries.ts` | ✅ `ensureFlowCompiled()` SQL query method | --- @@ -617,25 +463,12 @@ Test-Driven Development order - write tests FIRST, then implement: 19. Complete implementation ``` -### TDD Phase 3: ControlPlane Endpoint (Vitest + Integration) - -``` -1. Write test: POST /ensure-compiled returns 404 if SQL not configured -2. Write test: POST /ensure-compiled returns 401 for invalid apikey -3. Implement auth check using SUPABASE_SERVICE_ROLE_KEY -4. Write test: POST /ensure-compiled returns 400 for invalid body -5. Implement body validation -6. Write test: POST /ensure-compiled calls SQL function and returns result -7. Implement SQL function call -8. Write test: POST /ensure-compiled returns 409 on mismatch status -``` - -### TDD Phase 4: Worker Integration (Vitest + E2E) +### TDD Phase 3: Worker Integration (Vitest + E2E) ``` -1. Write test: Worker calls /ensure-compiled with correct shape on startup -2. Implement verifyOrCompileFlow() -3. Write test: Worker throws FlowShapeMismatchError on 409 +1. Write test: Worker calls ensureFlowCompiled SQL on startup +2. Implement ensureFlowCompiled() in FlowWorkerLifecycle +3. Write test: Worker throws FlowShapeMismatchError on mismatch 4. Implement error handling 5. Write test: Worker proceeds to polling on success 6. Wire up to createFlowWorker() @@ -656,15 +489,14 @@ Test-Driven Development order - write tests FIRST, then implement: ### Vitest Tests (DSL + Edge-Worker) 1. 
`extractFlowShape()` - Various flow configs (single, map, deps) 2. `compareFlowShapes()` - All difference detection scenarios -3. ControlPlane endpoint - HTTP response codes for each layer -4. `FlowWorkerLifecycle` - Mode detection and error handling +3. `FlowWorkerLifecycle` - Mode detection and error handling +4. `localDetection` - Known local key matching ### Integration Tests 1. Dev mode: Worker auto-compiles missing flow 2. Dev mode: Worker auto-recompiles when shape differs 3. Prod mode: Worker compiles missing flow (first deploy) 4. Prod mode: Worker fails when shape differs (with clear error) -5. Auth: Worker with invalid apikey gets 401 --- @@ -795,56 +627,36 @@ Test-Driven Development order - write tests FIRST, then implement: --- -### Phase 7: ControlPlane Endpoint (~0.5 day) +### Phase 7: Worker Integration (~0.5 day) **Order within phase:** -1. Add auth verification (check `SUPABASE_SERVICE_ROLE_KEY` env var) -2. Add request body validation (shape, mode) -3. Add SQL function call -4. Return appropriate HTTP status codes -5. Write Vitest tests for each response code - -**Why seventh?** -- Depends on both DSL and SQL -- Simple HTTP wrapper around tested SQL function -- Auth + validation before SQL call - -**Deliverable:** Working endpoint that validates and compiles flows. - ---- - -### Phase 8: Worker Integration (~0.5 day) - -**Order within phase:** -1. Add `CompilationMode` type to workerConfigTypes.ts -2. Add config options (`compilationMode`, `controlPlaneUrl`) -3. Implement `verifyOrCompileFlow()` in FlowWorkerLifecycle +1. Add `isLocalEnvironment` to PlatformAdapter +2. Add `ensureFlowCompiled()` query method to Queries.ts +3. Implement `ensureFlowCompiled()` in FlowWorkerLifecycle 4. Wire up in `createFlowWorker()` - call before polling starts 5. 
Write integration tests **Why last?** -- Depends on ControlPlane endpoint -- Simple HTTP client code +- Depends on SQL function +- Direct SQL call (no HTTP layer) - Integration tests validate entire stack -**Deliverable:** Workers auto-compile/verify at startup. +**Deliverable:** Workers auto-compile/verify at startup via direct SQL. --- ### Summary Timeline -| Phase | Component | Duration | Cumulative | -|-------|-----------|----------|------------| -| 1 | DSL types + functions | 0.5 day | 0.5 day | -| 2 | Promote delete_flow_and_data | 0.5 day | 1 day | -| 3 | SQL _get_flow_shape | 0.5 day | 1.5 days | -| 4 | SQL _compare_flow_shapes | 0.5 day | 2 days | -| 5 | SQL _compile_flow_from_shape | 0.5 day | 2.5 days | -| 6 | SQL ensure_flow_compiled | 1 day | 3.5 days | -| 7 | ControlPlane endpoint | 0.5 day | 4 days | -| 8 | Worker integration | 0.5 day | 4.5 days | - -**Total: ~4.5 days** +| Phase | Component | Status | +|-------|-----------|--------| +| 1 | DSL types + functions | ✅ Complete | +| 2 | Promote delete_flow_and_data | ✅ Complete | +| 3 | SQL _get_flow_shape | ✅ Complete | +| 4 | SQL _compare_flow_shapes | ✅ Complete | +| 5 | SQL _create_flow_from_shape | ✅ Complete | +| 6 | SQL ensure_flow_compiled | ✅ Complete | +| 7 | Worker integration (direct SQL) | ✅ Complete | +| 8 | Advisory locks | ⏳ To implement | --- @@ -856,52 +668,24 @@ Test-Driven Development order - write tests FIRST, then implement: **After Phase 6:** Full SQL stack works - can compile, verify, recompile flows via direct SQL calls. -**After Phase 7:** Full HTTP stack works - can call endpoint and get correct responses. - -**After Phase 8:** End-to-end works - worker starts, calls endpoint, gets verified/compiled. +**After Phase 7:** End-to-end works - worker starts, calls SQL directly, gets verified/compiled. --- ## Resolved Design Decisions 1. **CLI `pgflow compile` command** - KEEP. Free to maintain, useful for migration files. -2. **Advisory locks** - YES. 
Required for concurrent worker startups. +2. **Advisory locks** - YES. Required for concurrent worker startups. (To implement in Phase 8) 3. **Telemetry** - `created_at` already exists on `pgflow.flows`. Sufficient for MVP. 4. **pg_jsonschema validation** - SKIP. FlowShape generated by our code, not user input. -5. **Shape source** - DEFENSE IN DEPTH. Worker sends shape, ControlPlane compares against its own, then SQL compares against DB. +5. **Shape source** - Worker extracts shape from Flow, sends directly to SQL for comparison against DB. 6. **FlowShape location** - DSL package (alongside `compileFlow()`). 7. **FlowShape extensibility** - Version field for migrations. Future schemas (Zod) will be part of shape comparison. -8. **Self-contained mode** - DEFERRED (YAGNI). Can add in ~30 min if Lovable.dev needs it. - ---- - -## Self-Contained Mode (Deferred) - -**Decision:** Skip for MVP. Add later if needed. - -**Why defer:** -- ControlPlane already exists (0.9.0) -- Lovable.dev might work fine with ControlPlane -- Adding later is trivial (~30 lines, ~30 minutes) -- No architectural changes needed - just config flag + one `if` branch - -**If needed later:** -```typescript -// Just add this to FlowWorkerLifecycle -if (this.config.selfContained) { - // Direct SQL call - await this.sql`SELECT pgflow.ensure_flow_compiled(...)`; -} else { - // HTTP call (default) - await fetch(controlPlaneUrl + '/ensure-compiled', ...); -} -``` - -SQL function is the core - how we call it is a trivial detail. +8. **Compilation path** - Worker calls SQL directly (simpler than HTTP to ControlPlane). --- -## Advisory Lock Implementation +## Advisory Lock Implementation (Phase 8 - To Implement) Multiple workers may start simultaneously and all call `ensure_flow_compiled`. 
We use PostgreSQL advisory locks to serialize compilation: diff --git a/pkgs/edge-worker/deno.lock b/pkgs/edge-worker/deno.lock index a33eaccfa..d7f54bea1 100644 --- a/pkgs/edge-worker/deno.lock +++ b/pkgs/edge-worker/deno.lock @@ -5,6 +5,7 @@ "jsr:@henrygd/queue@^1.0.7": "1.0.7", "jsr:@std/assert@*": "0.224.0", "jsr:@std/assert@0.224": "0.224.0", + "jsr:@std/async@*": "0.224.2", "jsr:@std/async@0.224": "0.224.2", "jsr:@std/crypto@*": "1.0.5", "jsr:@std/fmt@0.224": "0.224.0", diff --git a/pkgs/edge-worker/src/core/Queries.ts b/pkgs/edge-worker/src/core/Queries.ts index fc0b41d66..0bf3bf71a 100644 --- a/pkgs/edge-worker/src/core/Queries.ts +++ b/pkgs/edge-worker/src/core/Queries.ts @@ -1,5 +1,13 @@ import type postgres from 'postgres'; import type { WorkerRow } from './types.js'; +import type { FlowShape, Json } from '@pgflow/dsl'; + +export type EnsureFlowCompiledStatus = 'compiled' | 'verified' | 'recompiled' | 'mismatch'; + +export interface EnsureFlowCompiledResult { + status: EnsureFlowCompiledStatus; + differences: string[]; +} export class Queries { constructor(private readonly sql: postgres.Sql) {} @@ -40,7 +48,31 @@ export class Queries { WHERE w.worker_id = ${workerRow.worker_id} RETURNING (w.deprecated_at IS NOT NULL) AS is_deprecated; `; - + return result || { is_deprecated: false }; } + + async ensureFlowCompiled( + flowSlug: string, + shape: FlowShape, + mode: 'development' | 'production' + ): Promise { + // SAFETY: FlowShape is JSON-compatible by construction (only strings, numbers, + // arrays, and plain objects), but TypeScript can't prove this because FlowShape + // uses specific property names while Json uses index signatures. This cast is + // safe because we control both sides: extractFlowShape() builds the object and + // this method consumes it - no untrusted input crosses this boundary. 
+ // + // TODO: If FlowShape ever becomes part of a public API or accepts external input, + // add a runtime assertion function (assertJsonCompatible) to validate at the boundary. + const shapeJson = this.sql.json(shape as unknown as Json); + const [result] = await this.sql<{ result: EnsureFlowCompiledResult }[]>` + SELECT pgflow.ensure_flow_compiled( + ${flowSlug}, + ${shapeJson}::jsonb, + ${mode} + ) as result + `; + return result.result; + } } diff --git a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts index 510bb2c49..683cfd47a 100644 --- a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts +++ b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts @@ -3,9 +3,17 @@ import type { ILifecycle, WorkerBootstrap, WorkerRow } from '../core/types.js'; import type { Logger } from '../platform/types.js'; import { States, WorkerState } from '../core/WorkerState.js'; import type { AnyFlow } from '@pgflow/dsl'; +import { extractFlowShape } from '@pgflow/dsl'; +import { + isLocalSupabase, + KNOWN_LOCAL_ANON_KEY, + KNOWN_LOCAL_SERVICE_ROLE_KEY, +} from '../shared/localDetection.js'; +import { FlowShapeMismatchError } from './errors.js'; export interface FlowLifecycleConfig { heartbeatInterval?: number; + env?: Record; } /** @@ -21,6 +29,7 @@ export class FlowWorkerLifecycle implements ILifecycle { private _workerId?: string; private heartbeatInterval: number; private lastHeartbeat = 0; + private env?: Record; constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) { this.queries = queries; @@ -28,6 +37,7 @@ export class FlowWorkerLifecycle implements ILifecycle { this.logger = logger; this.workerState = new WorkerState(logger); this.heartbeatInterval = config?.heartbeatInterval ?? 
5000; + this.env = config?.env; } async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise { @@ -36,6 +46,10 @@ export class FlowWorkerLifecycle implements ILifecycle { // Store workerId for supplier pattern this._workerId = workerBootstrap.workerId; + // Compile/verify flow as part of Starting (before registering worker) + await this.ensureFlowCompiled(); + + // Only register worker after successful compilation this.workerRow = await this.queries.onWorkerStarted({ queueName: this.queueName, ...workerBootstrap, @@ -44,6 +58,37 @@ export class FlowWorkerLifecycle implements ILifecycle { this.workerState.transitionTo(States.Running); } + private async ensureFlowCompiled(): Promise { + this.logger.info(`Ensuring flow '${this.flow.slug}' is compiled...`); + + const shape = extractFlowShape(this.flow); + const mode = this.detectCompilationMode(); + + const result = await this.queries.ensureFlowCompiled( + this.flow.slug, + shape, + mode + ); + + if (result.status === 'mismatch') { + throw new FlowShapeMismatchError(this.flow.slug, result.differences); + } + + this.logger.info(`Flow '${this.flow.slug}' ${result.status} (mode: ${mode})`); + } + + private detectCompilationMode(): 'development' | 'production' { + // Use provided env if available, otherwise fall back to global detection + if (this.env) { + const anonKey = this.env['SUPABASE_ANON_KEY']; + const serviceRoleKey = this.env['SUPABASE_SERVICE_ROLE_KEY']; + const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY || + serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY; + return isLocal ? 'development' : 'production'; + } + return isLocalSupabase() ? 
'development' : 'production'; + } + acknowledgeStop() { this.workerState.transitionTo(States.Stopping); diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index 729b250ac..b3a013f13 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -89,7 +89,8 @@ export function createFlowWorker( queries, flow, - createLogger('FlowWorkerLifecycle') + createLogger('FlowWorkerLifecycle'), + { env: platformAdapter.env } ); // Create frozen worker config ONCE for reuse across all task executions diff --git a/pkgs/edge-worker/src/flow/errors.ts b/pkgs/edge-worker/src/flow/errors.ts new file mode 100644 index 000000000..b9301f541 --- /dev/null +++ b/pkgs/edge-worker/src/flow/errors.ts @@ -0,0 +1,18 @@ +/** + * Error thrown when flow shape in code doesn't match database schema in production mode. + * Worker should crash on this error - no recovery possible without migration. + */ +export class FlowShapeMismatchError extends Error { + constructor( + public readonly flowSlug: string, + public readonly differences: string[] + ) { + super( + `Flow '${flowSlug}' shape mismatch with database.\n` + + `Run migrations or use development mode to recompile.\n` + + `Differences:\n` + + differences.map(d => ` - ${d}`).join('\n') + ); + this.name = 'FlowShapeMismatchError'; + } +} diff --git a/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts b/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts new file mode 100644 index 000000000..3a2c26cde --- /dev/null +++ b/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts @@ -0,0 +1,261 @@ +import { assertEquals } from '@std/assert'; +import { withPgNoTransaction } from '../../db.ts'; +import { Flow } from '@pgflow/dsl'; +import { delay } from '@std/async'; +import { createFlowWorker } from '../../../src/flow/createFlowWorker.ts'; +import { createTestPlatformAdapter } from '../_helpers.ts'; 
+import {
+  KNOWN_LOCAL_ANON_KEY,
+  KNOWN_LOCAL_SERVICE_ROLE_KEY,
+} from '../../../src/shared/localDetection.ts';
+import type { postgres } from '../../sql.ts';
+
+// Minimal single-step flow ('double') that the worker under test compiles/verifies at startup
+const TestCompilationFlow = new Flow<{ value: number }>({ slug: 'test_compilation_flow' })
+  .step({ slug: 'double' }, async (input) => {
+    await delay(1);
+    return input.run.value * 2;
+  });
+
+function createLogger(module: string) {
+  return { // console-backed logger; every message is prefixed with `[module]`
+    debug: console.log.bind(console, `[${module}]`),
+    info: console.log.bind(console, `[${module}]`),
+    warn: console.warn.bind(console, `[${module}]`),
+    error: console.error.bind(console, `[${module}]`),
+  };
+}
+
+function createPlatformAdapterWithEnv(
+  sql: postgres.Sql,
+  envOverrides: Record<string, string> = {}
+) {
+  const baseAdapter = createTestPlatformAdapter(sql);
+  const modifiedEnv = { // base env first, so overrides win on key collisions
+    ...baseAdapter.env,
+    ...envOverrides,
+  };
+
+  return {
+    ...baseAdapter, // NOTE(review): spread copies own enumerable props only (getters are read once here) - confirm the base adapter is a plain object
+    get env() { return modifiedEnv; }, // replaces the spread-in env with the merged one
+  };
+}
+
+Deno.test(
+  'compiles new flow on worker startup',
+  withPgNoTransaction(async (sql) => {
+    await sql`select pgflow_tests.reset_db();`;
+
+    // Verify flow doesn't exist
+    const [flowBefore] = await sql`
+      SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
+    `;
+    assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup');
+
+    // Create worker (compilation happens during acknowledgeStart)
+    const worker = createFlowWorker(
+      TestCompilationFlow,
+      {
+        sql,
+        maxConcurrent: 1,
+        batchSize: 10,
+        maxPollSeconds: 1,
+        pollIntervalMs: 200,
+      },
+      createLogger,
+      createPlatformAdapterWithEnv(sql)
+    );
+
+    try {
+      // Start worker - this triggers compilation
+      worker.startOnlyOnce({
+        edgeFunctionName: 'test_compilation',
+        workerId: crypto.randomUUID(),
+      });
+
+      // Give time for startup to complete (fixed wait: startOnlyOnce is not awaited)
+      await delay(100);
+
+      // Verify flow was created
+      const [flowAfter] = await sql`
+        SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
+      `;
+      assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should be created');
+
+      // Verify step was created
+      const steps = await sql`
+        SELECT step_slug FROM pgflow.steps WHERE flow_slug = 'test_compilation_flow' ORDER BY step_slug
+      `;
+      assertEquals(steps.length, 1, 'Should have 1 step');
+      assertEquals(steps[0].step_slug, 'double', 'Step should be "double"');
+    } finally {
+      await worker.stop();
+    }
+  })
+);
+
+Deno.test(
+  'verifies existing matching flow on worker startup',
+  withPgNoTransaction(async (sql) => {
+    await sql`select pgflow_tests.reset_db();`;
+
+    // Pre-create the flow with matching structure
+    await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
+    await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;
+
+    // Verify flow exists
+    const [flowBefore] = await sql`
+      SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
+    `;
+    assertEquals(flowBefore?.flow_slug, 'test_compilation_flow', 'Flow should exist');
+
+    // Create and start worker
+    const worker = createFlowWorker(
+      TestCompilationFlow,
+      {
+        sql,
+        maxConcurrent: 1,
+        batchSize: 10,
+        maxPollSeconds: 1,
+        pollIntervalMs: 200,
+      },
+      createLogger,
+      createPlatformAdapterWithEnv(sql)
+    );
+
+    try {
+      // Start worker - should verify without error
+      worker.startOnlyOnce({
+        edgeFunctionName: 'test_compilation',
+        workerId: crypto.randomUUID(),
+      });
+
+      // Give time for startup to complete (fixed wait: startOnlyOnce is not awaited)
+      await delay(100);
+
+      // Verify flow still exists (was not deleted/recreated)
+      const [flowAfter] = await sql`
+        SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
+      `;
+      assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should still exist');
+    } finally {
+      await worker.stop();
+    }
+  })
+);
+
+Deno.test(
+  'throws FlowShapeMismatchError on mismatch in production mode',
+  withPgNoTransaction(async (sql) => {
+    await sql`select pgflow_tests.reset_db();`;
+
+    // Pre-create flow with DIFFERENT structure than what worker expects
+    await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
+    await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;
+    await sql`SELECT pgflow.add_step('test_compilation_flow', 'different_step', deps_slugs => ARRAY['double']::text[])`;
+
+    // Use non-local keys to simulate production mode
+    const platformAdapter = createPlatformAdapterWithEnv(sql, {
+      SUPABASE_ANON_KEY: 'prod-anon-key-not-local',
+      SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key-not-local',
+    });
+
+    const worker = createFlowWorker(
+      TestCompilationFlow, // Has only 'double' step
+      {
+        sql,
+        maxConcurrent: 1,
+        batchSize: 10,
+        maxPollSeconds: 1,
+        pollIntervalMs: 200,
+      },
+      createLogger,
+      platformAdapter
+    );
+
+    // startOnlyOnce() is not awaited below, so a startup failure is captured via the unhandledrejection event
+    const caughtErrors: Error[] = [];
+    const errorHandler = (event: PromiseRejectionEvent) => {
+      event.preventDefault(); // suppress the runtime's default unhandled-rejection handling
+      caughtErrors.push(event.reason as Error);
+    };
+    globalThis.addEventListener('unhandledrejection', errorHandler);
+
+    try {
+      worker.startOnlyOnce({
+        edgeFunctionName: 'test_compilation',
+        workerId: crypto.randomUUID(),
+      });
+
+      // Give time for startup to fail (fixed wait: startOnlyOnce is not awaited)
+      await delay(200);
+
+      // Verify error was thrown
+      assertEquals(caughtErrors.length > 0, true, 'Should have caught an error');
+      const caughtError = caughtErrors[0];
+      assertEquals(caughtError.name, 'FlowShapeMismatchError', 'Error should be FlowShapeMismatchError');
+      assertEquals(
+        caughtError.message.includes('shape mismatch'),
+        true,
+        'Error message should mention mismatch'
+      );
+    } finally {
+      globalThis.removeEventListener('unhandledrejection', errorHandler);
+      try {
+        await worker.stop();
+      } catch {
+        // Ignore stop errors since worker may have failed to start
+      }
+    }
+  })
+);
+
+Deno.test(
+  'recompiles flow on mismatch in development mode (local Supabase)',
+  withPgNoTransaction(async (sql) => {
+    await sql`select pgflow_tests.reset_db();`;
+
+    // Pre-create flow with DIFFERENT structure
+    await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
+    await sql`SELECT pgflow.add_step('test_compilation_flow', 'old_step')`;
+
+    // Use local keys to simulate development mode
+    const platformAdapter = createPlatformAdapterWithEnv(sql, {
+      SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
+      SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
+    });
+
+    const worker = createFlowWorker(
+      TestCompilationFlow, // Has 'double' step, not 'old_step'
+      {
+        sql,
+        maxConcurrent: 1,
+        batchSize: 10,
+        maxPollSeconds: 1,
+        pollIntervalMs: 200,
+      },
+      createLogger,
+      platformAdapter
+    );
+
+    try {
+      worker.startOnlyOnce({
+        edgeFunctionName: 'test_compilation',
+        workerId: crypto.randomUUID(),
+      });
+
+      // Give time for startup and recompilation (fixed wait: startOnlyOnce is not awaited)
+      await delay(200);
+
+      // Verify flow was recompiled with new structure
+      const steps = await sql`
+        SELECT step_slug FROM pgflow.steps WHERE flow_slug = 'test_compilation_flow' ORDER BY step_slug
+      `;
+      assertEquals(steps.length, 1, 'Should have 1 step after recompilation');
+      assertEquals(steps[0].step_slug, 'double', 'Step should be "double" after recompilation');
+    } finally {
+      await worker.stop();
+    }
+  })
+);
diff --git a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts
index 0382aaf23..ff2092985 100644
--- a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts
+++ b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts
@@ -44,14 +44,21 @@ class MockQueries extends Queries {
     this.workerStopped = true;
     return Promise.resolve(workerRow);
   }
+
+  override ensureFlowCompiled(): Promise<{ status: 'compiled' | 'verified' | 'recompiled' | 'mismatch'; differences: string[] }> {
+    return Promise.resolve({ status: 'verified', differences: [] });
+  }
 }
 
 // Mock Flow
 const createMockFlow = (): AnyFlow => {
-  // Return a minimal flow structure that matches AnyFlow type
+  // Return a minimal flow structure needed for extractFlowShape
   return {
     slug: 'test-flow',
-  } as AnyFlow;
+    stepOrder: [],
+    options: {},
+    getStepDefinition: () => { throw new Error('No steps in mock flow'); },
+  } as unknown as AnyFlow;
 };