From 9adebe89acda71f6516b97532c601df55e76d284 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Sun, 30 Nov 2025 02:15:28 +0100 Subject: [PATCH] feat(core): add shape extraction and comparison functions --- .../0100_function_compare_flow_shapes.sql | 115 +++++++++++++++ .../schemas/0100_function_get_flow_shape.sql | 34 +++++ pkgs/core/src/database-types.ts | 5 + ...1130011221_pgflow_temp_shape_utilities.sql | 133 ++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../dependencies_differ.test.sql | 27 ++++ .../matching_shapes.test.sql | 26 ++++ .../compare_flow_shapes/slug_differs.test.sql | 25 ++++ .../step_count_differs.test.sql | 26 ++++ .../step_type_differs.test.sql | 25 ++++ .../tests/get_flow_shape/basic_shape.test.sql | 25 ++++ .../tests/get_flow_shape/map_steps.test.sql | 31 ++++ .../multiple_deps_sorted.test.sql | 29 ++++ .../get_flow_shape/nonexistent_flow.test.sql | 13 ++ 14 files changed, 516 insertions(+), 1 deletion(-) create mode 100644 pkgs/core/schemas/0100_function_compare_flow_shapes.sql create mode 100644 pkgs/core/schemas/0100_function_get_flow_shape.sql create mode 100644 pkgs/core/supabase/migrations/20251130011221_pgflow_temp_shape_utilities.sql create mode 100644 pkgs/core/supabase/tests/compare_flow_shapes/dependencies_differ.test.sql create mode 100644 pkgs/core/supabase/tests/compare_flow_shapes/matching_shapes.test.sql create mode 100644 pkgs/core/supabase/tests/compare_flow_shapes/slug_differs.test.sql create mode 100644 pkgs/core/supabase/tests/compare_flow_shapes/step_count_differs.test.sql create mode 100644 pkgs/core/supabase/tests/compare_flow_shapes/step_type_differs.test.sql create mode 100644 pkgs/core/supabase/tests/get_flow_shape/basic_shape.test.sql create mode 100644 pkgs/core/supabase/tests/get_flow_shape/map_steps.test.sql create mode 100644 pkgs/core/supabase/tests/get_flow_shape/multiple_deps_sorted.test.sql create mode 100644 pkgs/core/supabase/tests/get_flow_shape/nonexistent_flow.test.sql diff --git a/pkgs/core/schemas/0100_function_compare_flow_shapes.sql b/pkgs/core/schemas/0100_function_compare_flow_shapes.sql new file mode 100644 index 000000000..193d8c564 --- /dev/null +++ b/pkgs/core/schemas/0100_function_compare_flow_shapes.sql @@ -0,0 +1,115 @@ +-- Compare two flow shapes and return array of difference descriptions +-- Mirrors TypeScript compareFlowShapes() function logic +create or replace function pgflow._compare_flow_shapes( + p_local jsonb, + p_db jsonb +) +returns text [] +language plpgsql +stable +set search_path to '' +as $fn$ +DECLARE + v_differences text[] := '{}'; + v_local_steps jsonb; + v_db_steps jsonb; + v_local_count int; + v_db_count int; + v_max_count int; + v_idx int; + v_local_step jsonb; + v_db_step jsonb; + v_local_deps text; + v_db_deps text; +BEGIN + v_local_steps := p_local->'steps'; + v_db_steps := p_db->'steps'; + v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb)); + v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb)); + + -- Compare step counts + IF v_local_count != v_db_count THEN + v_differences := array_append( + v_differences, + format('Step count differs: %s vs %s', v_local_count, v_db_count) + ); + END IF; + + -- Compare steps by index + v_max_count := GREATEST(v_local_count, v_db_count); + + FOR v_idx IN 0..(v_max_count - 1) LOOP + v_local_step := v_local_steps->v_idx; + v_db_step := v_db_steps->v_idx; + + IF v_local_step IS NULL THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: missing in first shape (second has '%s')$$, + v_idx, + v_db_step->>'slug' + ) + ); + ELSIF v_db_step IS NULL THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: missing in second shape (first has '%s')$$, + v_idx, + v_local_step->>'slug' + ) + ); + ELSE + -- Compare slug + IF v_local_step->>'slug' != v_db_step->>'slug' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: slug differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'slug', + v_db_step->>'slug' + ) + ); + END IF; + + -- Compare step type + IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: type differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'stepType', + v_db_step->>'stepType' + ) + ); + END IF; + + -- Compare dependencies (convert arrays to comma-separated strings) + SELECT string_agg(dep, ', ' ORDER BY dep) + INTO v_local_deps + FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep; + + SELECT string_agg(dep, ', ' ORDER BY dep) + INTO v_db_deps + FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep; + + IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: dependencies differ [%s] vs [%s]$$, + v_idx, + COALESCE(v_local_deps, ''), + COALESCE(v_db_deps, '') + ) + ); + END IF; + END IF; + END LOOP; + + RETURN v_differences; +END; +$fn$; diff --git a/pkgs/core/schemas/0100_function_get_flow_shape.sql b/pkgs/core/schemas/0100_function_get_flow_shape.sql new file mode 100644 index 000000000..a725c820c --- /dev/null +++ b/pkgs/core/schemas/0100_function_get_flow_shape.sql @@ -0,0 +1,34 @@ +-- Get flow shape from database as JSONB +-- Returns structure matching TypeScript FlowShape interface: +-- { "steps": [{ "slug": "...", "stepType": "...", "dependencies": [...] }] } +create or replace function pgflow._get_flow_shape(p_flow_slug text) +returns jsonb +language sql +stable +set search_path to '' +as $$ + SELECT jsonb_build_object( + 'steps', + COALESCE( + jsonb_agg( + jsonb_build_object( + 'slug', step.step_slug, + 'stepType', step.step_type, + 'dependencies', COALESCE( + ( + SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug) + FROM pgflow.deps AS dep + WHERE dep.flow_slug = step.flow_slug + AND dep.step_slug = step.step_slug + ), + '[]'::jsonb + ) + ) + ORDER BY step.step_index + ), + '[]'::jsonb + ) + ) + FROM pgflow.steps AS step + WHERE step.flow_slug = p_flow_slug; +$$; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index f2e7d3326..c2bf0e5dc 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -346,6 +346,11 @@ export type Database = { [_ in never]: never } Functions: { + _compare_flow_shapes: { + Args: { p_db: Json; p_local: Json } + Returns: string[] + } + _get_flow_shape: { Args: { p_flow_slug: string }; Returns: Json } add_step: { Args: { base_delay?: number diff --git a/pkgs/core/supabase/migrations/20251130011221_pgflow_temp_shape_utilities.sql b/pkgs/core/supabase/migrations/20251130011221_pgflow_temp_shape_utilities.sql new file mode 100644 index 000000000..efe86fd0e --- /dev/null +++ b/pkgs/core/supabase/migrations/20251130011221_pgflow_temp_shape_utilities.sql @@ -0,0 +1,133 @@ +-- Create "_compare_flow_shapes" function +CREATE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$ +DECLARE + v_differences text[] := '{}'; + v_local_steps jsonb; + v_db_steps jsonb; + v_local_count int; + v_db_count int; + v_max_count int; + v_idx int; + v_local_step jsonb; + v_db_step jsonb; + v_local_deps text; + v_db_deps text; +BEGIN + v_local_steps := p_local->'steps'; + v_db_steps := p_db->'steps'; + v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb)); + v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb)); + + -- Compare step counts + IF v_local_count != v_db_count THEN + v_differences := array_append( + v_differences, + format('Step count differs: %s vs %s', v_local_count, v_db_count) + ); + END IF; + + -- Compare steps by index + v_max_count := GREATEST(v_local_count, v_db_count); + + FOR v_idx IN 0..(v_max_count - 1) LOOP + v_local_step := v_local_steps->v_idx; + v_db_step := v_db_steps->v_idx; + + IF v_local_step IS NULL THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: missing in first shape (second has '%s')$$, + v_idx, + v_db_step->>'slug' + ) + ); + ELSIF v_db_step IS NULL THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: missing in second shape (first has '%s')$$, + v_idx, + v_local_step->>'slug' + ) + ); + ELSE + -- Compare slug + IF v_local_step->>'slug' != v_db_step->>'slug' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: slug differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'slug', + v_db_step->>'slug' + ) + ); + END IF; + + -- Compare step type + IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: type differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'stepType', + v_db_step->>'stepType' + ) + ); + END IF; + + -- Compare dependencies (convert arrays to comma-separated strings) + SELECT string_agg(dep, ', ' ORDER BY dep) + INTO v_local_deps + FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep; + + SELECT string_agg(dep, ', ' ORDER BY dep) + INTO v_db_deps + FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep; + + IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: dependencies differ [%s] vs [%s]$$, + v_idx, + COALESCE(v_local_deps, ''), + COALESCE(v_db_deps, '') + ) + ); + END IF; + END IF; + END LOOP; + + RETURN v_differences; +END; +$BODY$; +-- Create "_get_flow_shape" function +CREATE FUNCTION "pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$ +SELECT jsonb_build_object( + 'steps', + COALESCE( + jsonb_agg( + jsonb_build_object( + 'slug', step.step_slug, + 'stepType', step.step_type, + 'dependencies', COALESCE( + ( + SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug) + FROM pgflow.deps AS dep + WHERE dep.flow_slug = step.flow_slug + AND dep.step_slug = step.step_slug + ), + '[]'::jsonb + ) + ) + ORDER BY step.step_index + ), + '[]'::jsonb + ) + ) + FROM pgflow.steps AS step + WHERE step.flow_slug = p_flow_slug; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 39c502d22..6b10371b4 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:TiGfW/P3IHRUFlm9TwimGZLK/80Gh5RfQm+x21jBuic= +h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -11,3 +11,4 @@ h1:TiGfW/P3IHRUFlm9TwimGZLK/80Gh5RfQm+x21jBuic= 20251006073122_pgflow_add_map_step_type.sql h1:D/skgKpaVg5TM8bPovo9FUutQfg35/AzkxEcasYwytY= 20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4= 20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg= +20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE= diff --git a/pkgs/core/supabase/tests/compare_flow_shapes/dependencies_differ.test.sql b/pkgs/core/supabase/tests/compare_flow_shapes/dependencies_differ.test.sql new file mode 100644 index 000000000..6ddc9422f --- /dev/null +++ b/pkgs/core/supabase/tests/compare_flow_shapes/dependencies_differ.test.sql @@ -0,0 +1,27 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Test: Different dependencies at same index should be detected +select ok( + $$Step at index 1: dependencies differ [alpha] vs [beta]$$ = ANY( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["alpha"]} + ] + }'::jsonb, + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["beta"]} + ] + }'::jsonb + ) + ), + 'Should detect dependency difference' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/compare_flow_shapes/matching_shapes.test.sql b/pkgs/core/supabase/tests/compare_flow_shapes/matching_shapes.test.sql new file mode 100644 index 000000000..f138afb1b --- /dev/null +++ b/pkgs/core/supabase/tests/compare_flow_shapes/matching_shapes.test.sql @@ -0,0 +1,26 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Test: Identical shapes should return empty differences array +select is( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["first"]} + ] + }'::jsonb, + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["first"]} + ] + }'::jsonb + ), + '{}'::text[], + 'Identical shapes should have no differences' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/compare_flow_shapes/slug_differs.test.sql b/pkgs/core/supabase/tests/compare_flow_shapes/slug_differs.test.sql new file mode 100644 index 000000000..313a5442d --- /dev/null +++ b/pkgs/core/supabase/tests/compare_flow_shapes/slug_differs.test.sql @@ -0,0 +1,25 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Test: Different slugs at same index should be detected +select ok( + $$Step at index 0: slug differs 'first' vs 'different'$$ = ANY( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []} + ] + }'::jsonb, + '{ + "steps": [ + {"slug": "different", "stepType": "single", "dependencies": []} + ] + }'::jsonb + ) + ), + 'Should detect slug difference at same index' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/compare_flow_shapes/step_count_differs.test.sql b/pkgs/core/supabase/tests/compare_flow_shapes/step_count_differs.test.sql new file mode 100644 index 000000000..6332269e8 --- /dev/null +++ b/pkgs/core/supabase/tests/compare_flow_shapes/step_count_differs.test.sql @@ -0,0 +1,26 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Test: Different step counts should be detected +select ok( + 'Step count differs: 2 vs 1' = ANY( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["first"]} + ] + }'::jsonb, + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []} + ] + }'::jsonb + ) + ), + 'Should detect step count difference' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/compare_flow_shapes/step_type_differs.test.sql b/pkgs/core/supabase/tests/compare_flow_shapes/step_type_differs.test.sql new file mode 100644 index 000000000..b533a46ba --- /dev/null +++ b/pkgs/core/supabase/tests/compare_flow_shapes/step_type_differs.test.sql @@ -0,0 +1,25 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Test: Different step types at same index should be detected +select ok( + $$Step at index 0: type differs 'single' vs 'map'$$ = ANY( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []} + ] + }'::jsonb, + '{ + "steps": [ + {"slug": "first", "stepType": "map", "dependencies": []} + ] + }'::jsonb + ) + ), + 'Should detect step type difference' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/get_flow_shape/basic_shape.test.sql b/pkgs/core/supabase/tests/get_flow_shape/basic_shape.test.sql new file mode 100644 index 000000000..573e276dc --- /dev/null +++ b/pkgs/core/supabase/tests/get_flow_shape/basic_shape.test.sql @@ -0,0 +1,25 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Setup: Create a simple flow with 3 steps +select pgflow.create_flow('test_flow'); +select pgflow.add_step('test_flow', 'first'); +select pgflow.add_step('test_flow', 'second', array['first']); +select pgflow.add_step('test_flow', 'third', array['second']); + +-- Test: Get flow shape +select is( + pgflow._get_flow_shape('test_flow'), + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["first"]}, + {"slug": "third", "stepType": "single", "dependencies": ["second"]} + ] + }'::jsonb, + 'Should return correct shape for simple sequential flow' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/get_flow_shape/map_steps.test.sql b/pkgs/core/supabase/tests/get_flow_shape/map_steps.test.sql new file mode 100644 index 000000000..292bb9a3d --- /dev/null +++ b/pkgs/core/supabase/tests/get_flow_shape/map_steps.test.sql @@ -0,0 +1,31 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Setup: Create flow with map steps +select pgflow.create_flow('map_flow'); +select pgflow.add_step( + flow_slug => 'map_flow', + step_slug => 'root_map', + step_type => 'map' +); +select pgflow.add_step( + flow_slug => 'map_flow', + step_slug => 'process', + deps_slugs => array['root_map'] +); + +-- Test: Get flow shape with map step +select is( + pgflow._get_flow_shape('map_flow'), + '{ + "steps": [ + {"slug": "root_map", "stepType": "map", "dependencies": []}, + {"slug": "process", "stepType": "single", "dependencies": ["root_map"]} + ] + }'::jsonb, + 'Should correctly identify map step type' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/get_flow_shape/multiple_deps_sorted.test.sql b/pkgs/core/supabase/tests/get_flow_shape/multiple_deps_sorted.test.sql new file mode 100644 index 000000000..a92436dd1 --- /dev/null +++ b/pkgs/core/supabase/tests/get_flow_shape/multiple_deps_sorted.test.sql @@ -0,0 +1,29 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Setup: Create flow where a step has multiple dependencies +-- Dependencies should be returned sorted alphabetically +select pgflow.create_flow('multi_deps'); +select pgflow.add_step('multi_deps', 'alpha'); +select pgflow.add_step('multi_deps', 'beta'); +select pgflow.add_step('multi_deps', 'gamma'); +-- 'final' depends on all three - they should appear sorted +select pgflow.add_step('multi_deps', 'final', array['gamma', 'alpha', 'beta']); + +-- Test: Dependencies should be sorted alphabetically +select is( + pgflow._get_flow_shape('multi_deps'), + '{ + "steps": [ + {"slug": "alpha", "stepType": "single", "dependencies": []}, + {"slug": "beta", "stepType": "single", "dependencies": []}, + {"slug": "gamma", "stepType": "single", "dependencies": []}, + {"slug": "final", "stepType": "single", "dependencies": ["alpha", "beta", "gamma"]} + ] + }'::jsonb, + 'Dependencies should be sorted alphabetically' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/get_flow_shape/nonexistent_flow.test.sql b/pkgs/core/supabase/tests/get_flow_shape/nonexistent_flow.test.sql new file mode 100644 index 000000000..6c8c41615 --- /dev/null +++ b/pkgs/core/supabase/tests/get_flow_shape/nonexistent_flow.test.sql @@ -0,0 +1,13 @@ +begin; +select plan(1); +select pgflow_tests.reset_db(); + +-- Test: Nonexistent flow should return empty steps array +select is( + pgflow._get_flow_shape('nonexistent'), + '{"steps": []}'::jsonb, + 'Should return empty steps for nonexistent flow' +); + +select finish(); +rollback;