From 87403ab2f6b89ea4f8432fe67aa808daf93b2899 Mon Sep 17 00:00:00 2001 From: anandu-kv Date: Tue, 16 Sep 2025 14:33:48 +0530 Subject: [PATCH 1/4] chore: Update archival procedure --- .../V10_update_archival_procedure.sql | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql new file mode 100644 index 0000000000..cfad3c08f3 --- /dev/null +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql @@ -0,0 +1,93 @@ + +CREATE TABLE IF NOT EXISTS archival_logs ( + id SERIAL PRIMARY KEY, + log_time TIMESTAMP NOT NULL DEFAULT now(), + log_message TEXT NOT NULL, + archival_date DATE NOT NULL +); + +CREATE OR REPLACE PROCEDURE public.conductor_archive(IN archival_date date) + LANGUAGE plpgsql +AS $procedure$ +DECLARE + deleted_workflows INT := 0; + deleted_wf_def_links INT := 0; + deleted_wf_to_task INT := 0; + deleted_tasks INT := 0; + deleted_task_scheduled INT := 0; + total_deleted INT := 0; + log_message TEXT; +BEGIN + -- Step 1: Collect workflow IDs eligible for deletion + CREATE TEMP TABLE temp_workflows_to_delete ON COMMIT DROP AS + SELECT workflow_id + FROM workflow + WHERE created_on < archival_date + AND (json_data::jsonb ->> 'status') = 'COMPLETED'; + + ALTER TABLE temp_workflows_to_delete ADD PRIMARY KEY (workflow_id); + ANALYZE temp_workflows_to_delete; + + -- Step 2: Cascade deletes + + -- workflow_def_to_workflow + DELETE FROM workflow_def_to_workflow wdw + USING temp_workflows_to_delete tw + WHERE wdw.workflow_id = tw.workflow_id; + GET DIAGNOSTICS deleted_wf_def_links = ROW_COUNT; + + -- workflow_to_task + CREATE TEMP TABLE temp_tasks_to_delete ON COMMIT DROP AS + SELECT wt.task_id + FROM workflow_to_task wt + JOIN temp_workflows_to_delete tw ON wt.workflow_id = tw.workflow_id; + + ALTER TABLE temp_tasks_to_delete ADD PRIMARY KEY (task_id); + ANALYZE temp_tasks_to_delete; + + DELETE FROM workflow_to_task wt + USING temp_workflows_to_delete tw + WHERE wt.workflow_id = tw.workflow_id; + GET DIAGNOSTICS deleted_wf_to_task = ROW_COUNT; + + -- task_scheduled + DELETE FROM task_scheduled ts + USING temp_tasks_to_delete tt + WHERE ts.task_id = tt.task_id; + GET DIAGNOSTICS deleted_task_scheduled = ROW_COUNT; + + -- task + DELETE FROM task t + USING temp_tasks_to_delete tt + WHERE t.task_id = tt.task_id; + GET DIAGNOSTICS deleted_tasks = ROW_COUNT; + + -- workflow + DELETE FROM workflow w + USING temp_workflows_to_delete tw + WHERE w.workflow_id = tw.workflow_id; + GET DIAGNOSTICS deleted_workflows = ROW_COUNT; + + -- Step 3: Logging + total_deleted := deleted_workflows + deleted_wf_def_links + + deleted_wf_to_task + deleted_tasks + deleted_task_scheduled; + + log_message := 'Cleanup completed successfully for COMPLETED workflows before ' || archival_date || '. ' || + 'Total deleted: ' || total_deleted || ' | Breakdown: ' || + 'workflow: ' || deleted_workflows || ', ' || + 'workflow_def_to_workflow: ' || deleted_wf_def_links || ', ' || + 'workflow_to_task: ' || deleted_wf_to_task || ', ' || + 'task: ' || deleted_tasks || ', ' || + 'task_scheduled: ' || deleted_task_scheduled; + + INSERT INTO archival_logs(log_message, archival_date) + VALUES (log_message, archival_date); + +EXCEPTION + WHEN OTHERS THEN + INSERT INTO archival_logs(log_message, archival_date) + VALUES ('Error in cleanup_completed_workflows: ' || SQLERRM, archival_date); + RAISE; +END; +$procedure$ +; \ No newline at end of file From 145f8a38515c716e2c60447cd72b6b37eb4172db Mon Sep 17 00:00:00 2001 From: anandu-kv Date: Tue, 16 Sep 2025 14:46:13 +0530 Subject: [PATCH 2/4] refactor: Check failed, time out and terminated status for archival --- .../db/migration_postgres/V10_update_archival_procedure.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql index cfad3c08f3..87ec3e956c 100644 --- a/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql @@ -23,7 +23,7 @@ BEGIN SELECT workflow_id FROM workflow WHERE created_on < archival_date - AND (json_data::jsonb ->> 'status') = 'COMPLETED'; + AND (json_data::jsonb ->> 'status') IN ('COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED'); ALTER TABLE temp_workflows_to_delete ADD PRIMARY KEY (workflow_id); ANALYZE temp_workflows_to_delete; @@ -72,7 +72,7 @@ BEGIN total_deleted := deleted_workflows + deleted_wf_def_links + deleted_wf_to_task + deleted_tasks + deleted_task_scheduled; - log_message := 'Cleanup completed successfully for COMPLETED workflows before ' || archival_date || '. ' || + log_message := 'Cleanup completed successfully for COMPLETED, FAILED, TIMED_OUT, and TERMINATED workflows before ' || archival_date || '. ' || 'Total deleted: ' || total_deleted || ' | Breakdown: ' || 'workflow: ' || deleted_workflows || ', ' || 'workflow_def_to_workflow: ' || deleted_wf_def_links || ', ' || From 4813f16e0b1460807670309c8b90330a48a44e38 Mon Sep 17 00:00:00 2001 From: anandu-kv Date: Tue, 16 Sep 2025 14:47:08 +0530 Subject: [PATCH 3/4] refactor: New line --- .../db/migration_postgres/V10_update_archival_procedure.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql index 87ec3e956c..47931ba714 100644 --- a/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql @@ -90,4 +90,4 @@ EXCEPTION RAISE; END; $procedure$ -; \ No newline at end of file +; From 198efe95fba7282d47444bbe8768a141f28eee03 Mon Sep 17 00:00:00 2001 From: anandu-kv Date: Tue, 16 Sep 2025 18:12:59 +0530 Subject: [PATCH 4/4] refactor: Add workflow index and task index to archival --- .../V10_update_archival_procedure.sql | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql index 47931ba714..fdb67f93ad 100644 --- a/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V10_update_archival_procedure.sql @@ -15,6 +15,8 @@ DECLARE deleted_wf_to_task INT := 0; deleted_tasks INT := 0; deleted_task_scheduled INT := 0; + deleted_workflow_index INT := 0; + deleted_task_index INT := 0; total_deleted INT := 0; log_message TEXT; BEGIN @@ -36,6 +38,12 @@ BEGIN WHERE wdw.workflow_id = tw.workflow_id; GET DIAGNOSTICS deleted_wf_def_links = ROW_COUNT; + -- workflow_index + DELETE FROM workflow_index wi + USING temp_workflows_to_delete tw + WHERE wi.workflow_id = tw.workflow_id; + GET DIAGNOSTICS deleted_workflow_index = ROW_COUNT; + -- workflow_to_task CREATE TEMP TABLE temp_tasks_to_delete ON COMMIT DROP AS SELECT wt.task_id @@ -56,6 +64,12 @@ BEGIN WHERE ts.task_id = tt.task_id; GET DIAGNOSTICS deleted_task_scheduled = ROW_COUNT; + -- task_index + DELETE FROM task_index ti + USING temp_tasks_to_delete tt + WHERE ti.task_id = tt.task_id; + GET DIAGNOSTICS deleted_task_index = ROW_COUNT; + -- task DELETE FROM task t USING temp_tasks_to_delete tt @@ -69,16 +83,18 @@ BEGIN GET DIAGNOSTICS deleted_workflows = ROW_COUNT; -- Step 3: Logging - total_deleted := deleted_workflows + deleted_wf_def_links + - deleted_wf_to_task + deleted_tasks + deleted_task_scheduled; + total_deleted := deleted_workflows + deleted_wf_def_links + deleted_workflow_index + + deleted_wf_to_task + deleted_tasks + deleted_task_scheduled + deleted_task_index; log_message := 'Cleanup completed successfully for COMPLETED, FAILED, TIMED_OUT, and TERMINATED workflows before ' || archival_date || '. ' || 'Total deleted: ' || total_deleted || ' | Breakdown: ' || 'workflow: ' || deleted_workflows || ', ' || 'workflow_def_to_workflow: ' || deleted_wf_def_links || ', ' || + 'workflow_index: ' || deleted_workflow_index || ', ' || 'workflow_to_task: ' || deleted_wf_to_task || ', ' || 'task: ' || deleted_tasks || ', ' || - 'task_scheduled: ' || deleted_task_scheduled; + 'task_scheduled: ' || deleted_task_scheduled || ', ' || + 'task_index: ' || deleted_task_index; INSERT INTO archival_logs(log_message, archival_date) VALUES (log_message, archival_date);