Skip to content

Commit 88f515d

Browse files
committed
move local environment detection from FlowWorkerLifecycle to PlatformAdapter
1 parent f98a265 commit 88f515d

File tree

9 files changed

+118
-61
lines changed

9 files changed

+118
-61
lines changed

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,11 @@ import type { Logger } from '../platform/types.js';
44
import { States, WorkerState } from '../core/WorkerState.js';
55
import type { AnyFlow } from '@pgflow/dsl';
66
import { extractFlowShape } from '@pgflow/dsl';
7-
import {
8-
isLocalSupabase,
9-
KNOWN_LOCAL_ANON_KEY,
10-
KNOWN_LOCAL_SERVICE_ROLE_KEY,
11-
} from '../shared/localDetection.js';
127
import { FlowShapeMismatchError } from './errors.js';
138

149
export interface FlowLifecycleConfig {
1510
heartbeatInterval?: number;
16-
env?: Record<string, string | undefined>;
11+
isLocalEnvironment?: boolean;
1712
}
1813

1914
/**
@@ -29,15 +24,15 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
2924
private _workerId?: string;
3025
private heartbeatInterval: number;
3126
private lastHeartbeat = 0;
32-
private env?: Record<string, string | undefined>;
27+
private isLocalEnvironment: boolean;
3328

3429
constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
3530
this.queries = queries;
3631
this.flow = flow;
3732
this.logger = logger;
3833
this.workerState = new WorkerState(logger);
3934
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
40-
this.env = config?.env;
35+
this.isLocalEnvironment = config?.isLocalEnvironment ?? false;
4136
}
4237

4338
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
@@ -59,10 +54,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
5954
}
6055

6156
private async ensureFlowCompiled(): Promise<void> {
62-
this.logger.info(`Ensuring flow '${this.flow.slug}' is compiled...`);
57+
const mode = this.isLocalEnvironment ? 'development' : 'production';
58+
this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`);
6359

6460
const shape = extractFlowShape(this.flow);
65-
const mode = this.detectCompilationMode();
6661

6762
const result = await this.queries.ensureFlowCompiled(
6863
this.flow.slug,
@@ -74,19 +69,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
7469
throw new FlowShapeMismatchError(this.flow.slug, result.differences);
7570
}
7671

77-
this.logger.info(`Flow '${this.flow.slug}' ${result.status} (mode: ${mode})`);
78-
}
79-
80-
private detectCompilationMode(): 'development' | 'production' {
81-
// Use provided env if available, otherwise fall back to global detection
82-
if (this.env) {
83-
const anonKey = this.env['SUPABASE_ANON_KEY'];
84-
const serviceRoleKey = this.env['SUPABASE_SERVICE_ROLE_KEY'];
85-
const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY ||
86-
serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY;
87-
return isLocal ? 'development' : 'production';
88-
}
89-
return isLocalSupabase() ? 'development' : 'production';
72+
this.logger.info(`Flow '${this.flow.slug}' compilation complete: ${result.status}`);
9073
}
9174

9275
acknowledgeStop() {

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
9090
queries,
9191
flow,
9292
createLogger('FlowWorkerLifecycle'),
93-
{ env: platformAdapter.env }
93+
{ isLocalEnvironment: platformAdapter.isLocalEnvironment }
9494
);
9595

9696
// Create frozen worker config ONCE for reuse across all task executions

pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { SupabaseResources } from '@pgflow/dsl/supabase';
66
import { createSql } from '../core/sql-factory.js';
77
import { createServiceSupabaseClient } from '../core/supabase-utils.js';
88
import { createLoggingFactory } from './logging.js';
9+
import { isLocalSupabaseEnv } from '../shared/localDetection.js';
910

1011
/**
1112
* Supabase Edge Runtime type (without global augmentation to comply with JSR)
@@ -50,7 +51,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
5051
const env = Deno.env.toObject();
5152
this.assertSupabaseEnv(env);
5253
this.validatedEnv = env;
53-
54+
5455
// Create abort controller for shutdown signal
5556
this.abortController = new AbortController();
5657

@@ -139,6 +140,13 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
139140
return this._platformResources;
140141
}
141142

143+
/**
144+
* Whether running in a local/development environment.
145+
*/
146+
get isLocalEnvironment(): boolean {
147+
return isLocalSupabaseEnv(this.validatedEnv);
148+
}
149+
142150
private async spawnNewEdgeFunction(): Promise<void> {
143151
if (!this.edgeFunctionName) {
144152
throw new Error('functionName cannot be null or empty');

pkgs/edge-worker/src/platform/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,10 @@ export interface PlatformAdapter<TResources extends Record<string, unknown> = Re
5656
*/
5757
get platformResources(): TResources;
5858

59+
/**
60+
* Whether running in a local/development environment.
61+
* Used by flow compilation to determine if recompilation is allowed.
62+
*/
63+
get isLocalEnvironment(): boolean;
64+
5965
}

pkgs/edge-worker/src/shared/localDetection.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,17 @@ export const KNOWN_LOCAL_ANON_KEY =
44
export const KNOWN_LOCAL_SERVICE_ROLE_KEY =
55
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
66

7-
export function isLocalSupabase(): boolean {
8-
const anonKey = Deno.env.get('SUPABASE_ANON_KEY');
9-
const serviceRoleKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY');
7+
/**
8+
* Checks if the provided environment indicates local Supabase.
9+
* Use when you have access to an env record (e.g., from PlatformAdapter).
10+
*/
11+
export function isLocalSupabaseEnv(env: Record<string, string | undefined>): boolean {
12+
const anonKey = env['SUPABASE_ANON_KEY'];
13+
const serviceRoleKey = env['SUPABASE_SERVICE_ROLE_KEY'];
1014
return anonKey === KNOWN_LOCAL_ANON_KEY ||
1115
serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY;
1216
}
17+
18+
export function isLocalSupabase(): boolean {
19+
return isLocalSupabaseEnv(Deno.env.toObject());
20+
}

pkgs/edge-worker/tests/integration/_helpers.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export function createTestPlatformAdapter(sql: postgres.Sql): PlatformAdapter<Su
4141
get shutdownSignal() { return abortController.signal; },
4242
get platformResources() { return platformResources; },
4343
get connectionString() { return DEFAULT_TEST_SUPABASE_ENV.EDGE_WORKER_DB_URL; },
44+
get isLocalEnvironment() { return false; },
4445
async startWorker(_createWorkerFn: CreateWorkerFn) {},
4546
async stopWorker() {},
4647
};

pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@ import { Flow } from '@pgflow/dsl';
44
import { delay } from '@std/async';
55
import { createFlowWorker } from '../../../src/flow/createFlowWorker.ts';
66
import { createTestPlatformAdapter } from '../_helpers.ts';
7-
import {
8-
KNOWN_LOCAL_ANON_KEY,
9-
KNOWN_LOCAL_SERVICE_ROLE_KEY,
10-
} from '../../../src/shared/localDetection.ts';
117
import type { postgres } from '../../sql.ts';
128

139
// Define a minimal test flow
@@ -26,19 +22,15 @@ function createLogger(module: string) {
2622
};
2723
}
2824

29-
function createPlatformAdapterWithEnv(
25+
function createPlatformAdapterWithLocalEnv(
3026
sql: postgres.Sql,
31-
envOverrides: Record<string, string> = {}
27+
isLocal: boolean
3228
) {
3329
const baseAdapter = createTestPlatformAdapter(sql);
34-
const modifiedEnv = {
35-
...baseAdapter.env,
36-
...envOverrides,
37-
};
3830

3931
return {
4032
...baseAdapter,
41-
get env() { return modifiedEnv; },
33+
get isLocalEnvironment() { return isLocal; },
4234
};
4335
}
4436

@@ -64,7 +56,7 @@ Deno.test(
6456
pollIntervalMs: 200,
6557
},
6658
createLogger,
67-
createPlatformAdapterWithEnv(sql)
59+
createPlatformAdapterWithLocalEnv(sql, false)
6860
);
6961

7062
try {
@@ -121,7 +113,7 @@ Deno.test(
121113
pollIntervalMs: 200,
122114
},
123115
createLogger,
124-
createPlatformAdapterWithEnv(sql)
116+
createPlatformAdapterWithLocalEnv(sql, false)
125117
);
126118

127119
try {
@@ -155,11 +147,8 @@ Deno.test(
155147
await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;
156148
await sql`SELECT pgflow.add_step('test_compilation_flow', 'different_step', deps_slugs => ARRAY['double']::text[])`;
157149

158-
// Use non-local keys to simulate production mode
159-
const platformAdapter = createPlatformAdapterWithEnv(sql, {
160-
SUPABASE_ANON_KEY: 'prod-anon-key-not-local',
161-
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key-not-local',
162-
});
150+
// Use isLocal: false to simulate production mode
151+
const platformAdapter = createPlatformAdapterWithLocalEnv(sql, false);
163152

164153
const worker = createFlowWorker(
165154
TestCompilationFlow, // Has only 'double' step
@@ -220,11 +209,8 @@ Deno.test(
220209
await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
221210
await sql`SELECT pgflow.add_step('test_compilation_flow', 'old_step')`;
222211

223-
// Use local keys to simulate development mode
224-
const platformAdapter = createPlatformAdapterWithEnv(sql, {
225-
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
226-
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
227-
});
212+
// Use isLocal: true to simulate development mode
213+
const platformAdapter = createPlatformAdapterWithLocalEnv(sql, true);
228214

229215
const worker = createFlowWorker(
230216
TestCompilationFlow, // Has 'double' step, not 'old_step'

pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { assertEquals, assertThrows } from '@std/assert';
22
import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts';
33
import { TransitionError } from '../../src/core/WorkerState.ts';
4-
import { Queries } from '../../src/core/Queries.ts';
4+
import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts';
55
import type { WorkerRow } from '../../src/core/types.ts';
6-
import type { AnyFlow } from '@pgflow/dsl';
6+
import { Flow, type FlowShape } from '@pgflow/dsl';
77
import type { Logger } from '../../src/platform/types.ts';
88
import { createLoggingFactory } from '../../src/platform/logging.ts';
99
import type { postgres } from '../sql.ts';
@@ -44,15 +44,21 @@ class MockQueries extends Queries {
4444
this.workerStopped = true;
4545
return Promise.resolve(workerRow);
4646
}
47+
48+
override ensureFlowCompiled(
49+
_flowSlug: string,
50+
_shape: FlowShape,
51+
_mode: 'development' | 'production'
52+
): Promise<EnsureFlowCompiledResult> {
53+
return Promise.resolve({ status: 'verified', differences: [] });
54+
}
4755
}
4856

49-
// Mock Flow
50-
const createMockFlow = (): AnyFlow => {
51-
// Return a minimal flow structure that matches AnyFlow type
52-
return {
53-
slug: 'test-flow',
54-
} as AnyFlow;
55-
};
57+
// Real Flow for testing - using the DSL to create a valid flow
58+
const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' })
59+
.step({ slug: 'step1' }, (input) => input.run.value);
60+
61+
const createMockFlow = () => TestFlow;
5662

5763

5864
Deno.test('FlowWorkerLifecycle - should transition to deprecated state when heartbeat returns is_deprecated true', async () => {
@@ -212,7 +218,7 @@ Deno.test('FlowWorkerLifecycle - queueName should return flow slug', () => {
212218
const mockFlow = createMockFlow();
213219
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger);
214220

215-
assertEquals(lifecycle.queueName, 'test-flow');
221+
assertEquals(lifecycle.queueName, 'test_flow');
216222
});
217223

218224
Deno.test('FlowWorkerLifecycle - workerId getter should work after start', async () => {

pkgs/edge-worker/tests/unit/shared/localDetection.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { assertEquals } from '@std/assert';
22
import {
33
isLocalSupabase,
4+
isLocalSupabaseEnv,
45
KNOWN_LOCAL_ANON_KEY,
56
KNOWN_LOCAL_SERVICE_ROLE_KEY,
67
} from '../../../src/shared/localDetection.ts';
@@ -127,3 +128,61 @@ Deno.test('isLocalSupabase - returns true when only service role matches (anon i
127128
assertEquals(isLocalSupabase(), true);
128129
});
129130
});
131+
132+
// ============================================================
133+
// isLocalSupabaseEnv() tests
134+
// ============================================================
135+
136+
Deno.test('isLocalSupabaseEnv - returns true when anon key matches local', () => {
137+
const env = { SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY };
138+
assertEquals(isLocalSupabaseEnv(env), true);
139+
});
140+
141+
Deno.test('isLocalSupabaseEnv - returns true when service role key matches local', () => {
142+
const env = { SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY };
143+
assertEquals(isLocalSupabaseEnv(env), true);
144+
});
145+
146+
Deno.test('isLocalSupabaseEnv - returns true when both keys match local', () => {
147+
const env = {
148+
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
149+
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
150+
};
151+
assertEquals(isLocalSupabaseEnv(env), true);
152+
});
153+
154+
Deno.test('isLocalSupabaseEnv - returns false for non-local keys', () => {
155+
const env = {
156+
SUPABASE_ANON_KEY: 'prod-key',
157+
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key',
158+
};
159+
assertEquals(isLocalSupabaseEnv(env), false);
160+
});
161+
162+
Deno.test('isLocalSupabaseEnv - returns false for empty env', () => {
163+
assertEquals(isLocalSupabaseEnv({}), false);
164+
});
165+
166+
Deno.test('isLocalSupabaseEnv - returns false for undefined values', () => {
167+
const env = {
168+
SUPABASE_ANON_KEY: undefined,
169+
SUPABASE_SERVICE_ROLE_KEY: undefined,
170+
};
171+
assertEquals(isLocalSupabaseEnv(env), false);
172+
});
173+
174+
Deno.test('isLocalSupabaseEnv - returns true when only anon key matches (service is prod)', () => {
175+
const env = {
176+
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
177+
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key',
178+
};
179+
assertEquals(isLocalSupabaseEnv(env), true);
180+
});
181+
182+
Deno.test('isLocalSupabaseEnv - returns true when only service role matches (anon is prod)', () => {
183+
const env = {
184+
SUPABASE_ANON_KEY: 'prod-anon-key',
185+
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
186+
};
187+
assertEquals(isLocalSupabaseEnv(env), true);
188+
});

0 commit comments

Comments
 (0)