Skip to content

Commit 9adebe8

Browse files
committed
feat(core): add shape extraction and comparison functions
1 parent 19fbfb4 commit 9adebe8

14 files changed

+516
-1
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
-- Compare two flow shapes and return array of difference descriptions
2+
-- Mirrors TypeScript compareFlowShapes() function logic
3+
create or replace function pgflow._compare_flow_shapes(
4+
p_local jsonb,
5+
p_db jsonb
6+
)
7+
returns text []
8+
language plpgsql
9+
stable
10+
set search_path to ''
11+
as $fn$
12+
DECLARE
13+
v_differences text[] := '{}';
14+
v_local_steps jsonb;
15+
v_db_steps jsonb;
16+
v_local_count int;
17+
v_db_count int;
18+
v_max_count int;
19+
v_idx int;
20+
v_local_step jsonb;
21+
v_db_step jsonb;
22+
v_local_deps text;
23+
v_db_deps text;
24+
BEGIN
25+
v_local_steps := p_local->'steps';
26+
v_db_steps := p_db->'steps';
27+
v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb));
28+
v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb));
29+
30+
-- Compare step counts
31+
IF v_local_count != v_db_count THEN
32+
v_differences := array_append(
33+
v_differences,
34+
format('Step count differs: %s vs %s', v_local_count, v_db_count)
35+
);
36+
END IF;
37+
38+
-- Compare steps by index
39+
v_max_count := GREATEST(v_local_count, v_db_count);
40+
41+
FOR v_idx IN 0..(v_max_count - 1) LOOP
42+
v_local_step := v_local_steps->v_idx;
43+
v_db_step := v_db_steps->v_idx;
44+
45+
IF v_local_step IS NULL THEN
46+
v_differences := array_append(
47+
v_differences,
48+
format(
49+
$$Step at index %s: missing in first shape (second has '%s')$$,
50+
v_idx,
51+
v_db_step->>'slug'
52+
)
53+
);
54+
ELSIF v_db_step IS NULL THEN
55+
v_differences := array_append(
56+
v_differences,
57+
format(
58+
$$Step at index %s: missing in second shape (first has '%s')$$,
59+
v_idx,
60+
v_local_step->>'slug'
61+
)
62+
);
63+
ELSE
64+
-- Compare slug
65+
IF v_local_step->>'slug' != v_db_step->>'slug' THEN
66+
v_differences := array_append(
67+
v_differences,
68+
format(
69+
$$Step at index %s: slug differs '%s' vs '%s'$$,
70+
v_idx,
71+
v_local_step->>'slug',
72+
v_db_step->>'slug'
73+
)
74+
);
75+
END IF;
76+
77+
-- Compare step type
78+
IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN
79+
v_differences := array_append(
80+
v_differences,
81+
format(
82+
$$Step at index %s: type differs '%s' vs '%s'$$,
83+
v_idx,
84+
v_local_step->>'stepType',
85+
v_db_step->>'stepType'
86+
)
87+
);
88+
END IF;
89+
90+
-- Compare dependencies (convert arrays to comma-separated strings)
91+
SELECT string_agg(dep, ', ' ORDER BY dep)
92+
INTO v_local_deps
93+
FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep;
94+
95+
SELECT string_agg(dep, ', ' ORDER BY dep)
96+
INTO v_db_deps
97+
FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep;
98+
99+
IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN
100+
v_differences := array_append(
101+
v_differences,
102+
format(
103+
$$Step at index %s: dependencies differ [%s] vs [%s]$$,
104+
v_idx,
105+
COALESCE(v_local_deps, ''),
106+
COALESCE(v_db_deps, '')
107+
)
108+
);
109+
END IF;
110+
END IF;
111+
END LOOP;
112+
113+
RETURN v_differences;
114+
END;
115+
$fn$;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
-- Get flow shape from database as JSONB
2+
-- Returns structure matching TypeScript FlowShape interface:
3+
-- { "steps": [{ "slug": "...", "stepType": "...", "dependencies": [...] }] }
4+
create or replace function pgflow._get_flow_shape(p_flow_slug text)
5+
returns jsonb
6+
language sql
7+
stable
8+
set search_path to ''
9+
as $$
10+
SELECT jsonb_build_object(
11+
'steps',
12+
COALESCE(
13+
jsonb_agg(
14+
jsonb_build_object(
15+
'slug', step.step_slug,
16+
'stepType', step.step_type,
17+
'dependencies', COALESCE(
18+
(
19+
SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug)
20+
FROM pgflow.deps AS dep
21+
WHERE dep.flow_slug = step.flow_slug
22+
AND dep.step_slug = step.step_slug
23+
),
24+
'[]'::jsonb
25+
)
26+
)
27+
ORDER BY step.step_index
28+
),
29+
'[]'::jsonb
30+
)
31+
)
32+
FROM pgflow.steps AS step
33+
WHERE step.flow_slug = p_flow_slug;
34+
$$;

pkgs/core/src/database-types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,11 @@ export type Database = {
346346
[_ in never]: never
347347
}
348348
Functions: {
349+
_compare_flow_shapes: {
350+
Args: { p_db: Json; p_local: Json }
351+
Returns: string[]
352+
}
353+
_get_flow_shape: { Args: { p_flow_slug: string }; Returns: Json }
349354
add_step: {
350355
Args: {
351356
base_delay?: number
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
-- Create "_compare_flow_shapes" function
2+
CREATE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$
3+
DECLARE
4+
v_differences text[] := '{}';
5+
v_local_steps jsonb;
6+
v_db_steps jsonb;
7+
v_local_count int;
8+
v_db_count int;
9+
v_max_count int;
10+
v_idx int;
11+
v_local_step jsonb;
12+
v_db_step jsonb;
13+
v_local_deps text;
14+
v_db_deps text;
15+
BEGIN
16+
v_local_steps := p_local->'steps';
17+
v_db_steps := p_db->'steps';
18+
v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb));
19+
v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb));
20+
21+
-- Compare step counts
22+
IF v_local_count != v_db_count THEN
23+
v_differences := array_append(
24+
v_differences,
25+
format('Step count differs: %s vs %s', v_local_count, v_db_count)
26+
);
27+
END IF;
28+
29+
-- Compare steps by index
30+
v_max_count := GREATEST(v_local_count, v_db_count);
31+
32+
FOR v_idx IN 0..(v_max_count - 1) LOOP
33+
v_local_step := v_local_steps->v_idx;
34+
v_db_step := v_db_steps->v_idx;
35+
36+
IF v_local_step IS NULL THEN
37+
v_differences := array_append(
38+
v_differences,
39+
format(
40+
$$Step at index %s: missing in first shape (second has '%s')$$,
41+
v_idx,
42+
v_db_step->>'slug'
43+
)
44+
);
45+
ELSIF v_db_step IS NULL THEN
46+
v_differences := array_append(
47+
v_differences,
48+
format(
49+
$$Step at index %s: missing in second shape (first has '%s')$$,
50+
v_idx,
51+
v_local_step->>'slug'
52+
)
53+
);
54+
ELSE
55+
-- Compare slug
56+
IF v_local_step->>'slug' != v_db_step->>'slug' THEN
57+
v_differences := array_append(
58+
v_differences,
59+
format(
60+
$$Step at index %s: slug differs '%s' vs '%s'$$,
61+
v_idx,
62+
v_local_step->>'slug',
63+
v_db_step->>'slug'
64+
)
65+
);
66+
END IF;
67+
68+
-- Compare step type
69+
IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN
70+
v_differences := array_append(
71+
v_differences,
72+
format(
73+
$$Step at index %s: type differs '%s' vs '%s'$$,
74+
v_idx,
75+
v_local_step->>'stepType',
76+
v_db_step->>'stepType'
77+
)
78+
);
79+
END IF;
80+
81+
-- Compare dependencies (convert arrays to comma-separated strings)
82+
SELECT string_agg(dep, ', ' ORDER BY dep)
83+
INTO v_local_deps
84+
FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep;
85+
86+
SELECT string_agg(dep, ', ' ORDER BY dep)
87+
INTO v_db_deps
88+
FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep;
89+
90+
IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN
91+
v_differences := array_append(
92+
v_differences,
93+
format(
94+
$$Step at index %s: dependencies differ [%s] vs [%s]$$,
95+
v_idx,
96+
COALESCE(v_local_deps, ''),
97+
COALESCE(v_db_deps, '')
98+
)
99+
);
100+
END IF;
101+
END IF;
102+
END LOOP;
103+
104+
RETURN v_differences;
105+
END;
106+
$BODY$;
107+
-- Create "_get_flow_shape" function
108+
CREATE FUNCTION "pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$
109+
SELECT jsonb_build_object(
110+
'steps',
111+
COALESCE(
112+
jsonb_agg(
113+
jsonb_build_object(
114+
'slug', step.step_slug,
115+
'stepType', step.step_type,
116+
'dependencies', COALESCE(
117+
(
118+
SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug)
119+
FROM pgflow.deps AS dep
120+
WHERE dep.flow_slug = step.flow_slug
121+
AND dep.step_slug = step.step_slug
122+
),
123+
'[]'::jsonb
124+
)
125+
)
126+
ORDER BY step.step_index
127+
),
128+
'[]'::jsonb
129+
)
130+
)
131+
FROM pgflow.steps AS step
132+
WHERE step.flow_slug = p_flow_slug;
133+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:TiGfW/P3IHRUFlm9TwimGZLK/80Gh5RfQm+x21jBuic=
1+
h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -11,3 +11,4 @@ h1:TiGfW/P3IHRUFlm9TwimGZLK/80Gh5RfQm+x21jBuic=
1111
20251006073122_pgflow_add_map_step_type.sql h1:D/skgKpaVg5TM8bPovo9FUutQfg35/AzkxEcasYwytY=
1212
20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4=
1313
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
14+
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
begin;
2+
select plan(1);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Different dependencies at same index should be detected
6+
select ok(
7+
$$Step at index 1: dependencies differ [alpha] vs [beta]$$ = ANY(
8+
pgflow._compare_flow_shapes(
9+
'{
10+
"steps": [
11+
{"slug": "first", "stepType": "single", "dependencies": []},
12+
{"slug": "second", "stepType": "single", "dependencies": ["alpha"]}
13+
]
14+
}'::jsonb,
15+
'{
16+
"steps": [
17+
{"slug": "first", "stepType": "single", "dependencies": []},
18+
{"slug": "second", "stepType": "single", "dependencies": ["beta"]}
19+
]
20+
}'::jsonb
21+
)
22+
),
23+
'Should detect dependency difference'
24+
);
25+
26+
select finish();
27+
rollback;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
begin;
2+
select plan(1);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Identical shapes should return empty differences array
6+
select is(
7+
pgflow._compare_flow_shapes(
8+
'{
9+
"steps": [
10+
{"slug": "first", "stepType": "single", "dependencies": []},
11+
{"slug": "second", "stepType": "single", "dependencies": ["first"]}
12+
]
13+
}'::jsonb,
14+
'{
15+
"steps": [
16+
{"slug": "first", "stepType": "single", "dependencies": []},
17+
{"slug": "second", "stepType": "single", "dependencies": ["first"]}
18+
]
19+
}'::jsonb
20+
),
21+
'{}'::text[],
22+
'Identical shapes should have no differences'
23+
);
24+
25+
select finish();
26+
rollback;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
begin;
2+
select plan(1);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Different slugs at same index should be detected
6+
select ok(
7+
$$Step at index 0: slug differs 'first' vs 'different'$$ = ANY(
8+
pgflow._compare_flow_shapes(
9+
'{
10+
"steps": [
11+
{"slug": "first", "stepType": "single", "dependencies": []}
12+
]
13+
}'::jsonb,
14+
'{
15+
"steps": [
16+
{"slug": "different", "stepType": "single", "dependencies": []}
17+
]
18+
}'::jsonb
19+
)
20+
),
21+
'Should detect slug difference at same index'
22+
);
23+
24+
select finish();
25+
rollback;

0 commit comments

Comments
 (0)