From fa287c6b92791664d3eaa2abab95def5d4a352ea Mon Sep 17 00:00:00 2001
From: Paul Stretenowich
Date: Mon, 17 Nov 2025 16:36:30 -0500
Subject: [PATCH 1/3] Adding a unique constraint check when editing

---
 project_tracking/api/project.py             | 26 ++++++++++-----------
 project_tracking/db_actions/modification.py | 24 ++++++++++++++++++-
 2 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/project_tracking/api/project.py b/project_tracking/api/project.py
index 94bb869..a5c5f69 100644
--- a/project_tracking/api/project.py
+++ b/project_tracking/api/project.py
@@ -341,10 +341,10 @@ def specimens(project_id: str, specimen_id: str=None, sample_id: str=None, reads
         The include_relationships query allows to include related entities in the output
             (true):
                 return: specimens with related entities included
-                Ex: /project//specimens?json={"include_relationships": true}
+                Ex: /project//specimens?json={"include_relationships": true}
             (false):
                 return: specimens without related entities (default behavior)
-                Ex: /project//specimens?json={"include_relationships": false}
+                Ex: /project//specimens?json={"include_relationships": false}
     """
     # Don't serialize relationships by default aka when requesting all specimens
     include_relationships = False
@@ -449,10 +449,10 @@ def samples(project_id: str, specimen_id: str=None, sample_id: str=None, readset
         The include_relationships query allows to include related entities in the output
             (true):
                 return: samples with related entities included
-                Ex: /project//samples?json={"include_relationships": true}
+                Ex: /project//samples?json={"include_relationships": true}
             (false):
                 return: samples without related entities (default behavior)
-                Ex: /project//samples?json={"include_relationships": false}
+                Ex: /project//samples?json={"include_relationships": false}
     """
     # Don't serialize relationships by default aka when requesting all samples
     include_relationships = False
@@ -558,10 +558,10 @@ def readsets(project_id: str, specimen_id: str=None, sample_id: str=None, readse
         The include_relationships query allows to include related entities in the output
             (true):
                 return: readsets with related entities included
-                Ex: /project//readsets?json={"include_relationships": true}
+                Ex: /project//readsets?json={"include_relationships": true}
             (false):
                 return: readsets without related entities (default behavior)
-                Ex: /project//readsets?json={"include_relationships": false}
+                Ex: /project//readsets?json={"include_relationships": false}
     """
     # Don't serialize relationships by default aka when requesting all readsets
     include_relationships = False
@@ -643,10 +643,10 @@ def operations(project_id: str, readset_id: str=None, operation_id: str=None, di
         The include_relationships query allows to include related entities in the output
             (true):
                 return: operations with related entities included
-                Ex: /project//operations?json={"include_relationships": true}
+                Ex: /project//operations?json={"include_relationships": true}
             (false):
                 return: operations without related entities (default behavior)
-                Ex: /project//operations?json={"include_relationships": false}
+                Ex: /project//operations?json={"include_relationships": false}
     """
     # Don't serialize relationships by default aka when requesting all operations
     include_relationships = False
@@ -719,10 +719,10 @@ def jobs(project_id: str, readset_id: str=None, job_id: str=None, digest_data=No
         The include_relationships query allows to include related entities in the output
             (true):
                 return: jobs with related entities included
-                Ex: /project//jobs?json={"include_relationships": true}
+                Ex: /project//jobs?json={"include_relationships": true}
             (false):
                 return: jobs without related entities (default behavior)
-                Ex: /project//jobs?json={"include_relationships": false}
+                Ex: /project//jobs?json={"include_relationships": false}
     """
     # Don't serialize relationships by default aka when requesting all jobs
     include_relationships = False
@@ -824,10 +824,10 @@ def files(project_id: str, specimen_id: str=None, sample_id: str=None, readset_i
         The include_relationships query allows to include related entities in the output
             (true):
                 return: files with related entities included
-                Ex: /project//files?json={"include_relationships": true}
+                Ex: /project//files?json={"include_relationships": true}
             (false):
                 return: files without related entities (default behavior)
-                Ex: /project//files?json={"include_relationships": false}
+                Ex: /project//files?json={"include_relationships": false}
     """
     # Don't serialize relationships by default aka when requesting all files
     include_relationships = False
@@ -951,7 +951,7 @@ def metrics(project_id: str, specimen_id: str=None, sample_id: str=None, readset
         The include_relationships query allows to include related entities in the output
             (true):
                 return: metrics with related entities included
-                Ex: /project//metrics?json={"include_relationships": true}
+                Ex: /project//metrics?json={"include_relationships": true}
             (false):
                 return: metrics without related entities (default behavior)
     """
diff --git a/project_tracking/db_actions/modification.py b/project_tracking/db_actions/modification.py
index 4baacc1..c01ae12 100644
--- a/project_tracking/db_actions/modification.py
+++ b/project_tracking/db_actions/modification.py
@@ -4,6 +4,7 @@
 # Standard library
 import logging
 import json
+import re
 
 # Third-party
 from sqlalchemy import select
@@ -12,13 +13,25 @@
 from sqlalchemy.orm.exc import UnmappedClassError
 
 # Local modules
-from .errors import DidNotFindError, RequestError
+from .errors import DidNotFindError, RequestError, UniqueConstraintError
 from .. import vocabulary as vb
 from .. import database
 from .. import model
 
 logger = logging.getLogger(__name__)
 
+def parse_unique_violation(error_msg: str):
+    """Extract table, column, and value from a psycopg2 UniqueViolation error message."""
+    # Extract column and value
+    key_match = re.search(r'Key \(([^)]+)\)=\(([^)]+)\)', error_msg)
+    column, value = key_match.groups() if key_match else (None, None)
+
+    # Extract table name from SQL
+    table_match = re.search(r'(?:INSERT INTO|UPDATE)\s+(\w+)', error_msg)
+    table = table_match.group(1) if table_match else None
+
+    return table, column, value
+
 # Condition Functions
 def is_deleted(obj):
     """
@@ -534,6 +547,15 @@ def edit(ingest_data, session, cascade_mode=None, dry_run=False):
         except SQLAlchemyError as error:
             logger.error("Error: %s", error)
             session.rollback()
+            error_str = str(error)
+            if "UniqueViolation" in error_str:
+                table, column, value = parse_unique_violation(error_str)
+                if table and column and value:
+                    raise UniqueConstraintError(
+                        message=f"'{table}' with '{column}' '{value}' already exists in the database and '{column}' has to be unique."
+                    ) from error
+                raise UniqueConstraintError(message="Unique constraint violation occurred, but details could not be parsed.") from error
+
 
     # If no warning
     if not ret["DB_ACTION_WARNING"]:
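
Note: below is a minimal, self-contained sketch of the kind of error text parse_unique_violation()
is written against. It reuses the same two regexes added above; the constraint, table, column and
value names are illustrative only, not taken from a real database.

    import re

    def parse_unique_violation(error_msg: str):
        """Mirror of the helper added in modification.py."""
        # Column and value come from the psycopg2 DETAIL line
        key_match = re.search(r'Key \(([^)]+)\)=\(([^)]+)\)', error_msg)
        column, value = key_match.groups() if key_match else (None, None)
        # Table name comes from the SQL statement SQLAlchemy appends to the message
        table_match = re.search(r'(?:INSERT INTO|UPDATE)\s+(\w+)', error_msg)
        table = table_match.group(1) if table_match else None
        return table, column, value

    # Shape of a SQLAlchemy-wrapped psycopg2 UniqueViolation (names are made up)
    sample_error = (
        '(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "specimen_name_key"\n'
        'DETAIL:  Key (name)=(SP0001) already exists.\n'
        '[SQL: UPDATE specimen SET name=%(name)s WHERE specimen.id = %(id)s]'
    )
    print(parse_unique_violation(sample_error))  # ('specimen', 'name', 'SP0001')
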
From 57aad60badcfa3af6649a8ce26c5376a8eeda7be Mon Sep 17 00:00:00 2001
From: Paul Stretenowich
Date: Mon, 17 Nov 2025 16:37:12 -0500
Subject: [PATCH 2/3] Consolidate file dedup to occur not only on files per readsets but overall

---
 utils/deduplicate_files.sql | 158 ++++++++++++++++++++----------
 1 file changed, 88 insertions(+), 70 deletions(-)

diff --git a/utils/deduplicate_files.sql b/utils/deduplicate_files.sql
index a26db14..60f2e1b 100644
--- a/utils/deduplicate_files.sql
+++ b/utils/deduplicate_files.sql
@@ -11,26 +11,25 @@ DECLARE
     deleted_files BIGINT := 0;
     deleted_jf BIGINT := 0;
 BEGIN
-    RAISE NOTICE '--- Starting file deduplication per readset (same name + md5sum) ---';
+    RAISE NOTICE '--- Starting GLOBAL file deduplication (same name + md5sum) ---';
 
     ----------------------------------------------------------------------
-    -- Step 1: Identify duplicate files per readset (same name + md5sum)
+    -- Step 1: Identify duplicate files globally (same name + md5sum)
     ----------------------------------------------------------------------
-    CREATE TEMP TABLE file_dupes_per_readset ON COMMIT DROP AS
-    SELECT
-        rf.readset_id,
-        f.name,
-        f.md5sum,
-        -- Prefer deliverable = true, then lowest id
-        (ARRAY_AGG(f.id ORDER BY f.deliverable DESC, f.id ASC))[1] AS keep_file_id,
-        ARRAY_AGG(f.id ORDER BY f.id) AS all_file_ids
-    FROM file f
-    JOIN readset_file rf ON rf.file_id = f.id
-    GROUP BY rf.readset_id, f.name, f.md5sum
-    HAVING COUNT(*) > 1;
-
-    SELECT COUNT(*) INTO dup_group_count FROM file_dupes_per_readset;
-    RAISE NOTICE 'Duplicate file groups found: %', dup_group_count;
+    CREATE TEMP TABLE file_dupes_global ON COMMIT DROP AS
+    SELECT
+        f.name,
+        f.md5sum,
+        -- Prefer deliverable = true, then lowest id
+        (ARRAY_AGG(f.id ORDER BY f.deliverable DESC, f.id ASC))[1] AS keep_file_id,
+        ARRAY_AGG(f.id ORDER BY f.id) AS all_file_ids,
+        COUNT(*) AS cnt
+    FROM file f
+    GROUP BY f.name, f.md5sum
+    HAVING COUNT(*) > 1;
+
+    SELECT COUNT(*) INTO dup_group_count FROM file_dupes_global;
+    RAISE NOTICE 'Duplicate file groups found (global): %', dup_group_count;
 
     IF dup_group_count = 0 THEN
         RAISE NOTICE 'No duplicate groups found; nothing to do.';
@@ -39,15 +38,15 @@ BEGIN
 
     ----------------------------------------------------------------------
     -- Step 2: Delete conflicting readset_file links
+    -- If a readset already points to the keep_file_id, drop the others.
     ----------------------------------------------------------------------
     DELETE FROM readset_file rf
-    USING file_dupes_per_readset fd
-    WHERE rf.readset_id = fd.readset_id
-      AND rf.file_id = ANY(fd.all_file_ids)
+    USING file_dupes_global fd
+    WHERE rf.file_id = ANY(fd.all_file_ids)
       AND rf.file_id <> fd.keep_file_id
+      -- If the readset already has the keep_file_id, then this row is redundant
       AND EXISTS (
-          SELECT 1
-          FROM readset_file rf2
+          SELECT 1 FROM readset_file rf2
           WHERE rf2.readset_id = rf.readset_id
             AND rf2.file_id = fd.keep_file_id
       );
@@ -57,33 +56,51 @@ BEGIN
     END IF;
 
     ----------------------------------------------------------------------
-    -- Step 3: Remap remaining readset_file rows to keep_file_id
-    ----------------------------------------------------------------------
-    UPDATE readset_file rf
-    SET file_id = fd.keep_file_id
-    FROM file_dupes_per_readset fd
-    WHERE rf.readset_id = fd.readset_id
-      AND rf.file_id = ANY(fd.all_file_ids)
-      AND rf.file_id <> fd.keep_file_id;
-    GET DIAGNOSTICS remapped_rf = ROW_COUNT;
-    IF remapped_rf > 0 THEN
-        RAISE NOTICE 'Remapped readset_file rows: %', remapped_rf;
-    END IF;
+    -- Step 3: Remap remaining readset_file rows to keep_file_id (conflict-safe)
+    ----------------------------------------------------------------------
+    RAISE NOTICE 'Remapping readset_file rows (conflict-safe)...';
+
+    LOOP
+        WITH one_update AS (
+            SELECT DISTINCT ON (rf.readset_id, fd.keep_file_id)
+                rf.ctid AS target_ctid,
+                fd.keep_file_id
+            FROM readset_file rf
+            JOIN file_dupes_global fd
+              ON rf.file_id = ANY(fd.all_file_ids)
+             AND rf.file_id <> fd.keep_file_id
+            WHERE NOT EXISTS (
+                SELECT 1
+                FROM readset_file rf2
+                WHERE rf2.readset_id = rf.readset_id
+                  AND rf2.file_id = fd.keep_file_id
+            )
+            ORDER BY rf.readset_id, fd.keep_file_id
+            LIMIT 50000 -- batch size; tune if needed
+        )
+        UPDATE readset_file rf
+        SET file_id = ou.keep_file_id
+        FROM one_update ou
+        WHERE rf.ctid = ou.target_ctid;
+
+        GET DIAGNOSTICS remapped_rf = ROW_COUNT;
+        EXIT WHEN remapped_rf = 0;
+    END LOOP;
+
+    RAISE NOTICE 'Finished remapping readset_file rows (conflict-safe)';
 
     ----------------------------------------------------------------------
-    -- Step 4: Optimized safe remap for job_file
+    -- Step 4: Optimized safe remap for job_file (batched)
     ----------------------------------------------------------------------
-    RAISE NOTICE 'Starting safe remap for job_file (optimized)...';
+    RAISE NOTICE 'Starting safe remap for job_file (optimized, global)...';
 
-    -- Build mapping of (job_id, old_file_id) -> keep_file_id
     CREATE TEMP TABLE job_file_remap ON COMMIT DROP AS
     SELECT DISTINCT jf.job_id, jf.file_id AS old_file_id, fd.keep_file_id
     FROM job_file jf
-    JOIN file_dupes_per_readset fd
+    JOIN file_dupes_global fd
       ON jf.file_id = ANY(fd.all_file_ids)
      AND jf.file_id <> fd.keep_file_id;
 
-    -- Indexes to speed joins (temporary)
     CREATE INDEX idx_jfr_old_file_id ON job_file_remap (old_file_id);
     CREATE INDEX idx_jfr_job_id_keep ON job_file_remap (job_id, keep_file_id);
     CREATE INDEX IF NOT EXISTS idx_jf_job_file_id ON job_file (job_id, file_id);
@@ -130,14 +147,14 @@ BEGIN
         GET DIAGNOSTICS remapped_jf = ROW_COUNT;
         EXIT WHEN remapped_jf = 0;
     END LOOP;
-    RAISE NOTICE 'Finished job_file remap (optimized)';
+    RAISE NOTICE 'Finished job_file remap (optimized, global)';
 
     ----------------------------------------------------------------------
     -- Step 5: Remap location
     ----------------------------------------------------------------------
     UPDATE location l
     SET file_id = fd.keep_file_id
-    FROM file_dupes_per_readset fd
+    FROM file_dupes_global fd
     WHERE l.file_id = ANY(fd.all_file_ids)
       AND l.file_id <> fd.keep_file_id;
     GET DIAGNOSTICS remapped_loc = ROW_COUNT;
@@ -146,16 +163,17 @@ BEGIN
     END IF;
 
     ----------------------------------------------------------------------
-    -- Step 5.5: Final sanity remap for job_file (conflict-safe)
+    -- Step 6: Final sanity remap for job_file (conflict-safe)
+    -- (catch anything remaining after batches)
    ----------------------------------------------------------------------
-    RAISE NOTICE 'Performing final sanity remap for job_file (conflict-safe)...';
+    RAISE NOTICE 'Performing final sanity remap for job_file (conflict-safe, global)...';
 
     -- Delete job_file rows that would cause a duplicate during final remap
     DELETE FROM job_file jf
     USING (
         SELECT jf1.job_id, fd.keep_file_id
         FROM job_file jf1
-        JOIN file_dupes_per_readset fd
+        JOIN file_dupes_global fd
           ON jf1.file_id = ANY(fd.all_file_ids)
          AND jf1.file_id <> fd.keep_file_id
         JOIN job_file jf2
@@ -165,18 +183,18 @@ BEGIN
     ) dup
     WHERE jf.job_id = dup.job_id
       AND jf.file_id IN (
-        SELECT unnest(all_file_ids[2:]) FROM file_dupes_per_readset
+        SELECT unnest(all_file_ids[2:]) FROM file_dupes_global
     );
     GET DIAGNOSTICS deleted_jf = ROW_COUNT;
     IF deleted_jf > 0 THEN
         RAISE NOTICE 'Deleted % conflicting job_file rows before final remap', deleted_jf;
     END IF;
 
-    -- Now perform the final remap (non-batched) for remaining rows
+    -- Final remap for remaining rows
     WITH to_fix AS (
         SELECT jf.ctid AS target_ctid, fd.keep_file_id
         FROM job_file jf
-        JOIN file_dupes_per_readset fd
+        JOIN file_dupes_global fd
           ON jf.file_id = ANY(fd.all_file_ids)
          AND jf.file_id <> fd.keep_file_id
     )
@@ -192,34 +210,34 @@ BEGIN
     END IF;
 
     ----------------------------------------------------------------------
-    -- Step 6: Delete redundant file rows (safe)
-    ----------------------------------------------------------------------
-    WITH dup_files AS (
-        SELECT unnest(all_file_ids) AS file_id, keep_file_id
-        FROM file_dupes_per_readset
-    )
-    DELETE FROM file f
-    WHERE f.id IN (
-        SELECT df.file_id
-        FROM dup_files df
-        WHERE df.file_id <> df.keep_file_id
-          AND NOT EXISTS (SELECT 1 FROM readset_file rf WHERE rf.file_id = df.file_id)
-          AND NOT EXISTS (SELECT 1 FROM job_file jf WHERE jf.file_id = df.file_id)
-    );
-    GET DIAGNOSTICS deleted_files = ROW_COUNT;
-    IF deleted_files > 0 THEN
-        RAISE NOTICE 'Deleted % duplicate file rows safely', deleted_files;
-    ELSE
-        RAISE NOTICE 'No duplicate file rows could be deleted (still referenced)';
-    END IF;
+    -- Step 7: Delete redundant file rows (safe)
+    ----------------------------------------------------------------------
+    WITH dup_files AS (
+        SELECT unnest(all_file_ids) AS file_id, keep_file_id
+        FROM file_dupes_global
+    )
+    DELETE FROM file f
+    WHERE f.id IN (
+        SELECT df.file_id
+        FROM dup_files df
+        WHERE df.file_id <> df.keep_file_id
+          AND NOT EXISTS (SELECT 1 FROM readset_file rf WHERE rf.file_id = df.file_id)
+          AND NOT EXISTS (SELECT 1 FROM job_file jf WHERE jf.file_id = df.file_id)
+    );
+    GET DIAGNOSTICS deleted_files = ROW_COUNT;
+    IF deleted_files > 0 THEN
+        RAISE NOTICE 'Deleted % duplicate file rows safely', deleted_files;
+    ELSE
+        RAISE NOTICE 'No duplicate file rows could be deleted (still referenced)';
+    END IF;
 
     ----------------------------------------------------------------------
-    -- Step 7: Summary
+    -- Step 8: Summary
     ----------------------------------------------------------------------
-    RAISE NOTICE '--- File deduplication complete ---';
+    RAISE NOTICE '--- GLOBAL File deduplication complete ---';
     RAISE NOTICE 'Groups=% , readset_file deleted=% , readset_file remapped=% , job_file remapped (last batch)=% , location remapped=% , files deleted=%',
         dup_group_count, deleted_rf_conflicts, remapped_rf, remapped_jf, remapped_loc, deleted_files;
 
 END $$;
 
-COMMIT;
\ No newline at end of file
+COMMIT;
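
Note: before running utils/deduplicate_files.sql against a live database, it can help to preview
what Step 1 would pick up. Below is a small, read-only sketch using SQLAlchemy (which the project
already depends on); the connection URL is a placeholder, and the keep/drop split simply mirrors
the ORDER BY used in the script.

    from sqlalchemy import create_engine, text

    # Placeholder URL; point it at the project_tracking database before running.
    engine = create_engine("postgresql+psycopg2://user:password@localhost/project_tracking")

    # Same grouping as Step 1 of utils/deduplicate_files.sql, but read-only.
    preview = text("""
        SELECT f.name, f.md5sum, COUNT(*) AS cnt,
               ARRAY_AGG(f.id ORDER BY f.deliverable DESC, f.id ASC) AS file_ids
        FROM file f
        GROUP BY f.name, f.md5sum
        HAVING COUNT(*) > 1
        ORDER BY cnt DESC
    """)

    with engine.connect() as connection:
        rows = connection.execute(preview).fetchall()

    print(f"{len(rows)} duplicate (name, md5sum) groups")
    for name, md5sum, cnt, file_ids in rows[:10]:
        # file_ids[0] is what the script keeps (deliverable first, then lowest id)
        print(f"{cnt} copies of {name} ({md5sum}) -> keep {file_ids[0]}, drop {file_ids[1:]}")
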
From d37b529a74835bcdc281506035f921197803d2b0 Mon Sep 17 00:00:00 2001
From: Paul Stretenowich
Date: Mon, 17 Nov 2025 16:38:00 -0500
Subject: [PATCH 3/3] Merge topups with samples with a different name, remap readsets and add name to alias

---
 utils/merge_topups_samples.sql | 105 +++++++++++++++++++++++++++++++++
 1 file changed, 105 insertions(+)
 create mode 100644 utils/merge_topups_samples.sql

diff --git a/utils/merge_topups_samples.sql b/utils/merge_topups_samples.sql
new file mode 100644
index 0000000..5f3955f
--- /dev/null
+++ b/utils/merge_topups_samples.sql
@@ -0,0 +1,105 @@
+BEGIN;
+
+DO $$
+DECLARE
+    merge_count int;
+BEGIN
+    ----------------------------------------------------------------------
+    -- Step 1: Build temporary table of all merge pairs (automatic + manual)
+    ----------------------------------------------------------------------
+    CREATE TEMP TABLE tmp_merge_pairs AS
+    WITH manual_exceptions AS (
+        SELECT
+            'MoHQ-JG-1-40-015001193583V2-1RT' AS keep_name,
+            'MoHQ-JG-1-40-015001193631V4-1RT' AS remove_name
+        UNION ALL SELECT
+            'MoHQ-JG-7-5-015000860845V2-1RT',
+            'MoHQ-JG-7-5-015000727120V3-1RT'
+        UNION ALL SELECT
+            'MoHQ-MU-34-55-TU-1RT',
+            'MoHQ-MU-34-55-MET-1RT'
+    ),
+    parsed AS (
+        SELECT
+            s.id,
+            s.name,
+            regexp_replace(s.name, '-[0-9]+(RT|DT|DN|FRT)$', '') AS base,
+            (regexp_match(s.name, '-([0-9]+)(RT|DT|DN|FRT)$'))[1]::int AS num,
+            (regexp_match(s.name, '-([0-9]+)(RT|DT|DN|FRT)$'))[2] AS type
+        FROM sample s
+        WHERE s.name ~ '-[0-9]+(RT|DT|DN|FRT)$'
+    ),
+    groups AS (
+        SELECT DISTINCT ON (base, type)
+            base,
+            type,
+            id AS id_keep,
+            name AS name_keep,
+            num AS num_keep
+        FROM parsed
+        ORDER BY base, type, num
+    ),
+    auto_pairs AS (
+        SELECT
+            g.id_keep,
+            g.name_keep,
+            p.id AS id_remove,
+            p.name AS name_remove
+        FROM groups g
+        JOIN parsed p
+          ON p.base = g.base
+         AND p.type = g.type
+         AND p.num > g.num_keep
+    ),
+    manual_pairs AS (
+        SELECT
+            ks.id AS id_keep,
+            ks.name AS name_keep,
+            rs.id AS id_remove,
+            rs.name AS name_remove
+        FROM manual_exceptions me
+        JOIN sample ks ON ks.name = me.keep_name
+        JOIN sample rs ON rs.name = me.remove_name
+    )
+    SELECT * FROM auto_pairs
+    UNION ALL
+    SELECT * FROM manual_pairs;
+
+    RAISE NOTICE '% pairs identified for merging.', (SELECT COUNT(*) FROM tmp_merge_pairs);
+
+    ----------------------------------------------------------------------
+    -- Step 2: Append duplicate names to alias of keeper
+    ----------------------------------------------------------------------
+    UPDATE sample s
+    SET alias = COALESCE(s.alias, '[]'::jsonb) || to_jsonb(p.name_remove)
+    FROM tmp_merge_pairs p
+    WHERE s.id = p.id_keep
+      AND NOT EXISTS (
+          SELECT 1
+          FROM jsonb_array_elements_text(COALESCE(s.alias, '[]'::jsonb)) x(val)
+          WHERE x.val = p.name_remove
+      );
+
+    ----------------------------------------------------------------------
+    -- Step 3: Reassign readsets from removed samples to keeper
+    ----------------------------------------------------------------------
+    UPDATE readset r
+    SET sample_id = p.id_keep
+    FROM tmp_merge_pairs p
+    WHERE r.sample_id = p.id_remove;
+
+    ----------------------------------------------------------------------
+    -- Step 4: Delete duplicate samples
+    ----------------------------------------------------------------------
+    DELETE FROM sample s
+    USING tmp_merge_pairs p
+    WHERE s.id = p.id_remove;
+
+    ----------------------------------------------------------------------
+    -- Step 5: Drop temporary table
+    ----------------------------------------------------------------------
+    DROP TABLE tmp_merge_pairs;
+
+END $$;
+
+COMMIT;
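
Note: a short sketch of the suffix rule merge_topups_samples.sql relies on, useful for checking
which sample names would be paired before running it. The sample names below are invented for
illustration; real pairings (and the manual exceptions) should be verified against the sample table.

    import re

    # Same convention as the SQL: <base>-<num><RT|DT|DN|FRT>; the lowest <num> per (base, type) is kept.
    SUFFIX = re.compile(r'-([0-9]+)(RT|DT|DN|FRT)$')

    def parse_name(name):
        match = SUFFIX.search(name)
        if not match:
            return None  # names without the suffix are ignored by the script
        return name[:match.start()], int(match.group(1)), match.group(2)

    for name in ("MoHQ-XX-1-1-SPEC-1RT", "MoHQ-XX-1-1-SPEC-2RT", "MoHQ-XX-1-1-SPEC-1DT"):
        print(name, "->", parse_name(name))
    # The first two share (base, type) = ('MoHQ-XX-1-1-SPEC', 'RT'), so the script would remap
    # the -2RT readsets onto -1RT and append 'MoHQ-XX-1-1-SPEC-2RT' to the keeper's alias.
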