Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 6 additions & 23 deletions pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ import type { Logger } from '../platform/types.js';
import { States, WorkerState } from '../core/WorkerState.js';
import type { AnyFlow } from '@pgflow/dsl';
import { extractFlowShape } from '@pgflow/dsl';
import {
isLocalSupabase,
KNOWN_LOCAL_ANON_KEY,
KNOWN_LOCAL_SERVICE_ROLE_KEY,
} from '../shared/localDetection.js';
import { FlowShapeMismatchError } from './errors.js';

export interface FlowLifecycleConfig {
heartbeatInterval?: number;
env?: Record<string, string | undefined>;
isLocalEnvironment?: boolean;
}

/**
Expand All @@ -29,15 +24,15 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
private _workerId?: string;
private heartbeatInterval: number;
private lastHeartbeat = 0;
private env?: Record<string, string | undefined>;
private isLocalEnvironment: boolean;

constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
this.queries = queries;
this.flow = flow;
this.logger = logger;
this.workerState = new WorkerState(logger);
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
this.env = config?.env;
this.isLocalEnvironment = config?.isLocalEnvironment ?? false;
}

async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
Expand All @@ -59,10 +54,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
}

private async ensureFlowCompiled(): Promise<void> {
this.logger.info(`Ensuring flow '${this.flow.slug}' is compiled...`);
const mode = this.isLocalEnvironment ? 'development' : 'production';
this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`);

const shape = extractFlowShape(this.flow);
const mode = this.detectCompilationMode();

const result = await this.queries.ensureFlowCompiled(
this.flow.slug,
Expand All @@ -74,19 +69,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
throw new FlowShapeMismatchError(this.flow.slug, result.differences);
}

this.logger.info(`Flow '${this.flow.slug}' ${result.status} (mode: ${mode})`);
}

private detectCompilationMode(): 'development' | 'production' {
// Use provided env if available, otherwise fall back to global detection
if (this.env) {
const anonKey = this.env['SUPABASE_ANON_KEY'];
const serviceRoleKey = this.env['SUPABASE_SERVICE_ROLE_KEY'];
const isLocal = anonKey === KNOWN_LOCAL_ANON_KEY ||
serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY;
return isLocal ? 'development' : 'production';
}
return isLocalSupabase() ? 'development' : 'production';
this.logger.info(`Flow '${this.flow.slug}' compilation complete: ${result.status}`);
}

acknowledgeStop() {
Expand Down
2 changes: 1 addition & 1 deletion pkgs/edge-worker/src/flow/createFlowWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
queries,
flow,
createLogger('FlowWorkerLifecycle'),
{ env: platformAdapter.env }
{ isLocalEnvironment: platformAdapter.isLocalEnvironment }
);

// Create frozen worker config ONCE for reuse across all task executions
Expand Down
10 changes: 9 additions & 1 deletion pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { SupabaseResources } from '@pgflow/dsl/supabase';
import { createSql } from '../core/sql-factory.js';
import { createServiceSupabaseClient } from '../core/supabase-utils.js';
import { createLoggingFactory } from './logging.js';
import { isLocalSupabaseEnv } from '../shared/localDetection.js';

/**
* Supabase Edge Runtime type (without global augmentation to comply with JSR)
Expand Down Expand Up @@ -50,7 +51,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
const env = Deno.env.toObject();
this.assertSupabaseEnv(env);
this.validatedEnv = env;

// Create abort controller for shutdown signal
this.abortController = new AbortController();

Expand Down Expand Up @@ -139,6 +140,13 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
return this._platformResources;
}

/**
* Whether running in a local/development environment.
*/
get isLocalEnvironment(): boolean {
return isLocalSupabaseEnv(this.validatedEnv);
}

private async spawnNewEdgeFunction(): Promise<void> {
if (!this.edgeFunctionName) {
throw new Error('functionName cannot be null or empty');
Expand Down
6 changes: 6 additions & 0 deletions pkgs/edge-worker/src/platform/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,10 @@ export interface PlatformAdapter<TResources extends Record<string, unknown> = Re
*/
get platformResources(): TResources;

/**
* Whether running in a local/development environment.
* Used by flow compilation to determine if recompilation is allowed.
*/
get isLocalEnvironment(): boolean;

}
14 changes: 11 additions & 3 deletions pkgs/edge-worker/src/shared/localDetection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,17 @@ export const KNOWN_LOCAL_ANON_KEY =
export const KNOWN_LOCAL_SERVICE_ROLE_KEY =
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';

export function isLocalSupabase(): boolean {
const anonKey = Deno.env.get('SUPABASE_ANON_KEY');
const serviceRoleKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY');
/**
* Checks if the provided environment indicates local Supabase.
* Use when you have access to an env record (e.g., from PlatformAdapter).
*/
export function isLocalSupabaseEnv(env: Record<string, string | undefined>): boolean {
const anonKey = env['SUPABASE_ANON_KEY'];
const serviceRoleKey = env['SUPABASE_SERVICE_ROLE_KEY'];
return anonKey === KNOWN_LOCAL_ANON_KEY ||
serviceRoleKey === KNOWN_LOCAL_SERVICE_ROLE_KEY;
}

export function isLocalSupabase(): boolean {
return isLocalSupabaseEnv(Deno.env.toObject());
}
1 change: 1 addition & 0 deletions pkgs/edge-worker/tests/integration/_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export function createTestPlatformAdapter(sql: postgres.Sql): PlatformAdapter<Su
get shutdownSignal() { return abortController.signal; },
get platformResources() { return platformResources; },
get connectionString() { return DEFAULT_TEST_SUPABASE_ENV.EDGE_WORKER_DB_URL; },
get isLocalEnvironment() { return false; },
async startWorker(_createWorkerFn: CreateWorkerFn) {},
async stopWorker() {},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import { Flow } from '@pgflow/dsl';
import { delay } from '@std/async';
import { createFlowWorker } from '../../../src/flow/createFlowWorker.ts';
import { createTestPlatformAdapter } from '../_helpers.ts';
import {
KNOWN_LOCAL_ANON_KEY,
KNOWN_LOCAL_SERVICE_ROLE_KEY,
} from '../../../src/shared/localDetection.ts';
import type { postgres } from '../../sql.ts';

// Define a minimal test flow
Expand All @@ -26,19 +22,15 @@ function createLogger(module: string) {
};
}

function createPlatformAdapterWithEnv(
function createPlatformAdapterWithLocalEnv(
sql: postgres.Sql,
envOverrides: Record<string, string> = {}
isLocal: boolean
) {
const baseAdapter = createTestPlatformAdapter(sql);
const modifiedEnv = {
...baseAdapter.env,
...envOverrides,
};

return {
...baseAdapter,
get env() { return modifiedEnv; },
get isLocalEnvironment() { return isLocal; },
};
}

Expand All @@ -64,7 +56,7 @@ Deno.test(
pollIntervalMs: 200,
},
createLogger,
createPlatformAdapterWithEnv(sql)
createPlatformAdapterWithLocalEnv(sql, false)
);

try {
Expand Down Expand Up @@ -121,7 +113,7 @@ Deno.test(
pollIntervalMs: 200,
},
createLogger,
createPlatformAdapterWithEnv(sql)
createPlatformAdapterWithLocalEnv(sql, false)
);

try {
Expand Down Expand Up @@ -155,11 +147,8 @@ Deno.test(
await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;
await sql`SELECT pgflow.add_step('test_compilation_flow', 'different_step', deps_slugs => ARRAY['double']::text[])`;

// Use non-local keys to simulate production mode
const platformAdapter = createPlatformAdapterWithEnv(sql, {
SUPABASE_ANON_KEY: 'prod-anon-key-not-local',
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key-not-local',
});
// Use isLocal: false to simulate production mode
const platformAdapter = createPlatformAdapterWithLocalEnv(sql, false);

const worker = createFlowWorker(
TestCompilationFlow, // Has only 'double' step
Expand Down Expand Up @@ -220,11 +209,8 @@ Deno.test(
await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
await sql`SELECT pgflow.add_step('test_compilation_flow', 'old_step')`;

// Use local keys to simulate development mode
const platformAdapter = createPlatformAdapterWithEnv(sql, {
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
});
// Use isLocal: true to simulate development mode
const platformAdapter = createPlatformAdapterWithLocalEnv(sql, true);

const worker = createFlowWorker(
TestCompilationFlow, // Has 'double' step, not 'old_step'
Expand Down
27 changes: 13 additions & 14 deletions pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { assertEquals, assertThrows } from '@std/assert';
import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts';
import { TransitionError } from '../../src/core/WorkerState.ts';
import { Queries } from '../../src/core/Queries.ts';
import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts';
import type { WorkerRow } from '../../src/core/types.ts';
import type { AnyFlow } from '@pgflow/dsl';
import { Flow, type FlowShape } from '@pgflow/dsl';
import type { Logger } from '../../src/platform/types.ts';
import { createLoggingFactory } from '../../src/platform/logging.ts';
import type { postgres } from '../sql.ts';
Expand Down Expand Up @@ -45,21 +45,20 @@ class MockQueries extends Queries {
return Promise.resolve(workerRow);
}

override ensureFlowCompiled(): Promise<{ status: 'compiled' | 'verified' | 'recompiled' | 'mismatch'; differences: string[] }> {
override ensureFlowCompiled(
_flowSlug: string,
_shape: FlowShape,
_mode: 'development' | 'production'
): Promise<EnsureFlowCompiledResult> {
return Promise.resolve({ status: 'verified', differences: [] });
}
}

// Mock Flow
const createMockFlow = (): AnyFlow => {
// Return a minimal flow structure needed for extractFlowShape
return {
slug: 'test-flow',
stepOrder: [],
options: {},
getStepDefinition: () => { throw new Error('No steps in mock flow'); },
} as unknown as AnyFlow;
};
// Real Flow for testing - using the DSL to create a valid flow
const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' })
.step({ slug: 'step1' }, (input) => input.run.value);

const createMockFlow = () => TestFlow;


Deno.test('FlowWorkerLifecycle - should transition to deprecated state when heartbeat returns is_deprecated true', async () => {
Expand Down Expand Up @@ -219,7 +218,7 @@ Deno.test('FlowWorkerLifecycle - queueName should return flow slug', () => {
const mockFlow = createMockFlow();
const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger);

assertEquals(lifecycle.queueName, 'test-flow');
assertEquals(lifecycle.queueName, 'test_flow');
});

Deno.test('FlowWorkerLifecycle - workerId getter should work after start', async () => {
Expand Down
59 changes: 59 additions & 0 deletions pkgs/edge-worker/tests/unit/shared/localDetection.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { assertEquals } from '@std/assert';
import {
isLocalSupabase,
isLocalSupabaseEnv,
KNOWN_LOCAL_ANON_KEY,
KNOWN_LOCAL_SERVICE_ROLE_KEY,
} from '../../../src/shared/localDetection.ts';
Expand Down Expand Up @@ -127,3 +128,61 @@ Deno.test('isLocalSupabase - returns true when only service role matches (anon i
assertEquals(isLocalSupabase(), true);
});
});

// ============================================================
// isLocalSupabaseEnv() tests
// ============================================================

Deno.test('isLocalSupabaseEnv - returns true when anon key matches local', () => {
const env = { SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY };
assertEquals(isLocalSupabaseEnv(env), true);
});

Deno.test('isLocalSupabaseEnv - returns true when service role key matches local', () => {
const env = { SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY };
assertEquals(isLocalSupabaseEnv(env), true);
});

Deno.test('isLocalSupabaseEnv - returns true when both keys match local', () => {
const env = {
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
};
assertEquals(isLocalSupabaseEnv(env), true);
});

Deno.test('isLocalSupabaseEnv - returns false for non-local keys', () => {
const env = {
SUPABASE_ANON_KEY: 'prod-key',
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key',
};
assertEquals(isLocalSupabaseEnv(env), false);
});

Deno.test('isLocalSupabaseEnv - returns false for empty env', () => {
assertEquals(isLocalSupabaseEnv({}), false);
});

Deno.test('isLocalSupabaseEnv - returns false for undefined values', () => {
const env = {
SUPABASE_ANON_KEY: undefined,
SUPABASE_SERVICE_ROLE_KEY: undefined,
};
assertEquals(isLocalSupabaseEnv(env), false);
});

Deno.test('isLocalSupabaseEnv - returns true when only anon key matches (service is prod)', () => {
const env = {
SUPABASE_ANON_KEY: KNOWN_LOCAL_ANON_KEY,
SUPABASE_SERVICE_ROLE_KEY: 'prod-service-key',
};
assertEquals(isLocalSupabaseEnv(env), true);
});

Deno.test('isLocalSupabaseEnv - returns true when only service role matches (anon is prod)', () => {
const env = {
SUPABASE_ANON_KEY: 'prod-anon-key',
SUPABASE_SERVICE_ROLE_KEY: KNOWN_LOCAL_SERVICE_ROLE_KEY,
};
assertEquals(isLocalSupabaseEnv(env), true);
});
Loading