From 2e949b1765d0a5f50e52a8dffe744525325b0a9f Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Sun, 30 Nov 2025 16:53:21 +0100 Subject: [PATCH] include options in FlowShape for proper flow creation --- PLAN.md | 38 ++++++ .../schemas/0100_function_create_flow.sql | 19 ++- .../0100_function_create_flow_from_shape.sql | 24 +++- ...130164844_pgflow_temp_options_in_shape.sql | 72 +++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../tests/create_flow/options.test.sql | 20 +++- .../options_compile.test.sql | 113 ++++++++++++++++++ pkgs/dsl/__tests__/runtime/flow-shape.test.ts | 85 +++++++++++-- pkgs/dsl/src/flow-shape.ts | 92 ++++++++++++-- 9 files changed, 436 insertions(+), 30 deletions(-) create mode 100644 pkgs/core/supabase/migrations/20251130164844_pgflow_temp_options_in_shape.sql create mode 100644 pkgs/core/supabase/tests/create_flow_from_shape/options_compile.test.sql diff --git a/PLAN.md b/PLAN.md index 60134ab69..c4fb38fcb 100644 --- a/PLAN.md +++ b/PLAN.md @@ -349,6 +349,44 @@ async function handleEnsureCompiled( --- +## Phase 3.5: Include Options in FlowShape + +Options (maxAttempts, baseDelay, timeout, startDelay) must be included in FlowShape for proper flow creation, while remaining excluded from shape comparison (options can be tuned at runtime without recompilation). + +### Problem + +`_create_flow_from_shape()` was using defaults instead of DSL-defined options, causing drift between: +- CLI path: `compileFlow()` -> SQL with options +- Runtime path: `extractFlowShape()` -> `_create_flow_from_shape()` -> defaults only + +### Solution + +1. **FlowShape includes options** - Added optional `options` field to FlowShape/StepShape +2. **NULL = use default** - Modified `create_flow()` to use COALESCE internally +3. **Pass options through** - Updated `_create_flow_from_shape()` to pass options from shape + +### Key Files + +| File | Change | +|------|--------| +| `pkgs/dsl/src/flow-shape.ts` | Add options to interfaces, update extractFlowShape() | +| `pkgs/core/schemas/0100_function_create_flow.sql` | NULL params = use default via COALESCE | +| `pkgs/core/schemas/0100_function_create_flow_from_shape.sql` | Pass options from shape | + +### Design Decision: NULL = Use Default + +SQL functions now treat NULL parameters as "use default" via COALESCE: + +```sql +-- In create_flow(): +INSERT INTO pgflow.flows (..., opt_max_attempts, ...) +VALUES (..., COALESCE(max_attempts, 3), ...); +``` + +This prevents drift - defaults are defined in ONE place (inside the function), not hardcoded by callers. + +--- + ## Phase 4: Worker Configuration (Edge-Worker Package) ### Modify: `pkgs/edge-worker/src/core/workerConfigTypes.ts` diff --git a/pkgs/core/schemas/0100_function_create_flow.sql b/pkgs/core/schemas/0100_function_create_flow.sql index 9fd5cc751..5c56312a3 100644 --- a/pkgs/core/schemas/0100_function_create_flow.sql +++ b/pkgs/core/schemas/0100_function_create_flow.sql @@ -1,8 +1,11 @@ +-- Create a new flow with optional configuration. +-- NULL parameters use defaults defined in the 'defaults' CTE below. +-- This allows callers to pass NULL to explicitly use the default value. create or replace function pgflow.create_flow( flow_slug text, - max_attempts int default 3, - base_delay int default 5, - timeout int default 60 + max_attempts int default null, + base_delay int default null, + timeout int default null ) returns pgflow.flows language sql @@ -10,9 +13,17 @@ set search_path to '' volatile as $$ WITH + defaults AS ( + SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout + ), flow_upsert AS ( INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout) - VALUES (flow_slug, max_attempts, base_delay, timeout) + SELECT + flow_slug, + COALESCE(max_attempts, defaults.def_max_attempts), + COALESCE(base_delay, defaults.def_base_delay), + COALESCE(timeout, defaults.def_timeout) + FROM defaults ON CONFLICT (flow_slug) DO UPDATE SET flow_slug = pgflow.flows.flow_slug -- Dummy update RETURNING * diff --git a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql index 4f71c14d9..daf2dc548 100644 --- a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql +++ b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql @@ -1,5 +1,6 @@ -- Compile a flow from a JSONB shape -- Creates the flow and all its steps using existing create_flow/add_step functions +-- Includes options from shape (NULL values = use default) create or replace function pgflow._create_flow_from_shape( p_flow_slug text, p_shape jsonb @@ -12,9 +13,19 @@ as $$ DECLARE v_step jsonb; v_deps text[]; + v_flow_options jsonb; + v_step_options jsonb; BEGIN - -- Create the flow with defaults - PERFORM pgflow.create_flow(p_flow_slug); + -- Extract flow-level options (may be null) + v_flow_options := p_shape->'options'; + + -- Create the flow with options (NULL = use default) + PERFORM pgflow.create_flow( + p_flow_slug, + (v_flow_options->>'maxAttempts')::int, + (v_flow_options->>'baseDelay')::int, + (v_flow_options->>'timeout')::int + ); -- Iterate over steps in order and add each one FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps') @@ -24,11 +35,18 @@ BEGIN INTO v_deps FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep; - -- Add the step + -- Extract step options (may be null) + v_step_options := v_step->'options'; + + -- Add the step with options (NULL = use default/inherit) PERFORM pgflow.add_step( flow_slug => p_flow_slug, step_slug => v_step->>'slug', deps_slugs => v_deps, + max_attempts => (v_step_options->>'maxAttempts')::int, + base_delay => (v_step_options->>'baseDelay')::int, + timeout => (v_step_options->>'timeout')::int, + start_delay => (v_step_options->>'startDelay')::int, step_type => v_step->>'stepType' ); END LOOP; diff --git a/pkgs/core/supabase/migrations/20251130164844_pgflow_temp_options_in_shape.sql b/pkgs/core/supabase/migrations/20251130164844_pgflow_temp_options_in_shape.sql new file mode 100644 index 000000000..03e86f4e3 --- /dev/null +++ b/pkgs/core/supabase/migrations/20251130164844_pgflow_temp_options_in_shape.sql @@ -0,0 +1,72 @@ +-- Modify "create_flow" function +CREATE OR REPLACE FUNCTION "pgflow"."create_flow" ("flow_slug" text, "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer) RETURNS "pgflow"."flows" LANGUAGE sql SET "search_path" = '' AS $$ +WITH + defaults AS ( + SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout + ), + flow_upsert AS ( + INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout) + SELECT + flow_slug, + COALESCE(max_attempts, defaults.def_max_attempts), + COALESCE(base_delay, defaults.def_base_delay), + COALESCE(timeout, defaults.def_timeout) + FROM defaults + ON CONFLICT (flow_slug) DO UPDATE + SET flow_slug = pgflow.flows.flow_slug -- Dummy update + RETURNING * + ), + ensure_queue AS ( + SELECT pgmq.create(flow_slug) + WHERE NOT EXISTS ( + SELECT 1 FROM pgmq.list_queues() WHERE queue_name = flow_slug + ) + ) +SELECT f.* +FROM flow_upsert f +LEFT JOIN (SELECT 1 FROM ensure_queue) _dummy ON true; -- Left join ensures flow is returned +$$; +-- Modify "_create_flow_from_shape" function +CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_step jsonb; + v_deps text[]; + v_flow_options jsonb; + v_step_options jsonb; +BEGIN + -- Extract flow-level options (may be null) + v_flow_options := p_shape->'options'; + + -- Create the flow with options (NULL = use default) + PERFORM pgflow.create_flow( + p_flow_slug, + (v_flow_options->>'maxAttempts')::int, + (v_flow_options->>'baseDelay')::int, + (v_flow_options->>'timeout')::int + ); + + -- Iterate over steps in order and add each one + FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps') + LOOP + -- Convert dependencies jsonb array to text array + SELECT COALESCE(array_agg(dep), '{}') + INTO v_deps + FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep; + + -- Extract step options (may be null) + v_step_options := v_step->'options'; + + -- Add the step with options (NULL = use default/inherit) + PERFORM pgflow.add_step( + flow_slug => p_flow_slug, + step_slug => v_step->>'slug', + deps_slugs => v_deps, + max_attempts => (v_step_options->>'maxAttempts')::int, + base_delay => (v_step_options->>'baseDelay')::int, + timeout => (v_step_options->>'timeout')::int, + start_delay => (v_step_options->>'startDelay')::int, + step_type => v_step->>'stepType' + ); + END LOOP; +END; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 802c5ed26..aa9bbe923 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830= +h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -14,3 +14,4 @@ h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830= 20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE= 20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y= 20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko= +20251130164844_pgflow_temp_options_in_shape.sql h1:lbMDdu15QiBElTsvl7g0dI7flvyjngK9g68VDnCE0S0= diff --git a/pkgs/core/supabase/tests/create_flow/options.test.sql b/pkgs/core/supabase/tests/create_flow/options.test.sql index 41d02ff24..87ae4589f 100644 --- a/pkgs/core/supabase/tests/create_flow/options.test.sql +++ b/pkgs/core/supabase/tests/create_flow/options.test.sql @@ -1,5 +1,5 @@ begin; -select plan(5); +select plan(7); select pgflow_tests.reset_db(); -- SETUP: flow with all default values @@ -12,6 +12,24 @@ select results_eq( 'Should create flow with default opt_max_attempts, opt_base_delay and opt_timeout' ); +-- TEST: NULL parameters should use defaults (NULL = "use default" semantics) +select pgflow.create_flow('test_flow_null', null, null, null); + +select results_eq( + $$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'test_flow_null' $$, + $$ VALUES (3, 5, 60) $$, + 'NULL parameters should use defaults' +); + +-- TEST: Mixed NULL and explicit values +select pgflow.create_flow('test_flow_mixed', max_attempts => 10, base_delay => null, timeout => 120); + +select results_eq( + $$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'test_flow_mixed' $$, + $$ VALUES (10, 5, 120) $$, + 'NULL parameters should use defaults while explicit values are preserved' +); + -- SETUP: flow with overriden max_attempts select pgflow.create_flow('test_flow_2', max_attempts => 10); diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/options_compile.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/options_compile.test.sql new file mode 100644 index 000000000..3e57bdd89 --- /dev/null +++ b/pkgs/core/supabase/tests/create_flow_from_shape/options_compile.test.sql @@ -0,0 +1,113 @@ +begin; +select plan(6); +select pgflow_tests.reset_db(); + +-- Test: Compile a flow with flow-level options from shape +select pgflow._create_flow_from_shape( + 'flow_with_options', + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": []} + ], + "options": { + "maxAttempts": 5, + "baseDelay": 10, + "timeout": 120 + } + }'::jsonb +); + +-- Verify flow options were applied +select results_eq( + $$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_with_options' $$, + $$ VALUES (5, 10, 120) $$, + 'Flow should have options from shape' +); + +-- Test: Compile a flow with step-level options from shape +select pgflow._create_flow_from_shape( + 'flow_with_step_options', + '{ + "steps": [ + { + "slug": "step1", + "stepType": "single", + "dependencies": [], + "options": { + "maxAttempts": 7, + "baseDelay": 15, + "timeout": 90, + "startDelay": 1000 + } + } + ] + }'::jsonb +); + +-- Verify step options were applied +select results_eq( + $$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_with_step_options' $$, + $$ VALUES (7, 15, 90, 1000) $$, + 'Step should have options from shape' +); + +-- Test: Compile a flow with no options (defaults should be used) +select pgflow._create_flow_from_shape( + 'flow_no_options', + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": []} + ] + }'::jsonb +); + +-- Verify flow uses default options +select results_eq( + $$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_no_options' $$, + $$ VALUES (3, 5, 60) $$, + 'Flow without options in shape should use defaults' +); + +-- Verify step uses NULL options (inherits from flow) +select results_eq( + $$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_no_options' $$, + $$ VALUES (NULL::int, NULL::int, NULL::int, NULL::int) $$, + 'Step without options in shape should have NULL (inherit from flow)' +); + +-- Test: Compile with partial options (missing options should be NULL/default) +select pgflow._create_flow_from_shape( + 'flow_partial_options', + '{ + "steps": [ + { + "slug": "step1", + "stepType": "single", + "dependencies": [], + "options": { + "timeout": 30 + } + } + ], + "options": { + "maxAttempts": 10 + } + }'::jsonb +); + +-- Verify partial flow options +select results_eq( + $$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_partial_options' $$, + $$ VALUES (10, 5, 60) $$, + 'Flow with partial options should use defaults for missing options' +); + +-- Verify partial step options +select results_eq( + $$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_partial_options' $$, + $$ VALUES (NULL::int, NULL::int, 30, NULL::int) $$, + 'Step with partial options should have NULL for missing options' +); + +select finish(); +rollback; diff --git a/pkgs/dsl/__tests__/runtime/flow-shape.test.ts b/pkgs/dsl/__tests__/runtime/flow-shape.test.ts index 66e00177b..dc5a1ac4b 100644 --- a/pkgs/dsl/__tests__/runtime/flow-shape.test.ts +++ b/pkgs/dsl/__tests__/runtime/flow-shape.test.ts @@ -17,8 +17,8 @@ describe('extractFlowShape', () => { }); }); - it('should NOT include flow runtime options in shape', () => { - // Options are intentionally excluded - they can be tuned at runtime + it('should include flow runtime options in shape', () => { + // Options are included for flow creation, but not compared const flow = new Flow({ slug: 'test_flow', maxAttempts: 5, @@ -27,10 +27,23 @@ describe('extractFlowShape', () => { }); const shape = extractFlowShape(flow); - // Shape should only have structural info, no options + // Shape should include options for creation expect(shape).toEqual({ steps: [], + options: { + maxAttempts: 5, + baseDelay: 10, + timeout: 120, + }, }); + }); + + it('should omit flow options key when no options defined', () => { + const flow = new Flow({ slug: 'test_flow' }); + const shape = extractFlowShape(flow); + + // No options = no options key in shape + expect(shape).toEqual({ steps: [] }); expect('options' in shape).toBe(false); }); }); @@ -76,8 +89,8 @@ describe('extractFlowShape', () => { expect(shape.steps[3].dependencies).toEqual(['apple', 'mango', 'zebra']); }); - it('should NOT include step runtime options in shape', () => { - // Options are intentionally excluded - they can be tuned at runtime + it('should include step runtime options in shape', () => { + // Options are included for step creation, but not compared const flow = new Flow({ slug: 'test_flow' }).step( { slug: 'step1', @@ -90,7 +103,28 @@ describe('extractFlowShape', () => { ); const shape = extractFlowShape(flow); - // Step shape should only have structural info, no options + // Step shape should include options for creation + expect(shape.steps[0]).toEqual({ + slug: 'step1', + stepType: 'single', + dependencies: [], + options: { + maxAttempts: 3, + baseDelay: 5, + timeout: 30, + startDelay: 100, + }, + }); + }); + + it('should omit step options key when no options defined', () => { + const flow = new Flow({ slug: 'test_flow' }).step( + { slug: 'step1' }, + ({ run }) => run + ); + const shape = extractFlowShape(flow); + + // No options = no options key in step shape expect(shape.steps[0]).toEqual({ slug: 'step1', stepType: 'single', @@ -98,6 +132,19 @@ describe('extractFlowShape', () => { }); expect('options' in shape.steps[0]).toBe(false); }); + + it('should only include defined options (filter undefined)', () => { + // When only some options are set, only those should appear + const flow = new Flow({ slug: 'test_flow', maxAttempts: 5 }).step( + { slug: 'step1', timeout: 30 }, + ({ run }) => run + ); + const shape = extractFlowShape(flow); + + // Only defined options should be included + expect(shape.options).toEqual({ maxAttempts: 5 }); + expect(shape.steps[0].options).toEqual({ timeout: 30 }); + }); }); describe('map step extraction', () => { @@ -133,7 +180,7 @@ describe('extractFlowShape', () => { }); describe('complex flow extraction', () => { - it('should extract a complex flow structure (ignoring options)', () => { + it('should extract a complex flow structure with options', () => { const flow = new Flow<{ url: string }>({ slug: 'analyze_website', maxAttempts: 3, @@ -155,7 +202,7 @@ describe('extractFlowShape', () => { const shape = extractFlowShape(flow); - // Shape should only contain structural info, no options + // Shape should contain structural info AND options expect(shape).toEqual({ steps: [ { @@ -167,6 +214,10 @@ describe('extractFlowShape', () => { slug: 'sentiment', stepType: 'single', dependencies: ['website'], + options: { + maxAttempts: 5, + timeout: 30, + }, }, { slug: 'summary', @@ -179,6 +230,11 @@ describe('extractFlowShape', () => { dependencies: ['sentiment', 'summary'], // sorted alphabetically }, ], + options: { + maxAttempts: 3, + baseDelay: 5, + timeout: 10, + }, }); }); @@ -413,10 +469,10 @@ describe('compareFlowShapes', () => { }); }); - describe('options are NOT compared (runtime tunable)', () => { + describe('options are included in shape but NOT compared', () => { it('should match flows with same structure but different DSL options', () => { - // This is the key behavior: options don't affect shape matching - // Users can tune options at runtime via SQL without recompilation + // This is the key behavior: options are in shape for creation, + // but don't affect shape matching (runtime tunable via SQL) const flowA = new Flow({ slug: 'test_flow', maxAttempts: 3 }).step( { slug: 'step1', timeout: 60 }, ({ run }) => run @@ -430,6 +486,13 @@ describe('compareFlowShapes', () => { const shapeA = extractFlowShape(flowA); const shapeB = extractFlowShape(flowB); + // Verify options ARE included in shapes + expect(shapeA.options).toEqual({ maxAttempts: 3 }); + expect(shapeB.options).toEqual({ maxAttempts: 10 }); + expect(shapeA.steps[0].options).toEqual({ timeout: 60 }); + expect(shapeB.steps[0].options).toEqual({ timeout: 300, startDelay: 100 }); + + // But comparison ignores options - only structure matters const result = compareFlowShapes(shapeA, shapeB); expect(result.match).toBe(true); expect(result.differences).toEqual([]); diff --git a/pkgs/dsl/src/flow-shape.ts b/pkgs/dsl/src/flow-shape.ts index 55d3ac47b..d6ce295c7 100644 --- a/pkgs/dsl/src/flow-shape.ts +++ b/pkgs/dsl/src/flow-shape.ts @@ -4,31 +4,56 @@ import { AnyFlow } from './dsl.js'; // SHAPE TYPE DEFINITIONS // ======================== +/** + * Step-level options that can be included in the shape for creation, + * but are NOT compared during shape comparison (runtime tunable). + */ +export interface StepShapeOptions { + maxAttempts?: number; + baseDelay?: number; + timeout?: number; + startDelay?: number; +} + +/** + * Flow-level options that can be included in the shape for creation, + * but are NOT compared during shape comparison (runtime tunable). + */ +export interface FlowShapeOptions { + maxAttempts?: number; + baseDelay?: number; + timeout?: number; +} + /** * StepShape captures the structural definition of a step for drift detection. * - * NOTE: Runtime options (maxAttempts, baseDelay, timeout, startDelay) are - * intentionally excluded. These can be tuned at runtime via SQL without + * The `options` field is included for flow creation but NOT compared during + * shape comparison. Options can be tuned at runtime via SQL without * requiring recompilation. See: /deploy/tune-flow-config/ */ export interface StepShape { slug: string; stepType: 'single' | 'map'; dependencies: string[]; // sorted alphabetically for deterministic comparison + options?: StepShapeOptions; } /** * FlowShape captures the structural definition of a flow for drift detection. * * This represents the DAG topology - which steps exist, their types, and how - * they connect via dependencies. Runtime configuration options are intentionally - * excluded as they can be tuned in production without recompilation. + * they connect via dependencies. + * + * The `options` field is included for flow creation but NOT compared during + * shape comparison. Options can be tuned at runtime via SQL without recompilation. * * Note: flowSlug is intentionally excluded - it's an identifier, not structural * data. The slug comes from context (URL, registry lookup, function parameter). */ export interface FlowShape { steps: StepShape[]; + options?: FlowShapeOptions; } /** @@ -43,29 +68,76 @@ export interface ShapeComparisonResult { // SHAPE EXTRACTION // ======================== +/** + * Checks if an options object has any defined (non-undefined) values. + */ +function hasDefinedOptions(options: Record): boolean { + return Object.values(options).some((v) => v !== undefined); +} + +/** + * Filters out undefined values from an options object. + * Returns only the keys with defined values. + */ +function filterDefinedOptions>( + options: T +): Partial { + return Object.fromEntries( + Object.entries(options).filter(([_, v]) => v !== undefined) + ) as Partial; +} + /** * Extracts a FlowShape from a Flow object. - * The shape captures structural information needed for drift detection. + * The shape captures structural information needed for drift detection, + * plus options for flow creation. * - * NOTE: Runtime options are intentionally not included in the shape. - * They can be tuned at runtime via SQL without triggering recompilation. + * Options are included in the shape for proper flow/step creation, but + * are NOT compared during shape comparison (they're runtime tunable). * * @param flow - The Flow object to extract shape from - * @returns A FlowShape representing the flow's structure + * @returns A FlowShape representing the flow's structure and options */ export function extractFlowShape(flow: AnyFlow): FlowShape { const steps: StepShape[] = flow.stepOrder.map((stepSlug) => { const stepDef = flow.getStepDefinition(stepSlug); - return { + const stepShape: StepShape = { slug: stepSlug, stepType: stepDef.stepType ?? 'single', // Sort dependencies alphabetically for deterministic comparison dependencies: [...stepDef.dependencies].sort(), }; + + // Only include options if at least one is defined + const stepOptions = { + maxAttempts: stepDef.options.maxAttempts, + baseDelay: stepDef.options.baseDelay, + timeout: stepDef.options.timeout, + startDelay: stepDef.options.startDelay, + }; + + if (hasDefinedOptions(stepOptions)) { + stepShape.options = filterDefinedOptions(stepOptions); + } + + return stepShape; }); - return { steps }; + const shape: FlowShape = { steps }; + + // Only include flow options if at least one is defined + const flowOptions = { + maxAttempts: flow.options.maxAttempts, + baseDelay: flow.options.baseDelay, + timeout: flow.options.timeout, + }; + + if (hasDefinedOptions(flowOptions)) { + shape.options = filterDefinedOptions(flowOptions); + } + + return shape; } // ========================