Skip to content

Commit 2e949b1

Browse files
committed
include options in FlowShape for proper flow creation
1 parent 6ed062f commit 2e949b1

File tree

9 files changed

+436
-30
lines changed

9 files changed

+436
-30
lines changed

PLAN.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,44 @@ async function handleEnsureCompiled(
349349

350350
---
351351

352+
## Phase 3.5: Include Options in FlowShape
353+
354+
Options (maxAttempts, baseDelay, timeout, startDelay) must be included in FlowShape for proper flow creation, while remaining excluded from shape comparison (options can be tuned at runtime without recompilation).
355+
356+
### Problem
357+
358+
`_create_flow_from_shape()` was using defaults instead of DSL-defined options, causing drift between:
359+
- CLI path: `compileFlow()` -> SQL with options
360+
- Runtime path: `extractFlowShape()` -> `_create_flow_from_shape()` -> defaults only
361+
362+
### Solution
363+
364+
1. **FlowShape includes options** - Added optional `options` field to FlowShape/StepShape
365+
2. **NULL = use default** - Modified `create_flow()` to use COALESCE internally
366+
3. **Pass options through** - Updated `_create_flow_from_shape()` to pass options from shape
367+
368+
### Key Files
369+
370+
| File | Change |
371+
|------|--------|
372+
| `pkgs/dsl/src/flow-shape.ts` | Add options to interfaces, update extractFlowShape() |
373+
| `pkgs/core/schemas/0100_function_create_flow.sql` | NULL params = use default via COALESCE |
374+
| `pkgs/core/schemas/0100_function_create_flow_from_shape.sql` | Pass options from shape |
375+
376+
### Design Decision: NULL = Use Default
377+
378+
SQL functions now treat NULL parameters as "use default" via COALESCE:
379+
380+
```sql
381+
-- In create_flow():
382+
INSERT INTO pgflow.flows (..., opt_max_attempts, ...)
383+
VALUES (..., COALESCE(max_attempts, 3), ...);
384+
```
385+
386+
This prevents drift - defaults are defined in ONE place (inside the function), not hardcoded by callers.
387+
388+
---
389+
352390
## Phase 4: Worker Configuration (Edge-Worker Package)
353391

354392
### Modify: `pkgs/edge-worker/src/core/workerConfigTypes.ts`

pkgs/core/schemas/0100_function_create_flow.sql

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
1+
-- Create a new flow with optional configuration.
2+
-- NULL parameters use defaults defined in the 'defaults' CTE below.
3+
-- This allows callers to pass NULL to explicitly use the default value.
14
create or replace function pgflow.create_flow(
25
flow_slug text,
3-
max_attempts int default 3,
4-
base_delay int default 5,
5-
timeout int default 60
6+
max_attempts int default null,
7+
base_delay int default null,
8+
timeout int default null
69
)
710
returns pgflow.flows
811
language sql
912
set search_path to ''
1013
volatile
1114
as $$
1215
WITH
16+
defaults AS (
17+
SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout
18+
),
1319
flow_upsert AS (
1420
INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout)
15-
VALUES (flow_slug, max_attempts, base_delay, timeout)
21+
SELECT
22+
flow_slug,
23+
COALESCE(max_attempts, defaults.def_max_attempts),
24+
COALESCE(base_delay, defaults.def_base_delay),
25+
COALESCE(timeout, defaults.def_timeout)
26+
FROM defaults
1627
ON CONFLICT (flow_slug) DO UPDATE
1728
SET flow_slug = pgflow.flows.flow_slug -- Dummy update
1829
RETURNING *

pkgs/core/schemas/0100_function_create_flow_from_shape.sql

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
-- Compile a flow from a JSONB shape
22
-- Creates the flow and all its steps using existing create_flow/add_step functions
3+
-- Includes options from shape (NULL values = use default)
34
create or replace function pgflow._create_flow_from_shape(
45
p_flow_slug text,
56
p_shape jsonb
@@ -12,9 +13,19 @@ as $$
1213
DECLARE
1314
v_step jsonb;
1415
v_deps text[];
16+
v_flow_options jsonb;
17+
v_step_options jsonb;
1518
BEGIN
16-
-- Create the flow with defaults
17-
PERFORM pgflow.create_flow(p_flow_slug);
19+
-- Extract flow-level options (may be null)
20+
v_flow_options := p_shape->'options';
21+
22+
-- Create the flow with options (NULL = use default)
23+
PERFORM pgflow.create_flow(
24+
p_flow_slug,
25+
(v_flow_options->>'maxAttempts')::int,
26+
(v_flow_options->>'baseDelay')::int,
27+
(v_flow_options->>'timeout')::int
28+
);
1829

1930
-- Iterate over steps in order and add each one
2031
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
@@ -24,11 +35,18 @@ BEGIN
2435
INTO v_deps
2536
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;
2637

27-
-- Add the step
38+
-- Extract step options (may be null)
39+
v_step_options := v_step->'options';
40+
41+
-- Add the step with options (NULL = use default/inherit)
2842
PERFORM pgflow.add_step(
2943
flow_slug => p_flow_slug,
3044
step_slug => v_step->>'slug',
3145
deps_slugs => v_deps,
46+
max_attempts => (v_step_options->>'maxAttempts')::int,
47+
base_delay => (v_step_options->>'baseDelay')::int,
48+
timeout => (v_step_options->>'timeout')::int,
49+
start_delay => (v_step_options->>'startDelay')::int,
3250
step_type => v_step->>'stepType'
3351
);
3452
END LOOP;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
-- Modify "create_flow" function
2+
CREATE OR REPLACE FUNCTION "pgflow"."create_flow" ("flow_slug" text, "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer) RETURNS "pgflow"."flows" LANGUAGE sql SET "search_path" = '' AS $$
3+
WITH
4+
defaults AS (
5+
SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout
6+
),
7+
flow_upsert AS (
8+
INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout)
9+
SELECT
10+
flow_slug,
11+
COALESCE(max_attempts, defaults.def_max_attempts),
12+
COALESCE(base_delay, defaults.def_base_delay),
13+
COALESCE(timeout, defaults.def_timeout)
14+
FROM defaults
15+
ON CONFLICT (flow_slug) DO UPDATE
16+
SET flow_slug = pgflow.flows.flow_slug -- Dummy update
17+
RETURNING *
18+
),
19+
ensure_queue AS (
20+
SELECT pgmq.create(flow_slug)
21+
WHERE NOT EXISTS (
22+
SELECT 1 FROM pgmq.list_queues() WHERE queue_name = flow_slug
23+
)
24+
)
25+
SELECT f.*
26+
FROM flow_upsert f
27+
LEFT JOIN (SELECT 1 FROM ensure_queue) _dummy ON true; -- Left join ensures flow is returned
28+
$$;
29+
-- Modify "_create_flow_from_shape" function
30+
CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
31+
DECLARE
32+
v_step jsonb;
33+
v_deps text[];
34+
v_flow_options jsonb;
35+
v_step_options jsonb;
36+
BEGIN
37+
-- Extract flow-level options (may be null)
38+
v_flow_options := p_shape->'options';
39+
40+
-- Create the flow with options (NULL = use default)
41+
PERFORM pgflow.create_flow(
42+
p_flow_slug,
43+
(v_flow_options->>'maxAttempts')::int,
44+
(v_flow_options->>'baseDelay')::int,
45+
(v_flow_options->>'timeout')::int
46+
);
47+
48+
-- Iterate over steps in order and add each one
49+
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
50+
LOOP
51+
-- Convert dependencies jsonb array to text array
52+
SELECT COALESCE(array_agg(dep), '{}')
53+
INTO v_deps
54+
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;
55+
56+
-- Extract step options (may be null)
57+
v_step_options := v_step->'options';
58+
59+
-- Add the step with options (NULL = use default/inherit)
60+
PERFORM pgflow.add_step(
61+
flow_slug => p_flow_slug,
62+
step_slug => v_step->>'slug',
63+
deps_slugs => v_deps,
64+
max_attempts => (v_step_options->>'maxAttempts')::int,
65+
base_delay => (v_step_options->>'baseDelay')::int,
66+
timeout => (v_step_options->>'timeout')::int,
67+
start_delay => (v_step_options->>'startDelay')::int,
68+
step_type => v_step->>'stepType'
69+
);
70+
END LOOP;
71+
END;
72+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830=
1+
h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,3 +14,4 @@ h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830=
1414
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
1515
20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
1616
20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko=
17+
20251130164844_pgflow_temp_options_in_shape.sql h1:lbMDdu15QiBElTsvl7g0dI7flvyjngK9g68VDnCE0S0=

pkgs/core/supabase/tests/create_flow/options.test.sql

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
begin;
2-
select plan(5);
2+
select plan(7);
33
select pgflow_tests.reset_db();
44

55
-- SETUP: flow with all default values
@@ -12,6 +12,24 @@ select results_eq(
1212
'Should create flow with default opt_max_attempts, opt_base_delay and opt_timeout'
1313
);
1414

15+
-- TEST: NULL parameters should use defaults (NULL = "use default" semantics)
16+
select pgflow.create_flow('test_flow_null', null, null, null);
17+
18+
select results_eq(
19+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'test_flow_null' $$,
20+
$$ VALUES (3, 5, 60) $$,
21+
'NULL parameters should use defaults'
22+
);
23+
24+
-- TEST: Mixed NULL and explicit values
25+
select pgflow.create_flow('test_flow_mixed', max_attempts => 10, base_delay => null, timeout => 120);
26+
27+
select results_eq(
28+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'test_flow_mixed' $$,
29+
$$ VALUES (10, 5, 120) $$,
30+
'NULL parameters should use defaults while explicit values are preserved'
31+
);
32+
1533
-- SETUP: flow with overriden max_attempts
1634
select pgflow.create_flow('test_flow_2', max_attempts => 10);
1735

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
begin;
2+
select plan(6);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Compile a flow with flow-level options from shape
6+
select pgflow._create_flow_from_shape(
7+
'flow_with_options',
8+
'{
9+
"steps": [
10+
{"slug": "step1", "stepType": "single", "dependencies": []}
11+
],
12+
"options": {
13+
"maxAttempts": 5,
14+
"baseDelay": 10,
15+
"timeout": 120
16+
}
17+
}'::jsonb
18+
);
19+
20+
-- Verify flow options were applied
21+
select results_eq(
22+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_with_options' $$,
23+
$$ VALUES (5, 10, 120) $$,
24+
'Flow should have options from shape'
25+
);
26+
27+
-- Test: Compile a flow with step-level options from shape
28+
select pgflow._create_flow_from_shape(
29+
'flow_with_step_options',
30+
'{
31+
"steps": [
32+
{
33+
"slug": "step1",
34+
"stepType": "single",
35+
"dependencies": [],
36+
"options": {
37+
"maxAttempts": 7,
38+
"baseDelay": 15,
39+
"timeout": 90,
40+
"startDelay": 1000
41+
}
42+
}
43+
]
44+
}'::jsonb
45+
);
46+
47+
-- Verify step options were applied
48+
select results_eq(
49+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_with_step_options' $$,
50+
$$ VALUES (7, 15, 90, 1000) $$,
51+
'Step should have options from shape'
52+
);
53+
54+
-- Test: Compile a flow with no options (defaults should be used)
55+
select pgflow._create_flow_from_shape(
56+
'flow_no_options',
57+
'{
58+
"steps": [
59+
{"slug": "step1", "stepType": "single", "dependencies": []}
60+
]
61+
}'::jsonb
62+
);
63+
64+
-- Verify flow uses default options
65+
select results_eq(
66+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_no_options' $$,
67+
$$ VALUES (3, 5, 60) $$,
68+
'Flow without options in shape should use defaults'
69+
);
70+
71+
-- Verify step uses NULL options (inherits from flow)
72+
select results_eq(
73+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_no_options' $$,
74+
$$ VALUES (NULL::int, NULL::int, NULL::int, NULL::int) $$,
75+
'Step without options in shape should have NULL (inherit from flow)'
76+
);
77+
78+
-- Test: Compile with partial options (missing options should be NULL/default)
79+
select pgflow._create_flow_from_shape(
80+
'flow_partial_options',
81+
'{
82+
"steps": [
83+
{
84+
"slug": "step1",
85+
"stepType": "single",
86+
"dependencies": [],
87+
"options": {
88+
"timeout": 30
89+
}
90+
}
91+
],
92+
"options": {
93+
"maxAttempts": 10
94+
}
95+
}'::jsonb
96+
);
97+
98+
-- Verify partial flow options
99+
select results_eq(
100+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_partial_options' $$,
101+
$$ VALUES (10, 5, 60) $$,
102+
'Flow with partial options should use defaults for missing options'
103+
);
104+
105+
-- Verify partial step options
106+
select results_eq(
107+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_partial_options' $$,
108+
$$ VALUES (NULL::int, NULL::int, 30, NULL::int) $$,
109+
'Step with partial options should have NULL for missing options'
110+
);
111+
112+
select finish();
113+
rollback;

0 commit comments

Comments
 (0)