Skip to content

Commit abd6e12

Browse files
committed
feat(core): add flow creation and deletion functions
1 parent b1eb033 commit abd6e12

File tree

10 files changed

+267
-3
lines changed

10 files changed

+267
-3
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
-- Compile a flow from a JSONB shape
2+
-- Creates the flow and all its steps using existing create_flow/add_step functions
3+
create or replace function pgflow._create_flow_from_shape(
4+
p_flow_slug text,
5+
p_shape jsonb
6+
)
7+
returns void
8+
language plpgsql
9+
volatile
10+
set search_path to ''
11+
as $$
12+
DECLARE
13+
v_step jsonb;
14+
v_deps text[];
15+
BEGIN
16+
-- Create the flow with defaults
17+
PERFORM pgflow.create_flow(p_flow_slug);
18+
19+
-- Iterate over steps in order and add each one
20+
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
21+
LOOP
22+
-- Convert dependencies jsonb array to text array
23+
SELECT COALESCE(array_agg(dep), '{}')
24+
INTO v_deps
25+
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;
26+
27+
-- Add the step
28+
PERFORM pgflow.add_step(
29+
flow_slug => p_flow_slug,
30+
step_slug => v_step->>'slug',
31+
deps_slugs => v_deps,
32+
step_type => v_step->>'stepType'
33+
);
34+
END LOOP;
35+
END;
36+
$$;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- Deletes a flow and all its associated data
2+
-- WARNING: This is destructive - deletes flow definition AND all runtime data
3+
-- Used by ensure_flow_compiled for development mode recompilation
4+
create or replace function pgflow.delete_flow_and_data(p_flow_slug text)
5+
returns void
6+
language plpgsql
7+
volatile
8+
set search_path to ''
9+
as $$
10+
BEGIN
11+
-- Drop queue and archive table (pgmq)
12+
PERFORM pgmq.drop_queue(p_flow_slug);
13+
14+
-- Delete all associated data in the correct order (respecting FK constraints)
15+
DELETE FROM pgflow.step_tasks AS task WHERE task.flow_slug = p_flow_slug;
16+
DELETE FROM pgflow.step_states AS state WHERE state.flow_slug = p_flow_slug;
17+
DELETE FROM pgflow.runs AS run WHERE run.flow_slug = p_flow_slug;
18+
DELETE FROM pgflow.deps AS dep WHERE dep.flow_slug = p_flow_slug;
19+
DELETE FROM pgflow.steps AS step WHERE step.flow_slug = p_flow_slug;
20+
DELETE FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug;
21+
END;
22+
$$;

pkgs/core/src/database-types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,10 @@ export type Database = {
350350
Args: { p_db: Json; p_local: Json }
351351
Returns: string[]
352352
}
353+
_create_flow_from_shape: {
354+
Args: { p_flow_slug: string; p_shape: Json }
355+
Returns: undefined
356+
}
353357
_get_flow_shape: { Args: { p_flow_slug: string }; Returns: Json }
354358
add_step: {
355359
Args: {
@@ -440,6 +444,10 @@ export type Database = {
440444
isSetofReturn: false
441445
}
442446
}
447+
delete_flow_and_data: {
448+
Args: { p_flow_slug: string }
449+
Returns: undefined
450+
}
443451
fail_task: {
444452
Args: {
445453
error_message: string
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
-- Create "_create_flow_from_shape" function
2+
CREATE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
3+
DECLARE
4+
v_step jsonb;
5+
v_deps text[];
6+
BEGIN
7+
-- Create the flow with defaults
8+
PERFORM pgflow.create_flow(p_flow_slug);
9+
10+
-- Iterate over steps in order and add each one
11+
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
12+
LOOP
13+
-- Convert dependencies jsonb array to text array
14+
SELECT COALESCE(array_agg(dep), '{}')
15+
INTO v_deps
16+
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;
17+
18+
-- Add the step
19+
PERFORM pgflow.add_step(
20+
flow_slug => p_flow_slug,
21+
step_slug => v_step->>'slug',
22+
deps_slugs => v_deps,
23+
step_type => v_step->>'stepType'
24+
);
25+
END LOOP;
26+
END;
27+
$$;
28+
-- Create "delete_flow_and_data" function
29+
CREATE FUNCTION "pgflow"."delete_flow_and_data" ("p_flow_slug" text) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
30+
BEGIN
31+
-- Drop queue and archive table (pgmq)
32+
PERFORM pgmq.drop_queue(p_flow_slug);
33+
34+
-- Delete all associated data in the correct order (respecting FK constraints)
35+
DELETE FROM pgflow.step_tasks AS task WHERE task.flow_slug = p_flow_slug;
36+
DELETE FROM pgflow.step_states AS state WHERE state.flow_slug = p_flow_slug;
37+
DELETE FROM pgflow.runs AS run WHERE run.flow_slug = p_flow_slug;
38+
DELETE FROM pgflow.deps AS dep WHERE dep.flow_slug = p_flow_slug;
39+
DELETE FROM pgflow.steps AS step WHERE step.flow_slug = p_flow_slug;
40+
DELETE FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug;
41+
END;
42+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc=
1+
h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -12,3 +12,4 @@ h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc=
1212
20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4=
1313
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
1414
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
15+
20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
begin;
2+
select plan(4);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Compile a simple sequential flow from shape
6+
select pgflow._create_flow_from_shape(
7+
'test_flow',
8+
'{
9+
"steps": [
10+
{"slug": "first", "stepType": "single", "dependencies": []},
11+
{"slug": "second", "stepType": "single", "dependencies": ["first"]},
12+
{"slug": "third", "stepType": "single", "dependencies": ["second"]}
13+
]
14+
}'::jsonb
15+
);
16+
17+
-- Verify flow was created
18+
select is(
19+
(select count(*)::int from pgflow.flows where flow_slug = 'test_flow'),
20+
1,
21+
'Flow should be created'
22+
);
23+
24+
-- Verify steps were created
25+
select is(
26+
(select count(*)::int from pgflow.steps where flow_slug = 'test_flow'),
27+
3,
28+
'All 3 steps should be created'
29+
);
30+
31+
-- Verify step order matches (step_index)
32+
select results_eq(
33+
$$ SELECT step_slug FROM pgflow.steps WHERE flow_slug = 'test_flow' ORDER BY step_index $$,
34+
$$ VALUES ('first'), ('second'), ('third') $$,
35+
'Steps should be in correct order'
36+
);
37+
38+
-- Verify dependencies were created
39+
select results_eq(
40+
$$ SELECT dep_slug, step_slug FROM pgflow.deps WHERE flow_slug = 'test_flow' ORDER BY step_slug $$,
41+
$$ VALUES ('first', 'second'), ('second', 'third') $$,
42+
'Dependencies should be created correctly'
43+
);
44+
45+
select finish();
46+
rollback;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
begin;
2+
select plan(2);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Compile flow with map step
6+
select pgflow._create_flow_from_shape(
7+
'map_flow',
8+
'{
9+
"steps": [
10+
{"slug": "root_map", "stepType": "map", "dependencies": []},
11+
{"slug": "process", "stepType": "single", "dependencies": ["root_map"]}
12+
]
13+
}'::jsonb
14+
);
15+
16+
-- Verify map step was created with correct type
17+
select is(
18+
(select step_type from pgflow.steps where flow_slug = 'map_flow' and step_slug = 'root_map'),
19+
'map',
20+
'Map step should have step_type = map'
21+
);
22+
23+
-- Verify shape round-trips correctly
24+
select is(
25+
pgflow._get_flow_shape('map_flow'),
26+
'{
27+
"steps": [
28+
{"slug": "root_map", "stepType": "map", "dependencies": []},
29+
{"slug": "process", "stepType": "single", "dependencies": ["root_map"]}
30+
]
31+
}'::jsonb,
32+
'Shape should round-trip correctly'
33+
);
34+
35+
select finish();
36+
rollback;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
begin;
2+
select plan(3);
3+
select pgflow_tests.reset_db();
4+
5+
-- Setup: Create a flow with steps
6+
select pgflow.create_flow('test_flow');
7+
select pgflow.add_step('test_flow', 'first');
8+
select pgflow.add_step('test_flow', 'second', array['first']);
9+
10+
-- Verify setup
11+
select is(
12+
(select count(*)::int from pgflow.flows where flow_slug = 'test_flow'),
13+
1,
14+
'Flow should exist before deletion'
15+
);
16+
17+
-- Test: Delete the flow
18+
select pgflow.delete_flow_and_data('test_flow');
19+
20+
-- Verify deletion
21+
select is(
22+
(select count(*)::int from pgflow.flows where flow_slug = 'test_flow'),
23+
0,
24+
'Flow should be deleted'
25+
);
26+
27+
select is(
28+
(select count(*)::int from pgflow.steps where flow_slug = 'test_flow'),
29+
0,
30+
'Steps should be deleted'
31+
);
32+
33+
select finish();
34+
rollback;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
begin;
2+
select plan(4);
3+
select pgflow_tests.reset_db();
4+
5+
-- Setup: Create flow, start it, create runtime data
6+
select pgflow.create_flow('test_flow');
7+
select pgflow.add_step('test_flow', 'first');
8+
select pgflow.start_flow('test_flow', '{"input": "test"}'::jsonb);
9+
10+
-- Verify runtime data exists
11+
select is(
12+
(select count(*)::int from pgflow.runs where flow_slug = 'test_flow'),
13+
1,
14+
'Run should exist before deletion'
15+
);
16+
17+
select is(
18+
(select count(*)::int from pgflow.step_states where flow_slug = 'test_flow'),
19+
1,
20+
'Step state should exist before deletion'
21+
);
22+
23+
-- Test: Delete the flow
24+
select pgflow.delete_flow_and_data('test_flow');
25+
26+
-- Verify all runtime data deleted
27+
select is(
28+
(select count(*)::int from pgflow.runs where flow_slug = 'test_flow'),
29+
0,
30+
'Runs should be deleted'
31+
);
32+
33+
select is(
34+
(select count(*)::int from pgflow.step_states where flow_slug = 'test_flow'),
35+
0,
36+
'Step states should be deleted'
37+
);
38+
39+
select finish();
40+
rollback;

pkgs/core/supabase/tests/maintenance/delete_flow_and_data.test.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ begin;
22
select plan(9);
33
select pgflow_tests.reset_db();
44

5-
-- Load the delete_flow_and_data function
6-
\i _shared/delete_flow_and_data.sql.raw
5+
-- Note: delete_flow_and_data is now part of core schema (no longer needs to be loaded)
76

87
-- Create test flow with steps and dependencies
98
select pgflow.create_flow('test_flow_to_delete', max_attempts => 0);

0 commit comments

Comments
 (0)