Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions pkgs/core/schemas/0100_function_compare_flow_shapes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
-- Compare two flow shapes and return array of difference descriptions
-- Mirrors TypeScript compareFlowShapes() function logic
create or replace function pgflow._compare_flow_shapes(
p_local jsonb,
p_db jsonb
)
returns text []
language plpgsql
stable
set search_path to ''
as $fn$
DECLARE
v_differences text[] := '{}';
v_local_steps jsonb;
v_db_steps jsonb;
v_local_count int;
v_db_count int;
v_max_count int;
v_idx int;
v_local_step jsonb;
v_db_step jsonb;
v_local_deps text;
v_db_deps text;
BEGIN
v_local_steps := p_local->'steps';
v_db_steps := p_db->'steps';
v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb));
v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb));

-- Compare step counts
IF v_local_count != v_db_count THEN
v_differences := array_append(
v_differences,
format('Step count differs: %s vs %s', v_local_count, v_db_count)
);
END IF;

-- Compare steps by index
v_max_count := GREATEST(v_local_count, v_db_count);

FOR v_idx IN 0..(v_max_count - 1) LOOP
v_local_step := v_local_steps->v_idx;
v_db_step := v_db_steps->v_idx;

IF v_local_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in first shape (second has '%s')$$,
v_idx,
v_db_step->>'slug'
)
);
ELSIF v_db_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in second shape (first has '%s')$$,
v_idx,
v_local_step->>'slug'
)
);
ELSE
-- Compare slug
IF v_local_step->>'slug' != v_db_step->>'slug' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: slug differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'slug',
v_db_step->>'slug'
)
);
END IF;

-- Compare step type
IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: type differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'stepType',
v_db_step->>'stepType'
)
);
END IF;

-- Compare dependencies (convert arrays to comma-separated strings)
SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_local_deps
FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep;

SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_db_deps
FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep;

IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: dependencies differ [%s] vs [%s]$$,
v_idx,
COALESCE(v_local_deps, ''),
COALESCE(v_db_deps, '')
)
);
END IF;
END IF;
END LOOP;

RETURN v_differences;
END;
$fn$;
34 changes: 34 additions & 0 deletions pkgs/core/schemas/0100_function_get_flow_shape.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Get flow shape from database as JSONB
-- Returns structure matching TypeScript FlowShape interface:
-- { "steps": [{ "slug": "...", "stepType": "...", "dependencies": [...] }] }
create or replace function pgflow._get_flow_shape(p_flow_slug text)
returns jsonb
language sql
stable
set search_path to ''
as $$
SELECT jsonb_build_object(
'steps',
COALESCE(
jsonb_agg(
jsonb_build_object(
'slug', step.step_slug,
'stepType', step.step_type,
'dependencies', COALESCE(
(
SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug)
FROM pgflow.deps AS dep
WHERE dep.flow_slug = step.flow_slug
AND dep.step_slug = step.step_slug
),
'[]'::jsonb
)
)
ORDER BY step.step_index
),
'[]'::jsonb
)
)
FROM pgflow.steps AS step
WHERE step.flow_slug = p_flow_slug;
$$;
5 changes: 5 additions & 0 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ export type Database = {
[_ in never]: never
}
Functions: {
_compare_flow_shapes: {
Args: { p_db: Json; p_local: Json }
Returns: string[]
}
_get_flow_shape: { Args: { p_flow_slug: string }; Returns: Json }
add_step: {
Args: {
base_delay?: number
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
-- Create "_compare_flow_shapes" function
CREATE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$
DECLARE
v_differences text[] := '{}';
v_local_steps jsonb;
v_db_steps jsonb;
v_local_count int;
v_db_count int;
v_max_count int;
v_idx int;
v_local_step jsonb;
v_db_step jsonb;
v_local_deps text;
v_db_deps text;
BEGIN
v_local_steps := p_local->'steps';
v_db_steps := p_db->'steps';
v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb));
v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb));

-- Compare step counts
IF v_local_count != v_db_count THEN
v_differences := array_append(
v_differences,
format('Step count differs: %s vs %s', v_local_count, v_db_count)
);
END IF;

-- Compare steps by index
v_max_count := GREATEST(v_local_count, v_db_count);

FOR v_idx IN 0..(v_max_count - 1) LOOP
v_local_step := v_local_steps->v_idx;
v_db_step := v_db_steps->v_idx;

IF v_local_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in first shape (second has '%s')$$,
v_idx,
v_db_step->>'slug'
)
);
ELSIF v_db_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in second shape (first has '%s')$$,
v_idx,
v_local_step->>'slug'
)
);
ELSE
-- Compare slug
IF v_local_step->>'slug' != v_db_step->>'slug' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: slug differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'slug',
v_db_step->>'slug'
)
);
END IF;

-- Compare step type
IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: type differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'stepType',
v_db_step->>'stepType'
)
);
END IF;

-- Compare dependencies (convert arrays to comma-separated strings)
SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_local_deps
FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep;

SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_db_deps
FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep;

IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: dependencies differ [%s] vs [%s]$$,
v_idx,
COALESCE(v_local_deps, ''),
COALESCE(v_db_deps, '')
)
);
END IF;
END IF;
END LOOP;

RETURN v_differences;
END;
$BODY$;
-- Create "_get_flow_shape" function
CREATE FUNCTION "pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$
SELECT jsonb_build_object(
'steps',
COALESCE(
jsonb_agg(
jsonb_build_object(
'slug', step.step_slug,
'stepType', step.step_type,
'dependencies', COALESCE(
(
SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug)
FROM pgflow.deps AS dep
WHERE dep.flow_slug = step.flow_slug
AND dep.step_slug = step.step_slug
),
'[]'::jsonb
)
)
ORDER BY step.step_index
),
'[]'::jsonb
)
)
FROM pgflow.steps AS step
WHERE step.flow_slug = p_flow_slug;
$$;
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:TiGfW/P3IHRUFlm9TwimGZLK/80Gh5RfQm+x21jBuic=
h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -11,3 +11,4 @@ h1:TiGfW/P3IHRUFlm9TwimGZLK/80Gh5RfQm+x21jBuic=
20251006073122_pgflow_add_map_step_type.sql h1:D/skgKpaVg5TM8bPovo9FUutQfg35/AzkxEcasYwytY=
20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4=
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
begin;
select plan(1);
select pgflow_tests.reset_db();

-- Test: Different dependencies at same index should be detected
select ok(
$$Step at index 1: dependencies differ [alpha] vs [beta]$$ = ANY(
pgflow._compare_flow_shapes(
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []},
{"slug": "second", "stepType": "single", "dependencies": ["alpha"]}
]
}'::jsonb,
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []},
{"slug": "second", "stepType": "single", "dependencies": ["beta"]}
]
}'::jsonb
)
),
'Should detect dependency difference'
);

select finish();
rollback;
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
begin;
select plan(1);
select pgflow_tests.reset_db();

-- Test: Identical shapes should return empty differences array
select is(
pgflow._compare_flow_shapes(
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []},
{"slug": "second", "stepType": "single", "dependencies": ["first"]}
]
}'::jsonb,
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []},
{"slug": "second", "stepType": "single", "dependencies": ["first"]}
]
}'::jsonb
),
'{}'::text[],
'Identical shapes should have no differences'
);

select finish();
rollback;
25 changes: 25 additions & 0 deletions pkgs/core/supabase/tests/compare_flow_shapes/slug_differs.test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
begin;
select plan(1);
select pgflow_tests.reset_db();

-- Test: Different slugs at same index should be detected
select ok(
$$Step at index 0: slug differs 'first' vs 'different'$$ = ANY(
pgflow._compare_flow_shapes(
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []}
]
}'::jsonb,
'{
"steps": [
{"slug": "different", "stepType": "single", "dependencies": []}
]
}'::jsonb
)
),
'Should detect slug difference at same index'
);

select finish();
rollback;
Loading
Loading