From b0f0a33afba8e906934b788e802965067912590f Mon Sep 17 00:00:00 2001 From: jumski <9126+jumski@users.noreply.github.com> Date: Thu, 4 Dec 2025 11:09:28 +0000 Subject: [PATCH] refactor: simplify environment detection for flow compilation mode (#475) # Refactor Flow Compilation Environment Detection This PR improves how we detect local/development environments for flow compilation by: 1. Adding an explicit `isLocalEnvironment` property to the `PlatformAdapter` interface 2. Replacing the environment variable-based detection with a cleaner approach that: - Removes direct access to environment variables in `FlowWorkerLifecycle` - Adds a new `isLocalSupabaseEnv()` function for checking environment records - Simplifies the flow compilation mode detection logic The changes make the code more maintainable by: - Adding a helpful comment about FlowShape JSON serialization - Improving log messages during flow compilation - Updating tests to use the new approach for environment detection - Making the environment detection more explicit and testable This refactoring maintains the same behavior while making the code more robust and easier to understand. --- .../src/flow/FlowWorkerLifecycle.ts | 29 ++++------------- pkgs/edge-worker/src/flow/createFlowWorker.ts | 2 +- .../src/platform/SupabasePlatformAdapter.ts | 8 ++--- pkgs/edge-worker/src/platform/types.ts | 6 ++++ .../edge-worker/tests/integration/_helpers.ts | 5 +-- .../flow/compilationAtStartup.test.ts | 32 ++++++------------- .../FlowWorkerLifecycle.deprecation.test.ts | 27 ++++++++-------- 7 files changed, 42 insertions(+), 67 deletions(-) diff --git a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts index 683cfd47a..3b3d09c52 100644 --- a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts +++ b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts @@ -4,16 +4,11 @@ import type { Logger } from '../platform/types.js'; import { States, WorkerState } from '../core/WorkerState.js'; import type { AnyFlow } from '@pgflow/dsl'; import { extractFlowShape } from '@pgflow/dsl'; -import { - isLocalSupabase, - KNOWN_LOCAL_ANON_KEY, - KNOWN_LOCAL_SERVICE_ROLE_KEY, -} from '../shared/localDetection.js'; import { FlowShapeMismatchError } from './errors.js'; export interface FlowLifecycleConfig { heartbeatInterval?: number; - env?: Record; + isLocalEnvironment?: boolean; } /** @@ -29,7 +24,7 @@ export class FlowWorkerLifecycle implements ILifecycle { private _workerId?: string; private heartbeatInterval: number; private lastHeartbeat = 0; - private env?: Record; + private isLocalEnvironment: boolean; constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) { this.queries = queries; @@ -37,7 +32,7 @@ export class FlowWorkerLifecycle implements ILifecycle { this.logger = logger; this.workerState = new WorkerState(logger); this.heartbeatInterval = config?.heartbeatInterval ?? 5000; - this.env = config?.env; + this.isLocalEnvironment = config?.isLocalEnvironment ?? false; } async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise { @@ -59,10 +54,10 @@ export class FlowWorkerLifecycle implements ILifecycle { } private async ensureFlowCompiled(): Promise { - this.logger.info(`Ensuring flow '${this.flow.slug}' is compiled...`); + const mode = this.isLocalEnvironment ? 'development' : 'production'; + this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`); const shape = extractFlowShape(this.flow); - const mode = this.detectCompilationMode(); const result = await this.queries.ensureFlowCompiled( this.flow.slug, @@ -74,19 +69,7 @@ export class FlowWorkerLifecycle implements ILifecycle { throw new FlowShapeMismatchError(this.flow.slug, result.differences); } - this.logger.info(`Flow '${this.flow.slug}' ${result.status} (mode: ${mode})`); - } - - private detectCompilationMode(): 'development' | 'production' { - // Use provided env if available, otherwise fall back to global detection - if (this.env) { - const anonKey = this.env['SUPABASE_ANON_KEY']; - const serviceRoleKey = this.env['SUPABASE_SERVICE_ROLE_KEY']; - const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY || - serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY; - return isLocal ? 'development' : 'production'; - } - return isLocalSupabase() ? 'development' : 'production'; + this.logger.info(`Flow '${this.flow.slug}' compilation complete: ${result.status}`); } acknowledgeStop() { diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index b3a013f13..7b6a5b7ac 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -90,7 +90,7 @@ export function createFlowWorker { // Trigger shutdown signal this.abortController.abort(); - + // Cleanup resources await this._platformResources.sql.end(); - + if (this.worker) { await this.worker.stop(); } @@ -262,7 +262,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter = Re */ get platformResources(): TResources; + /** + * Whether running in a local/development environment. + * Used by flow compilation to determine if recompilation is allowed. + */ + get isLocalEnvironment(): boolean; + } diff --git a/pkgs/edge-worker/tests/integration/_helpers.ts b/pkgs/edge-worker/tests/integration/_helpers.ts index c5626684f..978564e5e 100644 --- a/pkgs/edge-worker/tests/integration/_helpers.ts +++ b/pkgs/edge-worker/tests/integration/_helpers.ts @@ -41,14 +41,15 @@ export function createTestPlatformAdapter(sql: postgres.Sql): PlatformAdapter( - sql: postgres.Sql, - flow: TFlow, + sql: postgres.Sql, + flow: TFlow, options: FlowWorkerConfig ) { const defaultOptions = { diff --git a/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts b/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts index 3a2c26cde..54efe0a99 100644 --- a/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts +++ b/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts @@ -4,10 +4,6 @@ import { Flow } from '@pgflow/dsl'; import { delay } from '@std/async'; import { createFlowWorker } from '../../../src/flow/createFlowWorker.ts'; import { createTestPlatformAdapter } from '../_helpers.ts'; -import { - KNOWN_LOCAL_ANON_KEY, - KNOWN_LOCAL_SERVICE_ROLE_KEY, -} from '../../../src/shared/localDetection.ts'; import type { postgres } from '../../sql.ts'; // Define a minimal test flow @@ -26,19 +22,15 @@ function createLogger(module: string) { }; } -function createPlatformAdapterWithEnv( +function createPlatformAdapterWithLocalEnv( sql: postgres.Sql, - envOverrides: Record = {} + isLocal: boolean ) { const baseAdapter = createTestPlatformAdapter(sql); - const modifiedEnv = { - ...baseAdapter.env, - ...envOverrides, - }; return { ...baseAdapter, - get env() { return modifiedEnv; }, + get isLocalEnvironment() { return isLocal; }, }; } @@ -64,7 +56,7 @@ Deno.test( pollIntervalMs: 200, }, createLogger, - createPlatformAdapterWithEnv(sql) + createPlatformAdapterWithLocalEnv(sql, false) ); try { @@ -121,7 +113,7 @@ Deno.test( pollIntervalMs: 200, }, createLogger, - createPlatformAdapterWithEnv(sql) + createPlatformAdapterWithLocalEnv(sql, false) ); try { @@ -155,11 +147,8 @@ Deno.test( await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`; await sql`SELECT pgflow.add_step('test_compilation_flow', 'different_step', deps_slugs => ARRAY['double']::text[])`; - // Use non-local keys to simulate production mode - const platformAdapter = createPlatformAdapterWithEnv(sql, { - SUPABASE_ANON_KEY: 'prod-anon-key-not-local', - SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key-not-local', - }); + // Use isLocal: false to simulate production mode + const platformAdapter = createPlatformAdapterWithLocalEnv(sql, false); const worker = createFlowWorker( TestCompilationFlow, // Has only 'double' step @@ -220,11 +209,8 @@ Deno.test( await sql`SELECT pgflow.create_flow('test_compilation_flow')`; await sql`SELECT pgflow.add_step('test_compilation_flow', 'old_step')`; - // Use local keys to simulate development mode - const platformAdapter = createPlatformAdapterWithEnv(sql, { - SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY, - SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY, - }); + // Use isLocal: true to simulate development mode + const platformAdapter = createPlatformAdapterWithLocalEnv(sql, true); const worker = createFlowWorker( TestCompilationFlow, // Has 'double' step, not 'old_step' diff --git a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts index ff2092985..f2304dd4f 100644 --- a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts +++ b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts @@ -1,9 +1,9 @@ import { assertEquals, assertThrows } from '@std/assert'; import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts'; import { TransitionError } from '../../src/core/WorkerState.ts'; -import { Queries } from '../../src/core/Queries.ts'; +import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts'; import type { WorkerRow } from '../../src/core/types.ts'; -import type { AnyFlow } from '@pgflow/dsl'; +import { Flow, type FlowShape } from '@pgflow/dsl'; import type { Logger } from '../../src/platform/types.ts'; import { createLoggingFactory } from '../../src/platform/logging.ts'; import type { postgres } from '../sql.ts'; @@ -45,21 +45,20 @@ class MockQueries extends Queries { return Promise.resolve(workerRow); } - override ensureFlowCompiled(): Promise<{ status: 'compiled' | 'verified' | 'recompiled' | 'mismatch'; differences: string[] }> { + override ensureFlowCompiled( + _flowSlug: string, + _shape: FlowShape, + _mode: 'development' | 'production' + ): Promise { return Promise.resolve({ status: 'verified', differences: [] }); } } -// Mock Flow -const createMockFlow = (): AnyFlow => { - // Return a minimal flow structure needed for extractFlowShape - return { - slug: 'test-flow', - stepOrder: [], - options: {}, - getStepDefinition: () => { throw new Error('No steps in mock flow'); }, - } as unknown as AnyFlow; -}; +// Real Flow for testing - using the DSL to create a valid flow +const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' }) + .step({ slug: 'step1' }, (input) => input.run.value); + +const createMockFlow = () => TestFlow; Deno.test('FlowWorkerLifecycle - should transition to deprecated state when heartbeat returns is_deprecated true', async () => { @@ -219,7 +218,7 @@ Deno.test('FlowWorkerLifecycle - queueName should return flow slug', () => { const mockFlow = createMockFlow(); const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger); - assertEquals(lifecycle.queueName, 'test-flow'); + assertEquals(lifecycle.queueName, 'test_flow'); }); Deno.test('FlowWorkerLifecycle - workerId getter should work after start', async () => {