diff --git a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts index 683cfd47a..3b3d09c52 100644 --- a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts +++ b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts @@ -4,16 +4,11 @@ import type { Logger } from '../platform/types.js'; import { States, WorkerState } from '../core/WorkerState.js'; import type { AnyFlow } from '@pgflow/dsl'; import { extractFlowShape } from '@pgflow/dsl'; -import { - isLocalSupabase, - KNOWN_LOCAL_ANON_KEY, - KNOWN_LOCAL_SERVICE_ROLE_KEY, -} from '../shared/localDetection.js'; import { FlowShapeMismatchError } from './errors.js'; export interface FlowLifecycleConfig { heartbeatInterval?: number; - env?: Record; + isLocalEnvironment?: boolean; } /** @@ -29,7 +24,7 @@ export class FlowWorkerLifecycle implements ILifecycle { private _workerId?: string; private heartbeatInterval: number; private lastHeartbeat = 0; - private env?: Record; + private isLocalEnvironment: boolean; constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) { this.queries = queries; @@ -37,7 +32,7 @@ export class FlowWorkerLifecycle implements ILifecycle { this.logger = logger; this.workerState = new WorkerState(logger); this.heartbeatInterval = config?.heartbeatInterval ?? 5000; - this.env = config?.env; + this.isLocalEnvironment = config?.isLocalEnvironment ?? false; } async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise { @@ -59,10 +54,10 @@ export class FlowWorkerLifecycle implements ILifecycle { } private async ensureFlowCompiled(): Promise { - this.logger.info(`Ensuring flow '${this.flow.slug}' is compiled...`); + const mode = this.isLocalEnvironment ? 'development' : 'production'; + this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`); const shape = extractFlowShape(this.flow); - const mode = this.detectCompilationMode(); const result = await this.queries.ensureFlowCompiled( this.flow.slug, @@ -74,19 +69,7 @@ export class FlowWorkerLifecycle implements ILifecycle { throw new FlowShapeMismatchError(this.flow.slug, result.differences); } - this.logger.info(`Flow '${this.flow.slug}' ${result.status} (mode: ${mode})`); - } - - private detectCompilationMode(): 'development' | 'production' { - // Use provided env if available, otherwise fall back to global detection - if (this.env) { - const anonKey = this.env['SUPABASE_ANON_KEY']; - const serviceRoleKey = this.env['SUPABASE_SERVICE_ROLE_KEY']; - const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY || - serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY; - return isLocal ? 'development' : 'production'; - } - return isLocalSupabase() ? 'development' : 'production'; + this.logger.info(`Flow '${this.flow.slug}' compilation complete: ${result.status}`); } acknowledgeStop() { diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index b3a013f13..7b6a5b7ac 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -90,7 +90,7 @@ export function createFlowWorker { if (!this.edgeFunctionName) { throw new Error('functionName cannot be null or empty'); diff --git a/pkgs/edge-worker/src/platform/types.ts b/pkgs/edge-worker/src/platform/types.ts index 7d6a9aad7..d8cdfcd93 100644 --- a/pkgs/edge-worker/src/platform/types.ts +++ b/pkgs/edge-worker/src/platform/types.ts @@ -56,4 +56,10 @@ export interface PlatformAdapter = Re */ get platformResources(): TResources; + /** + * Whether running in a local/development environment. + * Used by flow compilation to determine if recompilation is allowed. + */ + get isLocalEnvironment(): boolean; + } diff --git a/pkgs/edge-worker/src/shared/localDetection.ts b/pkgs/edge-worker/src/shared/localDetection.ts index 7e596ab67..21d492674 100644 --- a/pkgs/edge-worker/src/shared/localDetection.ts +++ b/pkgs/edge-worker/src/shared/localDetection.ts @@ -4,9 +4,17 @@ export const KNOWN_LOCAL_ANON_KEY = export const KNOWN_LOCAL_SERVICE_ROLE_KEY = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; -export function isLocalSupabase(): boolean { - const anonKey = Deno.env.get('SUPABASE_ANON_KEY'); - const serviceRoleKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY'); +/** + * Checks if the provided environment indicates local Supabase. + * Use when you have access to an env record (e.g., from PlatformAdapter). + */ +export function isLocalSupabaseEnv(env: Record): boolean { + const anonKey = env['SUPABASE_ANON_KEY']; + const serviceRoleKey = env['SUPABASE_SERVICE_ROLE_KEY']; return anonKey === KNOWN_LOCAL_ANON_KEY || serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY; } + +export function isLocalSupabase(): boolean { + return isLocalSupabaseEnv(Deno.env.toObject()); +} diff --git a/pkgs/edge-worker/tests/integration/_helpers.ts b/pkgs/edge-worker/tests/integration/_helpers.ts index ba2aa9602..eed43a4cd 100644 --- a/pkgs/edge-worker/tests/integration/_helpers.ts +++ b/pkgs/edge-worker/tests/integration/_helpers.ts @@ -41,6 +41,7 @@ export function createTestPlatformAdapter(sql: postgres.Sql): PlatformAdapter = {} + isLocal: boolean ) { const baseAdapter = createTestPlatformAdapter(sql); - const modifiedEnv = { - ...baseAdapter.env, - ...envOverrides, - }; return { ...baseAdapter, - get env() { return modifiedEnv; }, + get isLocalEnvironment() { return isLocal; }, }; } @@ -64,7 +56,7 @@ Deno.test( pollIntervalMs: 200, }, createLogger, - createPlatformAdapterWithEnv(sql) + createPlatformAdapterWithLocalEnv(sql, false) ); try { @@ -121,7 +113,7 @@ Deno.test( pollIntervalMs: 200, }, createLogger, - createPlatformAdapterWithEnv(sql) + createPlatformAdapterWithLocalEnv(sql, false) ); try { @@ -155,11 +147,8 @@ Deno.test( await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`; await sql`SELECT pgflow.add_step('test_compilation_flow', 'different_step', deps_slugs => ARRAY['double']::text[])`; - // Use non-local keys to simulate production mode - const platformAdapter = createPlatformAdapterWithEnv(sql, { - SUPABASE_ANON_KEY: 'prod-anon-key-not-local', - SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key-not-local', - }); + // Use isLocal: false to simulate production mode + const platformAdapter = createPlatformAdapterWithLocalEnv(sql, false); const worker = createFlowWorker( TestCompilationFlow, // Has only 'double' step @@ -220,11 +209,8 @@ Deno.test( await sql`SELECT pgflow.create_flow('test_compilation_flow')`; await sql`SELECT pgflow.add_step('test_compilation_flow', 'old_step')`; - // Use local keys to simulate development mode - const platformAdapter = createPlatformAdapterWithEnv(sql, { - SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY, - SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY, - }); + // Use isLocal: true to simulate development mode + const platformAdapter = createPlatformAdapterWithLocalEnv(sql, true); const worker = createFlowWorker( TestCompilationFlow, // Has 'double' step, not 'old_step' diff --git a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts index ff2092985..f2304dd4f 100644 --- a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts +++ b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts @@ -1,9 +1,9 @@ import { assertEquals, assertThrows } from '@std/assert'; import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts'; import { TransitionError } from '../../src/core/WorkerState.ts'; -import { Queries } from '../../src/core/Queries.ts'; +import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts'; import type { WorkerRow } from '../../src/core/types.ts'; -import type { AnyFlow } from '@pgflow/dsl'; +import { Flow, type FlowShape } from '@pgflow/dsl'; import type { Logger } from '../../src/platform/types.ts'; import { createLoggingFactory } from '../../src/platform/logging.ts'; import type { postgres } from '../sql.ts'; @@ -45,21 +45,20 @@ class MockQueries extends Queries { return Promise.resolve(workerRow); } - override ensureFlowCompiled(): Promise<{ status: 'compiled' | 'verified' | 'recompiled' | 'mismatch'; differences: string[] }> { + override ensureFlowCompiled( + _flowSlug: string, + _shape: FlowShape, + _mode: 'development' | 'production' + ): Promise { return Promise.resolve({ status: 'verified', differences: [] }); } } -// Mock Flow -const createMockFlow = (): AnyFlow => { - // Return a minimal flow structure needed for extractFlowShape - return { - slug: 'test-flow', - stepOrder: [], - options: {}, - getStepDefinition: () => { throw new Error('No steps in mock flow'); }, - } as unknown as AnyFlow; -}; +// Real Flow for testing - using the DSL to create a valid flow +const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' }) + .step({ slug: 'step1' }, (input) => input.run.value); + +const createMockFlow = () => TestFlow; Deno.test('FlowWorkerLifecycle - should transition to deprecated state when heartbeat returns is_deprecated true', async () => { @@ -219,7 +218,7 @@ Deno.test('FlowWorkerLifecycle - queueName should return flow slug', () => { const mockFlow = createMockFlow(); const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger); - assertEquals(lifecycle.queueName, 'test-flow'); + assertEquals(lifecycle.queueName, 'test_flow'); }); Deno.test('FlowWorkerLifecycle - workerId getter should work after start', async () => { diff --git a/pkgs/edge-worker/tests/unit/shared/localDetection.test.ts b/pkgs/edge-worker/tests/unit/shared/localDetection.test.ts index 91de32ebe..8c024cc29 100644 --- a/pkgs/edge-worker/tests/unit/shared/localDetection.test.ts +++ b/pkgs/edge-worker/tests/unit/shared/localDetection.test.ts @@ -1,6 +1,7 @@ import { assertEquals } from '@std/assert'; import { isLocalSupabase, + isLocalSupabaseEnv, KNOWN_LOCAL_ANON_KEY, KNOWN_LOCAL_SERVICE_ROLE_KEY, } from '../../../src/shared/localDetection.ts'; @@ -127,3 +128,61 @@ Deno.test('isLocalSupabase - returns true when only service role matches (anon i assertEquals(isLocalSupabase(), true); }); }); + +// ============================================================ +// isLocalSupabaseEnv() tests +// ============================================================ + +Deno.test('isLocalSupabaseEnv - returns true when anon key matches local', () => { + const env = { SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY }; + assertEquals(isLocalSupabaseEnv(env), true); +}); + +Deno.test('isLocalSupabaseEnv - returns true when service role key matches local', () => { + const env = { SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY }; + assertEquals(isLocalSupabaseEnv(env), true); +}); + +Deno.test('isLocalSupabaseEnv - returns true when both keys match local', () => { + const env = { + SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY, + SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY, + }; + assertEquals(isLocalSupabaseEnv(env), true); +}); + +Deno.test('isLocalSupabaseEnv - returns false for non-local keys', () => { + const env = { + SUPABASE_ANON_KEY: 'prod-key', + SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key', + }; + assertEquals(isLocalSupabaseEnv(env), false); +}); + +Deno.test('isLocalSupabaseEnv - returns false for empty env', () => { + assertEquals(isLocalSupabaseEnv({}), false); +}); + +Deno.test('isLocalSupabaseEnv - returns false for undefined values', () => { + const env = { + SUPABASE_ANON_KEY: undefined, + SUPABASE_SERVICE_ROLE_KEY: undefined, + }; + assertEquals(isLocalSupabaseEnv(env), false); +}); + +Deno.test('isLocalSupabaseEnv - returns true when only anon key matches (service is prod)', () => { + const env = { + SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY, + SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key', + }; + assertEquals(isLocalSupabaseEnv(env), true); +}); + +Deno.test('isLocalSupabaseEnv - returns true when only service role matches (anon is prod)', () => { + const env = { + SUPABASE_ANON_KEY: 'prod-anon-key', + SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY, + }; + assertEquals(isLocalSupabaseEnv(env), true); +});