Skip to content

Commit 4074b3f

Browse files
committed
move local environment detection from FlowWorkerLifecycle to PlatformAdapter
1 parent 1bef6d0 commit 4074b3f

File tree

10 files changed

+129
-75
lines changed

10 files changed

+129
-75
lines changed

pkgs/edge-worker/src/core/Queries.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type postgres from 'postgres';
22
import type { WorkerRow } from './types.js';
3-
import type { FlowShape } from '@pgflow/dsl';
3+
import type { FlowShape, Json } from '@pgflow/dsl';
44

55
export type EnsureFlowCompiledStatus = 'compiled' | 'verified' | 'recompiled' | 'mismatch';
66

@@ -57,11 +57,19 @@ export class Queries {
5757
shape: FlowShape,
5858
mode: 'development' | 'production'
5959
): Promise<EnsureFlowCompiledResult> {
60-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
60+
// SAFETY: FlowShape is JSON-compatible by construction (only strings, numbers,
61+
// arrays, and plain objects), but TypeScript can't prove this because FlowShape
62+
// uses specific property names while Json uses index signatures. This cast is
63+
// safe because we control both sides: extractFlowShape() builds the object and
64+
// this method consumes it - no untrusted input crosses this boundary.
65+
//
66+
// TODO: If FlowShape ever becomes part of a public API or accepts external input,
67+
// add a runtime assertion function (assertJsonCompatible) to validate at the boundary.
68+
const shapeJson = this.sql.json(shape as unknown as Json);
6169
const [result] = await this.sql<{ result: EnsureFlowCompiledResult }[]>`
6270
SELECT pgflow.ensure_flow_compiled(
6371
${flowSlug},
64-
${this.sql.json(shape as any)}::jsonb,
72+
${shapeJson}::jsonb,
6573
${mode}
6674
) as result
6775
`;

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,11 @@ import type { Logger } from '../platform/types.js';
44
import { States, WorkerState } from '../core/WorkerState.js';
55
import type { AnyFlow } from '@pgflow/dsl';
66
import { extractFlowShape } from '@pgflow/dsl';
7-
import {
8-
isLocalSupabase,
9-
KNOWN_LOCAL_ANON_KEY,
10-
KNOWN_LOCAL_SERVICE_ROLE_KEY,
11-
} from '../shared/localDetection.js';
127
import { FlowShapeMismatchError } from './errors.js';
138

149
export interface FlowLifecycleConfig {
1510
heartbeatInterval?: number;
16-
env?: Record<string, string | undefined>;
11+
isLocalEnvironment?: boolean;
1712
}
1813

1914
/**
@@ -29,15 +24,15 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
2924
private _workerId?: string;
3025
private heartbeatInterval: number;
3126
private lastHeartbeat = 0;
32-
private env?: Record<string, string | undefined>;
27+
private isLocalEnvironment: boolean;
3328

3429
constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
3530
this.queries = queries;
3631
this.flow = flow;
3732
this.logger = logger;
3833
this.workerState = new WorkerState(logger);
3934
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
40-
this.env = config?.env;
35+
this.isLocalEnvironment = config?.isLocalEnvironment ?? false;
4136
}
4237

4338
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
@@ -59,10 +54,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
5954
}
6055

6156
private async ensureFlowCompiled(): Promise<void> {
62-
this.logger.info(`Ensuring flow '${this.flow.slug}' is compiled...`);
57+
const mode = this.isLocalEnvironment ? 'development' : 'production';
58+
this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`);
6359

6460
const shape = extractFlowShape(this.flow);
65-
const mode = this.detectCompilationMode();
6661

6762
const result = await this.queries.ensureFlowCompiled(
6863
this.flow.slug,
@@ -74,19 +69,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
7469
throw new FlowShapeMismatchError(this.flow.slug, result.differences);
7570
}
7671

77-
this.logger.info(`Flow '${this.flow.slug}' ${result.status} (mode: ${mode})`);
78-
}
79-
80-
private detectCompilationMode(): 'development' | 'production' {
81-
// Use provided env if available, otherwise fall back to global detection
82-
if (this.env) {
83-
const anonKey = this.env['SUPABASE_ANON_KEY'];
84-
const serviceRoleKey = this.env['SUPABASE_SERVICE_ROLE_KEY'];
85-
const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY ||
86-
serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY;
87-
return isLocal ? 'development' : 'production';
88-
}
89-
return isLocalSupabase() ? 'development' : 'production';
72+
this.logger.info(`Flow '${this.flow.slug}' compilation complete: ${result.status}`);
9073
}
9174

9275
acknowledgeStop() {

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
9090
queries,
9191
flow,
9292
createLogger('FlowWorkerLifecycle'),
93-
{ env: platformAdapter.env }
93+
{ isLocalEnvironment: platformAdapter.isLocalEnvironment }
9494
);
9595

9696
// Create frozen worker config ONCE for reuse across all task executions

pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { SupabaseResources } from '@pgflow/dsl/supabase';
66
import { createSql } from '../core/sql-factory.js';
77
import { createServiceSupabaseClient } from '../core/supabase-utils.js';
88
import { createLoggingFactory } from './logging.js';
9+
import { isLocalSupabaseEnv } from '../shared/localDetection.js';
910

1011
/**
1112
* Supabase Edge Runtime type (without global augmentation to comply with JSR)
@@ -50,7 +51,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
5051
const env = Deno.env.toObject();
5152
this.assertSupabaseEnv(env);
5253
this.validatedEnv = env;
53-
54+
5455
// Create abort controller for shutdown signal
5556
this.abortController = new AbortController();
5657

@@ -139,6 +140,13 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
139140
return this._platformResources;
140141
}
141142

143+
/**
144+
* Whether running in a local/development environment.
145+
*/
146+
get isLocalEnvironment(): boolean {
147+
return isLocalSupabaseEnv(this.validatedEnv);
148+
}
149+
142150
private async spawnNewEdgeFunction(): Promise<void> {
143151
if (!this.edgeFunctionName) {
144152
throw new Error('functionName cannot be null or empty');

pkgs/edge-worker/src/platform/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,10 @@ export interface PlatformAdapter<TResources extends Record<string, unknown> = Re
5656
*/
5757
get platformResources(): TResources;
5858

59+
/**
60+
* Whether running in a local/development environment.
61+
* Used by flow compilation to determine if recompilation is allowed.
62+
*/
63+
get isLocalEnvironment(): boolean;
64+
5965
}

pkgs/edge-worker/src/shared/localDetection.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,17 @@ export const KNOWN_LOCAL_ANON_KEY =
44
export const KNOWN_LOCAL_SERVICE_ROLE_KEY =
55
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
66

7-
export function isLocalSupabase(): boolean {
8-
const anonKey = Deno.env.get('SUPABASE_ANON_KEY');
9-
const serviceRoleKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY');
7+
/**
8+
* Checks if the provided environment indicates local Supabase.
9+
* Use when you have access to an env record (e.g., from PlatformAdapter).
10+
*/
11+
export function isLocalSupabaseEnv(env: Record<string, string | undefined>): boolean {
12+
const anonKey = env['SUPABASE_ANON_KEY'];
13+
const serviceRoleKey = env['SUPABASE_SERVICE_ROLE_KEY'];
1014
return anonKey === KNOWN_LOCAL_ANON_KEY ||
1115
serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY;
1216
}
17+
18+
export function isLocalSupabase(): boolean {
19+
return isLocalSupabaseEnv(Deno.env.toObject());
20+
}

pkgs/edge-worker/tests/integration/_helpers.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export function createTestPlatformAdapter(sql: postgres.Sql): PlatformAdapter<Su
4141
get shutdownSignal() { return abortController.signal; },
4242
get platformResources() { return platformResources; },
4343
get connectionString() { return DEFAULT_TEST_SUPABASE_ENV.EDGE_WORKER_DB_URL; },
44+
get isLocalEnvironment() { return false; },
4445
async startWorker(_createWorkerFn: CreateWorkerFn) {},
4546
async stopWorker() {},
4647
};

pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@ import { Flow } from '@pgflow/dsl';
44
import { delay } from '@std/async';
55
import { createFlowWorker } from '../../../src/flow/createFlowWorker.ts';
66
import { createTestPlatformAdapter } from '../_helpers.ts';
7-
import {
8-
KNOWN_LOCAL_ANON_KEY,
9-
KNOWN_LOCAL_SERVICE_ROLE_KEY,
10-
} from '../../../src/shared/localDetection.ts';
117
import type { postgres } from '../../sql.ts';
128

139
// Define a minimal test flow
@@ -17,17 +13,6 @@ const TestCompilationFlow = new Flow<{ value: number }>({ slug: 'test_compilatio
1713
return input.run.value * 2;
1814
});
1915

20-
// Define a modified version with different structure (for mismatch testing)
21-
const TestCompilationFlowModified = new Flow<{ value: number }>({ slug: 'test_compilation_flow' })
22-
.step({ slug: 'double' }, async (input) => {
23-
await delay(1);
24-
return input.run.value * 2;
25-
})
26-
.step({ slug: 'triple', dependsOn: ['double'] }, async (input) => {
27-
await delay(1);
28-
return input.double * 3;
29-
});
30-
3116
function createLogger(module: string) {
3217
return {
3318
debug: console.log.bind(console, `[${module}]`),
@@ -37,19 +22,15 @@ function createLogger(module: string) {
3722
};
3823
}
3924

40-
function createPlatformAdapterWithEnv(
25+
function createPlatformAdapterWithLocalEnv(
4126
sql: postgres.Sql,
42-
envOverrides: Record<string, string> = {}
27+
isLocal: boolean
4328
) {
4429
const baseAdapter = createTestPlatformAdapter(sql);
45-
const modifiedEnv = {
46-
...baseAdapter.env,
47-
...envOverrides,
48-
};
4930

5031
return {
5132
...baseAdapter,
52-
get env() { return modifiedEnv; },
33+
get isLocalEnvironment() { return isLocal; },
5334
};
5435
}
5536

@@ -75,7 +56,7 @@ Deno.test(
7556
pollIntervalMs: 200,
7657
},
7758
createLogger,
78-
createPlatformAdapterWithEnv(sql)
59+
createPlatformAdapterWithLocalEnv(sql, false)
7960
);
8061

8162
try {
@@ -132,7 +113,7 @@ Deno.test(
132113
pollIntervalMs: 200,
133114
},
134115
createLogger,
135-
createPlatformAdapterWithEnv(sql)
116+
createPlatformAdapterWithLocalEnv(sql, false)
136117
);
137118

138119
try {
@@ -166,11 +147,8 @@ Deno.test(
166147
await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;
167148
await sql`SELECT pgflow.add_step('test_compilation_flow', 'different_step', deps_slugs => ARRAY['double']::text[])`;
168149

169-
// Use non-local keys to simulate production mode
170-
const platformAdapter = createPlatformAdapterWithEnv(sql, {
171-
SUPABASE_ANON_KEY: 'prod-anon-key-not-local',
172-
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key-not-local',
173-
});
150+
// Use isLocal: false to simulate production mode
151+
const platformAdapter = createPlatformAdapterWithLocalEnv(sql, false);
174152

175153
const worker = createFlowWorker(
176154
TestCompilationFlow, // Has only 'double' step
@@ -231,11 +209,8 @@ Deno.test(
231209
await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
232210
await sql`SELECT pgflow.add_step('test_compilation_flow', 'old_step')`;
233211

234-
// Use local keys to simulate development mode
235-
const platformAdapter = createPlatformAdapterWithEnv(sql, {
236-
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
237-
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
238-
});
212+
// Use isLocal: true to simulate development mode
213+
const platformAdapter = createPlatformAdapterWithLocalEnv(sql, true);
239214

240215
const worker = createFlowWorker(
241216
TestCompilationFlow, // Has 'double' step, not 'old_step'

pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { assertEquals, assertThrows } from '@std/assert';
22
import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts';
33
import { TransitionError } from '../../src/core/WorkerState.ts';
4-
import { Queries } from '../../src/core/Queries.ts';
4+
import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts';
55
import type { WorkerRow } from '../../src/core/types.ts';
6-
import type { AnyFlow } from '@pgflow/dsl';
6+
import { Flow, type FlowShape } from '@pgflow/dsl';
77
import type { Logger } from '../../src/platform/types.ts';
88
import { createLoggingFactory } from '../../src/platform/logging.ts';
99
import type { postgres } from '../sql.ts';
@@ -44,15 +44,21 @@ class MockQueries extends Queries {
4444
this.workerStopped = true;
4545
return Promise.resolve(workerRow);
4646
}
47+
48+
override ensureFlowCompiled(
49+
_flowSlug: string,
50+
_shape: FlowShape,
51+
_mode: 'development' | 'production'
52+
): Promise<EnsureFlowCompiledResult> {
53+
return Promise.resolve({ status: 'verified', differences: [] });
54+
}
4755
}
4856

49-
// Mock Flow
50-
const createMockFlow = (): AnyFlow => {
51-
// Return a minimal flow structure that matches AnyFlow type
52-
return {
53-
slug: 'test-flow',
54-
} as AnyFlow;
55-
};
57+
// Real Flow for testing - using the DSL to create a valid flow
58+
const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' })
59+
.step({ slug: 'step1' }, (input) => input.run.value);
60+
61+
const createMockFlow = () => TestFlow;
5662

5763

5864
Deno.test('FlowWorkerLifecycle - should transition to deprecated state when heartbeat returns is_deprecated true', async () => {
@@ -212,7 +218,7 @@ Deno.test('FlowWorkerLifecycle - queueName should return flow slug', () => {
212218
const mockFlow = createMockFlow();
213219
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger);
214220

215-
assertEquals(lifecycle.queueName, 'test-flow');
221+
assertEquals(lifecycle.queueName, 'test_flow');
216222
});
217223

218224
Deno.test('FlowWorkerLifecycle - workerId getter should work after start', async () => {

pkgs/edge-worker/tests/unit/shared/localDetection.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { assertEquals } from '@std/assert';
22
import {
33
isLocalSupabase,
4+
isLocalSupabaseEnv,
45
KNOWN_LOCAL_ANON_KEY,
56
KNOWN_LOCAL_SERVICE_ROLE_KEY,
67
} from '../../../src/shared/localDetection.ts';
@@ -127,3 +128,61 @@ Deno.test('isLocalSupabase - returns true when only service role matches (anon i
127128
assertEquals(isLocalSupabase(), true);
128129
});
129130
});
131+
132+
// ============================================================
133+
// isLocalSupabaseEnv() tests
134+
// ============================================================
135+
136+
Deno.test('isLocalSupabaseEnv - returns true when anon key matches local', () => {
137+
const env = { SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY };
138+
assertEquals(isLocalSupabaseEnv(env), true);
139+
});
140+
141+
Deno.test('isLocalSupabaseEnv - returns true when service role key matches local', () => {
142+
const env = { SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY };
143+
assertEquals(isLocalSupabaseEnv(env), true);
144+
});
145+
146+
Deno.test('isLocalSupabaseEnv - returns true when both keys match local', () => {
147+
const env = {
148+
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
149+
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
150+
};
151+
assertEquals(isLocalSupabaseEnv(env), true);
152+
});
153+
154+
Deno.test('isLocalSupabaseEnv - returns false for non-local keys', () => {
155+
const env = {
156+
SUPABASE_ANON_KEY: 'prod-key',
157+
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key',
158+
};
159+
assertEquals(isLocalSupabaseEnv(env), false);
160+
});
161+
162+
Deno.test('isLocalSupabaseEnv - returns false for empty env', () => {
163+
assertEquals(isLocalSupabaseEnv({}), false);
164+
});
165+
166+
Deno.test('isLocalSupabaseEnv - returns false for undefined values', () => {
167+
const env = {
168+
SUPABASE_ANON_KEY: undefined,
169+
SUPABASE_SERVICE_ROLE_KEY: undefined,
170+
};
171+
assertEquals(isLocalSupabaseEnv(env), false);
172+
});
173+
174+
Deno.test('isLocalSupabaseEnv - returns true when only anon key matches (service is prod)', () => {
175+
const env = {
176+
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
177+
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key',
178+
};
179+
assertEquals(isLocalSupabaseEnv(env), true);
180+
});
181+
182+
Deno.test('isLocalSupabaseEnv - returns true when only service role matches (anon is prod)', () => {
183+
const env = {
184+
SUPABASE_ANON_KEY: 'prod-anon-key',
185+
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
186+
};
187+
assertEquals(isLocalSupabaseEnv(env), true);
188+
});

0 commit comments

Comments
 (0)