From 4cd34c0028f5dd68e0b1d82cd171e7c071d76405 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Sun, 30 Nov 2025 02:30:14 +0100 Subject: [PATCH] feat(core): add ensure_flow_compiled function --- .../0100_function_ensure_flow_compiled.sql | 51 ++++++++++++++++++ pkgs/core/src/database-types.ts | 4 ++ ...12803_pgflow_temp_ensure_flow_compiled.sql | 40 ++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../compiles_missing_flow.test.sql | 36 +++++++++++++ .../fails_on_production_mismatch.test.sql | 52 +++++++++++++++++++ .../recompiles_in_development_mode.test.sql | 42 +++++++++++++++ .../verifies_matching_shape.test.sql | 47 +++++++++++++++++ 8 files changed, 274 insertions(+), 1 deletion(-) create mode 100644 pkgs/core/schemas/0100_function_ensure_flow_compiled.sql create mode 100644 pkgs/core/supabase/migrations/20251130012803_pgflow_temp_ensure_flow_compiled.sql create mode 100644 pkgs/core/supabase/tests/ensure_flow_compiled/compiles_missing_flow.test.sql create mode 100644 pkgs/core/supabase/tests/ensure_flow_compiled/fails_on_production_mismatch.test.sql create mode 100644 pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql create mode 100644 pkgs/core/supabase/tests/ensure_flow_compiled/verifies_matching_shape.test.sql diff --git a/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql b/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql new file mode 100644 index 000000000..cf62ae26b --- /dev/null +++ b/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql @@ -0,0 +1,51 @@ +-- Ensure a flow is compiled in the database +-- Handles both development (auto-recompile) and production (fail on mismatch) modes +-- Returns: { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch', differences: text[] } +create or replace function pgflow.ensure_flow_compiled( + p_flow_slug text, + p_shape jsonb, + p_mode text default 'production' -- 'development' | 'production' +) +returns jsonb +language plpgsql +volatile +set search_path to '' +as $$ +DECLARE + v_flow_exists boolean; + v_db_shape jsonb; + v_differences text[]; +BEGIN + -- 1. Check if flow exists + SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug) + INTO v_flow_exists; + + -- 2. If flow missing: compile (both modes) + IF NOT v_flow_exists THEN + PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape); + RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb); + END IF; + + -- 3. Get current shape from DB + v_db_shape := pgflow._get_flow_shape(p_flow_slug); + + -- 4. Compare shapes + v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape); + + -- 5. If shapes match: return verified + IF array_length(v_differences, 1) IS NULL THEN + RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb); + END IF; + + -- 6. Shapes differ - handle by mode + IF p_mode = 'development' THEN + -- Recompile in dev mode: full deletion + fresh compile + PERFORM pgflow.delete_flow_and_data(p_flow_slug); + PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape); + RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences)); + ELSE + -- Fail in production mode + RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences)); + END IF; +END; +$$; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 41c461803..ea7af71c9 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -448,6 +448,10 @@ export type Database = { Args: { p_flow_slug: string } Returns: undefined } + ensure_flow_compiled: { + Args: { p_flow_slug: string; p_mode?: string; p_shape: Json } + Returns: Json + } fail_task: { Args: { error_message: string diff --git a/pkgs/core/supabase/migrations/20251130012803_pgflow_temp_ensure_flow_compiled.sql b/pkgs/core/supabase/migrations/20251130012803_pgflow_temp_ensure_flow_compiled.sql new file mode 100644 index 000000000..1bfb6634d --- /dev/null +++ b/pkgs/core/supabase/migrations/20251130012803_pgflow_temp_ensure_flow_compiled.sql @@ -0,0 +1,40 @@ +-- Create "ensure_flow_compiled" function +CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("p_flow_slug" text, "p_shape" jsonb, "p_mode" text DEFAULT 'production') RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_flow_exists boolean; + v_db_shape jsonb; + v_differences text[]; +BEGIN + -- 1. Check if flow exists + SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug) + INTO v_flow_exists; + + -- 2. If flow missing: compile (both modes) + IF NOT v_flow_exists THEN + PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape); + RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb); + END IF; + + -- 3. Get current shape from DB + v_db_shape := pgflow._get_flow_shape(p_flow_slug); + + -- 4. Compare shapes + v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape); + + -- 5. If shapes match: return verified + IF array_length(v_differences, 1) IS NULL THEN + RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb); + END IF; + + -- 6. Shapes differ - handle by mode + IF p_mode = 'development' THEN + -- Recompile in dev mode: full deletion + fresh compile + PERFORM pgflow.delete_flow_and_data(p_flow_slug); + PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape); + RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences)); + ELSE + -- Fail in production mode + RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences)); + END IF; +END; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 363cd58f0..802c5ed26 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA= +h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -13,3 +13,4 @@ h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA= 20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg= 20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE= 20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y= +20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko= diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/compiles_missing_flow.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/compiles_missing_flow.test.sql new file mode 100644 index 000000000..ac4f6cb91 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/compiles_missing_flow.test.sql @@ -0,0 +1,36 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Test: Missing flow should be compiled (default production mode) +select is( + ( + select result->>'status' + from pgflow.ensure_flow_compiled( + 'new_flow', + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []} + ] + }'::jsonb + ) as result + ), + 'compiled', + 'Should return compiled status for missing flow' +); + +-- Verify flow was actually created +select is( + (select count(*)::int from pgflow.flows where flow_slug = 'new_flow'), + 1, + 'Flow should be created in database' +); + +select is( + (select count(*)::int from pgflow.steps where flow_slug = 'new_flow'), + 1, + 'Step should be created in database' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/fails_on_production_mismatch.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/fails_on_production_mismatch.test.sql new file mode 100644 index 000000000..cc461010e --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/fails_on_production_mismatch.test.sql @@ -0,0 +1,52 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Setup: Create flow with different shape +select pgflow.create_flow('prod_flow'); +select pgflow.add_step('prod_flow', 'old_step'); + +-- Test: Different shape in production mode should return mismatch +select is( + ( + select result->>'status' + from pgflow.ensure_flow_compiled( + 'prod_flow', + '{ + "steps": [ + {"slug": "new_step", "stepType": "single", "dependencies": []} + ] + }'::jsonb, + 'production' + ) as result + ), + 'mismatch', + 'Should return mismatch status in production mode' +); + +-- Verify differences are returned +select ok( + ( + select jsonb_array_length(result->'differences') > 0 + from pgflow.ensure_flow_compiled( + 'prod_flow', + '{ + "steps": [ + {"slug": "new_step", "stepType": "single", "dependencies": []} + ] + }'::jsonb, + 'production' + ) as result + ), + 'Should return differences for production mismatch' +); + +-- Verify database was NOT modified +select is( + (select step_slug from pgflow.steps where flow_slug = 'prod_flow'), + 'old_step', + 'Database should not be modified on production mismatch' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql new file mode 100644 index 000000000..edda9e999 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql @@ -0,0 +1,42 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Setup: Create flow with different shape +select pgflow.create_flow('dev_flow'); +select pgflow.add_step('dev_flow', 'old_step'); + +-- Test: Different shape in development mode should recompile +select is( + ( + select result->>'status' + from pgflow.ensure_flow_compiled( + 'dev_flow', + '{ + "steps": [ + {"slug": "new_step", "stepType": "single", "dependencies": []} + ] + }'::jsonb, + 'development' + ) as result + ), + 'recompiled', + 'Should return recompiled status in development mode' +); + +-- Verify old step is gone +select is( + (select count(*)::int from pgflow.steps where flow_slug = 'dev_flow' and step_slug = 'old_step'), + 0, + 'Old step should be deleted' +); + +-- Verify new step exists +select is( + (select count(*)::int from pgflow.steps where flow_slug = 'dev_flow' and step_slug = 'new_step'), + 1, + 'New step should be created' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/verifies_matching_shape.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/verifies_matching_shape.test.sql new file mode 100644 index 000000000..80ca90ab6 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/verifies_matching_shape.test.sql @@ -0,0 +1,47 @@ +begin; +select plan(2); +select pgflow_tests.reset_db(); + +-- Setup: Create flow first +select pgflow.create_flow('existing_flow'); +select pgflow.add_step('existing_flow', 'first'); +select pgflow.add_step('existing_flow', 'second', array['first']); + +-- Test: Matching shape should return verified +select is( + ( + select result->>'status' + from pgflow.ensure_flow_compiled( + 'existing_flow', + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["first"]} + ] + }'::jsonb + ) as result + ), + 'verified', + 'Should return verified status for matching shape' +); + +-- Verify differences array is empty +select is( + ( + select jsonb_array_length(result->'differences') + from pgflow.ensure_flow_compiled( + 'existing_flow', + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["first"]} + ] + }'::jsonb + ) as result + ), + 0, + 'Differences should be empty for matching shape' +); + +select finish(); +rollback;