Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,44 @@ async function handleEnsureCompiled(

---

## Phase 3.5: Include Options in FlowShape

Options (maxAttempts, baseDelay, timeout, startDelay) must be included in FlowShape for proper flow creation, while remaining excluded from shape comparison (options can be tuned at runtime without recompilation).

### Problem

`_create_flow_from_shape()` was using defaults instead of DSL-defined options, causing drift between:
- CLI path: `compileFlow()` -> SQL with options
- Runtime path: `extractFlowShape()` -> `_create_flow_from_shape()` -> defaults only

### Solution

1. **FlowShape includes options** - Added optional `options` field to FlowShape/StepShape
2. **NULL = use default** - Modified `create_flow()` to use COALESCE internally
3. **Pass options through** - Updated `_create_flow_from_shape()` to pass options from shape

### Key Files

| File | Change |
|------|--------|
| `pkgs/dsl/src/flow-shape.ts` | Add options to interfaces, update extractFlowShape() |
| `pkgs/core/schemas/0100_function_create_flow.sql` | NULL params = use default via COALESCE |
| `pkgs/core/schemas/0100_function_create_flow_from_shape.sql` | Pass options from shape |

### Design Decision: NULL = Use Default

SQL functions now treat NULL parameters as "use default" via COALESCE:

```sql
-- In create_flow():
INSERT INTO pgflow.flows (..., opt_max_attempts, ...)
VALUES (..., COALESCE(max_attempts, 3), ...);
```

This prevents drift - defaults are defined in ONE place (inside the function), not hardcoded by callers.

---

## Phase 4: Worker Configuration (Edge-Worker Package)

### Modify: `pkgs/edge-worker/src/core/workerConfigTypes.ts`
Expand Down
19 changes: 15 additions & 4 deletions pkgs/core/schemas/0100_function_create_flow.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
-- Create a new flow with optional configuration.
-- NULL parameters use defaults defined in the 'defaults' CTE below.
-- This allows callers to pass NULL to explicitly use the default value.
create or replace function pgflow.create_flow(
flow_slug text,
max_attempts int default 3,
base_delay int default 5,
timeout int default 60
max_attempts int default null,
base_delay int default null,
timeout int default null
)
returns pgflow.flows
language sql
set search_path to ''
volatile
as $$
WITH
defaults AS (
SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout
),
flow_upsert AS (
INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout)
VALUES (flow_slug, max_attempts, base_delay, timeout)
SELECT
flow_slug,
COALESCE(max_attempts, defaults.def_max_attempts),
COALESCE(base_delay, defaults.def_base_delay),
COALESCE(timeout, defaults.def_timeout)
FROM defaults
ON CONFLICT (flow_slug) DO UPDATE
SET flow_slug = pgflow.flows.flow_slug -- Dummy update
RETURNING *
Expand Down
24 changes: 21 additions & 3 deletions pkgs/core/schemas/0100_function_create_flow_from_shape.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-- Compile a flow from a JSONB shape
-- Creates the flow and all its steps using existing create_flow/add_step functions
-- Includes options from shape (NULL values = use default)
create or replace function pgflow._create_flow_from_shape(
p_flow_slug text,
p_shape jsonb
Expand All @@ -12,9 +13,19 @@ as $$
DECLARE
v_step jsonb;
v_deps text[];
v_flow_options jsonb;
v_step_options jsonb;
BEGIN
-- Create the flow with defaults
PERFORM pgflow.create_flow(p_flow_slug);
-- Extract flow-level options (may be null)
v_flow_options := p_shape->'options';

-- Create the flow with options (NULL = use default)
PERFORM pgflow.create_flow(
p_flow_slug,
(v_flow_options->>'maxAttempts')::int,
(v_flow_options->>'baseDelay')::int,
(v_flow_options->>'timeout')::int
);

-- Iterate over steps in order and add each one
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
Expand All @@ -24,11 +35,18 @@ BEGIN
INTO v_deps
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;

-- Add the step
-- Extract step options (may be null)
v_step_options := v_step->'options';

-- Add the step with options (NULL = use default/inherit)
PERFORM pgflow.add_step(
flow_slug => p_flow_slug,
step_slug => v_step->>'slug',
deps_slugs => v_deps,
max_attempts => (v_step_options->>'maxAttempts')::int,
base_delay => (v_step_options->>'baseDelay')::int,
timeout => (v_step_options->>'timeout')::int,
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType'
);
END LOOP;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
-- Modify "create_flow" function
CREATE OR REPLACE FUNCTION "pgflow"."create_flow" ("flow_slug" text, "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer) RETURNS "pgflow"."flows" LANGUAGE sql SET "search_path" = '' AS $$
WITH
defaults AS (
SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout
),
flow_upsert AS (
INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout)
SELECT
flow_slug,
COALESCE(max_attempts, defaults.def_max_attempts),
COALESCE(base_delay, defaults.def_base_delay),
COALESCE(timeout, defaults.def_timeout)
FROM defaults
ON CONFLICT (flow_slug) DO UPDATE
SET flow_slug = pgflow.flows.flow_slug -- Dummy update
RETURNING *
),
ensure_queue AS (
SELECT pgmq.create(flow_slug)
WHERE NOT EXISTS (
SELECT 1 FROM pgmq.list_queues() WHERE queue_name = flow_slug
)
)
SELECT f.*
FROM flow_upsert f
LEFT JOIN (SELECT 1 FROM ensure_queue) _dummy ON true; -- Left join ensures flow is returned
$$;
-- Modify "_create_flow_from_shape" function
CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
v_step jsonb;
v_deps text[];
v_flow_options jsonb;
v_step_options jsonb;
BEGIN
-- Extract flow-level options (may be null)
v_flow_options := p_shape->'options';

-- Create the flow with options (NULL = use default)
PERFORM pgflow.create_flow(
p_flow_slug,
(v_flow_options->>'maxAttempts')::int,
(v_flow_options->>'baseDelay')::int,
(v_flow_options->>'timeout')::int
);

-- Iterate over steps in order and add each one
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
LOOP
-- Convert dependencies jsonb array to text array
SELECT COALESCE(array_agg(dep), '{}')
INTO v_deps
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;

-- Extract step options (may be null)
v_step_options := v_step->'options';

-- Add the step with options (NULL = use default/inherit)
PERFORM pgflow.add_step(
flow_slug => p_flow_slug,
step_slug => v_step->>'slug',
deps_slugs => v_deps,
max_attempts => (v_step_options->>'maxAttempts')::int,
base_delay => (v_step_options->>'baseDelay')::int,
timeout => (v_step_options->>'timeout')::int,
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType'
);
END LOOP;
END;
$$;
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830=
h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -14,3 +14,4 @@ h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830=
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko=
20251130164844_pgflow_temp_options_in_shape.sql h1:lbMDdu15QiBElTsvl7g0dI7flvyjngK9g68VDnCE0S0=
20 changes: 19 additions & 1 deletion pkgs/core/supabase/tests/create_flow/options.test.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
begin;
select plan(5);
select plan(7);
select pgflow_tests.reset_db();

-- SETUP: flow with all default values
Expand All @@ -12,6 +12,24 @@ select results_eq(
'Should create flow with default opt_max_attempts, opt_base_delay and opt_timeout'
);

-- TEST: NULL parameters should use defaults (NULL = "use default" semantics)
select pgflow.create_flow('test_flow_null', null, null, null);

select results_eq(
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'test_flow_null' $$,
$$ VALUES (3, 5, 60) $$,
'NULL parameters should use defaults'
);

-- TEST: Mixed NULL and explicit values
select pgflow.create_flow('test_flow_mixed', max_attempts => 10, base_delay => null, timeout => 120);

select results_eq(
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'test_flow_mixed' $$,
$$ VALUES (10, 5, 120) $$,
'NULL parameters should use defaults while explicit values are preserved'
);

-- SETUP: flow with overriden max_attempts
select pgflow.create_flow('test_flow_2', max_attempts => 10);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
begin;
select plan(6);
select pgflow_tests.reset_db();

-- Test: Compile a flow with flow-level options from shape
select pgflow._create_flow_from_shape(
'flow_with_options',
'{
"steps": [
{"slug": "step1", "stepType": "single", "dependencies": []}
],
"options": {
"maxAttempts": 5,
"baseDelay": 10,
"timeout": 120
}
}'::jsonb
);

-- Verify flow options were applied
select results_eq(
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_with_options' $$,
$$ VALUES (5, 10, 120) $$,
'Flow should have options from shape'
);

-- Test: Compile a flow with step-level options from shape
select pgflow._create_flow_from_shape(
'flow_with_step_options',
'{
"steps": [
{
"slug": "step1",
"stepType": "single",
"dependencies": [],
"options": {
"maxAttempts": 7,
"baseDelay": 15,
"timeout": 90,
"startDelay": 1000
}
}
]
}'::jsonb
);

-- Verify step options were applied
select results_eq(
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_with_step_options' $$,
$$ VALUES (7, 15, 90, 1000) $$,
'Step should have options from shape'
);

-- Test: Compile a flow with no options (defaults should be used)
select pgflow._create_flow_from_shape(
'flow_no_options',
'{
"steps": [
{"slug": "step1", "stepType": "single", "dependencies": []}
]
}'::jsonb
);

-- Verify flow uses default options
select results_eq(
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_no_options' $$,
$$ VALUES (3, 5, 60) $$,
'Flow without options in shape should use defaults'
);

-- Verify step uses NULL options (inherits from flow)
select results_eq(
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_no_options' $$,
$$ VALUES (NULL::int, NULL::int, NULL::int, NULL::int) $$,
'Step without options in shape should have NULL (inherit from flow)'
);

-- Test: Compile with partial options (missing options should be NULL/default)
select pgflow._create_flow_from_shape(
'flow_partial_options',
'{
"steps": [
{
"slug": "step1",
"stepType": "single",
"dependencies": [],
"options": {
"timeout": 30
}
}
],
"options": {
"maxAttempts": 10
}
}'::jsonb
);

-- Verify partial flow options
select results_eq(
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_partial_options' $$,
$$ VALUES (10, 5, 60) $$,
'Flow with partial options should use defaults for missing options'
);

-- Verify partial step options
select results_eq(
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_partial_options' $$,
$$ VALUES (NULL::int, NULL::int, 30, NULL::int) $$,
'Step with partial options should have NULL for missing options'
);

select finish();
rollback;
Loading