Skip to content

Commit 6e8539f

Browse files
committed
feat(core): add ensure_flow_compiled function
1 parent 09c6368 commit 6e8539f

File tree

7 files changed

+270
-1
lines changed

7 files changed

+270
-1
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
-- Ensure a flow is compiled in the database
2+
-- Handles both development (auto-recompile) and production (fail on mismatch) modes
3+
-- Returns: { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch', differences: text[] }
4+
create or replace function pgflow.ensure_flow_compiled(
5+
p_flow_slug text,
6+
p_shape jsonb,
7+
p_mode text default 'production' -- 'development' | 'production'
8+
)
9+
returns jsonb
10+
language plpgsql
11+
volatile
12+
set search_path to ''
13+
as $$
14+
DECLARE
15+
v_flow_exists boolean;
16+
v_db_shape jsonb;
17+
v_differences text[];
18+
BEGIN
19+
-- 1. Check if flow exists
20+
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
21+
INTO v_flow_exists;
22+
23+
-- 2. If flow missing: compile (both modes)
24+
IF NOT v_flow_exists THEN
25+
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
26+
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
27+
END IF;
28+
29+
-- 3. Get current shape from DB
30+
v_db_shape := pgflow._get_flow_shape(p_flow_slug);
31+
32+
-- 4. Compare shapes
33+
v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape);
34+
35+
-- 5. If shapes match: return verified
36+
IF array_length(v_differences, 1) IS NULL THEN
37+
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
38+
END IF;
39+
40+
-- 6. Shapes differ - handle by mode
41+
IF p_mode = 'development' THEN
42+
-- Recompile in dev mode: full deletion + fresh compile
43+
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
44+
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
45+
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
46+
ELSE
47+
-- Fail in production mode
48+
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
49+
END IF;
50+
END;
51+
$$;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
-- Create "ensure_flow_compiled" function
2+
CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("p_flow_slug" text, "p_shape" jsonb, "p_mode" text DEFAULT 'production') RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
3+
DECLARE
4+
v_flow_exists boolean;
5+
v_db_shape jsonb;
6+
v_differences text[];
7+
BEGIN
8+
-- 1. Check if flow exists
9+
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
10+
INTO v_flow_exists;
11+
12+
-- 2. If flow missing: compile (both modes)
13+
IF NOT v_flow_exists THEN
14+
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
15+
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
16+
END IF;
17+
18+
-- 3. Get current shape from DB
19+
v_db_shape := pgflow._get_flow_shape(p_flow_slug);
20+
21+
-- 4. Compare shapes
22+
v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape);
23+
24+
-- 5. If shapes match: return verified
25+
IF array_length(v_differences, 1) IS NULL THEN
26+
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
27+
END IF;
28+
29+
-- 6. Shapes differ - handle by mode
30+
IF p_mode = 'development' THEN
31+
-- Recompile in dev mode: full deletion + fresh compile
32+
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
33+
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
34+
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
35+
ELSE
36+
-- Fail in production mode
37+
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
38+
END IF;
39+
END;
40+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA=
1+
h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -13,3 +13,4 @@ h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA=
1313
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
1414
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
1515
20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
16+
20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko=
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
begin;
2+
select plan(3);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Missing flow should be compiled (default production mode)
6+
select is(
7+
(
8+
select result->>'status'
9+
from pgflow.ensure_flow_compiled(
10+
'new_flow',
11+
'{
12+
"steps": [
13+
{"slug": "first", "stepType": "single", "dependencies": []}
14+
]
15+
}'::jsonb
16+
) as result
17+
),
18+
'compiled',
19+
'Should return compiled status for missing flow'
20+
);
21+
22+
-- Verify flow was actually created
23+
select is(
24+
(select count(*)::int from pgflow.flows where flow_slug = 'new_flow'),
25+
1,
26+
'Flow should be created in database'
27+
);
28+
29+
select is(
30+
(select count(*)::int from pgflow.steps where flow_slug = 'new_flow'),
31+
1,
32+
'Step should be created in database'
33+
);
34+
35+
select finish();
36+
rollback;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
begin;
2+
select plan(3);
3+
select pgflow_tests.reset_db();
4+
5+
-- Setup: Create flow with different shape
6+
select pgflow.create_flow('prod_flow');
7+
select pgflow.add_step('prod_flow', 'old_step');
8+
9+
-- Test: Different shape in production mode should return mismatch
10+
select is(
11+
(
12+
select result->>'status'
13+
from pgflow.ensure_flow_compiled(
14+
'prod_flow',
15+
'{
16+
"steps": [
17+
{"slug": "new_step", "stepType": "single", "dependencies": []}
18+
]
19+
}'::jsonb,
20+
'production'
21+
) as result
22+
),
23+
'mismatch',
24+
'Should return mismatch status in production mode'
25+
);
26+
27+
-- Verify differences are returned
28+
select ok(
29+
(
30+
select jsonb_array_length(result->'differences') > 0
31+
from pgflow.ensure_flow_compiled(
32+
'prod_flow',
33+
'{
34+
"steps": [
35+
{"slug": "new_step", "stepType": "single", "dependencies": []}
36+
]
37+
}'::jsonb,
38+
'production'
39+
) as result
40+
),
41+
'Should return differences for production mismatch'
42+
);
43+
44+
-- Verify database was NOT modified
45+
select is(
46+
(select step_slug from pgflow.steps where flow_slug = 'prod_flow'),
47+
'old_step',
48+
'Database should not be modified on production mismatch'
49+
);
50+
51+
select finish();
52+
rollback;
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
begin;
2+
select plan(3);
3+
select pgflow_tests.reset_db();
4+
5+
-- Setup: Create flow with different shape
6+
select pgflow.create_flow('dev_flow');
7+
select pgflow.add_step('dev_flow', 'old_step');
8+
9+
-- Test: Different shape in development mode should recompile
10+
select is(
11+
(
12+
select result->>'status'
13+
from pgflow.ensure_flow_compiled(
14+
'dev_flow',
15+
'{
16+
"steps": [
17+
{"slug": "new_step", "stepType": "single", "dependencies": []}
18+
]
19+
}'::jsonb,
20+
'development'
21+
) as result
22+
),
23+
'recompiled',
24+
'Should return recompiled status in development mode'
25+
);
26+
27+
-- Verify old step is gone
28+
select is(
29+
(select count(*)::int from pgflow.steps where flow_slug = 'dev_flow' and step_slug = 'old_step'),
30+
0,
31+
'Old step should be deleted'
32+
);
33+
34+
-- Verify new step exists
35+
select is(
36+
(select count(*)::int from pgflow.steps where flow_slug = 'dev_flow' and step_slug = 'new_step'),
37+
1,
38+
'New step should be created'
39+
);
40+
41+
select finish();
42+
rollback;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
begin;
2+
select plan(2);
3+
select pgflow_tests.reset_db();
4+
5+
-- Setup: Create flow first
6+
select pgflow.create_flow('existing_flow');
7+
select pgflow.add_step('existing_flow', 'first');
8+
select pgflow.add_step('existing_flow', 'second', array['first']);
9+
10+
-- Test: Matching shape should return verified
11+
select is(
12+
(
13+
select result->>'status'
14+
from pgflow.ensure_flow_compiled(
15+
'existing_flow',
16+
'{
17+
"steps": [
18+
{"slug": "first", "stepType": "single", "dependencies": []},
19+
{"slug": "second", "stepType": "single", "dependencies": ["first"]}
20+
]
21+
}'::jsonb
22+
) as result
23+
),
24+
'verified',
25+
'Should return verified status for matching shape'
26+
);
27+
28+
-- Verify differences array is empty
29+
select is(
30+
(
31+
select jsonb_array_length(result->'differences')
32+
from pgflow.ensure_flow_compiled(
33+
'existing_flow',
34+
'{
35+
"steps": [
36+
{"slug": "first", "stepType": "single", "dependencies": []},
37+
{"slug": "second", "stepType": "single", "dependencies": ["first"]}
38+
]
39+
}'::jsonb
40+
) as result
41+
),
42+
0,
43+
'Differences should be empty for matching shape'
44+
);
45+
46+
select finish();
47+
rollback;

0 commit comments

Comments
 (0)