Dev #77 (Merged)

26 changes: 13 additions & 13 deletions project_tracking/api/project.py
@@ -341,10 +341,10 @@ def specimens(project_id: str, specimen_id: str=None, sample_id: str=None, reads
The include_relationships query allows to include related entities in the output
(true):
return: specimens with related entities included
Ex: /project/<project>/specimens?json={"include_relationships": true}
Ex: /project/<project>/specimens?json={"include_relationships": true}
(false):
return: specimens without related entities (default behavior)
Ex: /project/<project>/specimens?json={"include_relationships": false}
Ex: /project/<project>/specimens?json={"include_relationships": false}
"""
# Don't serialize relationships by default aka when requesting all specimens
include_relationships = False
@@ -449,10 +449,10 @@ def samples(project_id: str, specimen_id: str=None, sample_id: str=None, readset
The include_relationships query allows to include related entities in the output
(true):
return: samples with related entities included
Ex: /project/<project>/samples?json={"include_relationships": true}
Ex: /project/<project>/samples?json={"include_relationships": true}
(false):
return: samples without related entities (default behavior)
Ex: /project/<project>/samples?json={"include_relationships": false}
Ex: /project/<project>/samples?json={"include_relationships": false}
"""
# Don't serialize relationships by default aka when requesting all samples
include_relationships = False
@@ -558,10 +558,10 @@ def readsets(project_id: str, specimen_id: str=None, sample_id: str=None, readse
The include_relationships query allows to include related entities in the output
(true):
return: readsets with related entities included
Ex: /project/<project>/readsets?json={"include_relationships": true}
Ex: /project/<project>/readsets?json={"include_relationships": true}
(false):
return: readsets without related entities (default behavior)
Ex: /project/<project>/readsets?json={"include_relationships": false}
Ex: /project/<project>/readsets?json={"include_relationships": false}
"""
# Don't serialize relationships by default aka when requesting all readsets
include_relationships = False
@@ -643,10 +643,10 @@ def operations(project_id: str, readset_id: str=None, operation_id: str=None, di
The include_relationships query allows to include related entities in the output
(true):
return: operations with related entities included
Ex: /project/<project>/operations?json={"include_relationships": true}
Ex: /project/<project>/operations?json={"include_relationships": true}
(false):
return: operations without related entities (default behavior)
Ex: /project/<project>/operations?json={"include_relationships": false}
Ex: /project/<project>/operations?json={"include_relationships": false}
"""
# Don't serialize relationships by default aka when requesting all operations
include_relationships = False
@@ -719,10 +719,10 @@ def jobs(project_id: str, readset_id: str=None, job_id: str=None, digest_data=No
The include_relationships query allows to include related entities in the output
(true):
return: jobs with related entities included
Ex: /project/<project>/jobs?json={"include_relationships": true}
Ex: /project/<project>/jobs?json={"include_relationships": true}
(false):
return: jobs without related entities (default behavior)
Ex: /project/<project>/jobs?json={"include_relationships": false}
Ex: /project/<project>/jobs?json={"include_relationships": false}
"""
# Don't serialize relationships by default aka when requesting all jobs
include_relationships = False
@@ -824,10 +824,10 @@ def files(project_id: str, specimen_id: str=None, sample_id: str=None, readset_i
The include_relationships query allows to include related entities in the output
(true):
return: files with related entities included
Ex: /project/<project>/files?json={"include_relationships": true}
Ex: /project/<project>/files?json={"include_relationships": true}
(false):
return: files without related entities (default behavior)
Ex: /project/<project>/files?json={"include_relationships": false}
Ex: /project/<project>/files?json={"include_relationships": false}
"""
# Don't serialize relationships by default aka when requesting all files
include_relationships = False
@@ -951,7 +951,7 @@ def metrics(project_id: str, specimen_id: str=None, sample_id: str=None, readset
The include_relationships query allows to include related entities in the output
(true):
return: metrics with related entities included
Ex: /project/<project>/metrics?json={"include_relationships": true}
Ex: /project/<project>/metrics?json={"include_relationships": true}
(false):
return: metrics without related entities (default behavior)
"""
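
A minimal client-side sketch of the include_relationships convention documented in the docstrings above, where the flag travels inside the json query parameter as a JSON object. The base URL, project name, and the use of the requests library are assumptions for illustration; only the json={"include_relationships": ...} query-string format comes from the docstrings.

import json
import requests

BASE_URL = "http://localhost:5000"   # assumption: local dev server
PROJECT = "my_project"               # placeholder project name

def get_specimens(include_relationships: bool = False):
    """Fetch specimens, optionally serializing related entities (sketch)."""
    # The endpoints above read the flag from a JSON object passed in the
    # `json` query parameter, e.g. ?json={"include_relationships": true}
    params = {"json": json.dumps({"include_relationships": include_relationships})}
    response = requests.get(f"{BASE_URL}/project/{PROJECT}/specimens", params=params)
    response.raise_for_status()
    return response.json()

flat = get_specimens()                              # default: no relationships serialized
nested = get_specimens(include_relationships=True)  # opt in to related entities
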
24 changes: 23 additions & 1 deletion project_tracking/db_actions/modification.py
@@ -4,6 +4,7 @@
# Standard library
import logging
import json
import re

# Third-party
from sqlalchemy import select
@@ -12,13 +13,25 @@
from sqlalchemy.orm.exc import UnmappedClassError

# Local modules
from .errors import DidNotFindError, RequestError
from .errors import DidNotFindError, RequestError, UniqueConstraintError
from .. import vocabulary as vb
from .. import database
from .. import model

logger = logging.getLogger(__name__)

def parse_unique_violation(error_msg: str):
"""Extract table, column, and value from a psycopg2 UniqueViolation error message."""
# Extract column and value
key_match = re.search(r'Key \(([^)]+)\)=\(([^)]+)\)', error_msg)
column, value = key_match.groups() if key_match else (None, None)

# Extract table name from SQL
table_match = re.search(r'(?:INSERT INTO|UPDATE)\s+(\w+)', error_msg)
table = table_match.group(1) if table_match else None

return table, column, value

# Condition Functions
def is_deleted(obj):
"""
@@ -534,6 +547,15 @@ def edit(ingest_data, session, cascade_mode=None, dry_run=False):
except SQLAlchemyError as error:
logger.error("Error: %s", error)
session.rollback()
error_str = str(error)
if "UniqueViolation" in error_str:
table, column, value = parse_unique_violation(error_str)
if table and column and value:
raise UniqueConstraintError(
message=f"'{table}' with '{column}' '{value}' already exists in the database and '{column}' has to be unique."
) from error
raise UniqueConstraintError(message="Unique constraint violation occurred, but details could not be parsed.") from error


# If no warning
if not ret["DB_ACTION_WARNING"]:
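
For reference, a minimal sketch of what the new parse_unique_violation() helper is expected to extract from a wrapped psycopg2 UniqueViolation message. The sample error text and the specimen/name/SP0001 values are illustrative placeholders rather than captured output; only the "Key (col)=(value)" and "INSERT INTO <table>" patterns matched by the helper's regexes come from the code above.

from project_tracking.db_actions.modification import parse_unique_violation

# Illustrative error string following the psycopg2/SQLAlchemy message shape;
# the table, column, and value are placeholders.
sample_error = (
    "(psycopg2.errors.UniqueViolation) duplicate key value violates unique "
    'constraint "specimen_name_key"\n'
    "DETAIL:  Key (name)=(SP0001) already exists.\n"
    "[SQL: INSERT INTO specimen (name) VALUES (%(name)s)]"
)

table, column, value = parse_unique_violation(sample_error)
assert (table, column, value) == ("specimen", "name", "SP0001")
# edit() then raises UniqueConstraintError with a message built from these parts.
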
158 changes: 88 additions & 70 deletions utils/deduplicate_files.sql
@@ -11,26 +11,25 @@ DECLARE
deleted_files BIGINT := 0;
deleted_jf BIGINT := 0;
BEGIN
RAISE NOTICE '--- Starting file deduplication per readset (same name + md5sum) ---';
RAISE NOTICE '--- Starting GLOBAL file deduplication (same name + md5sum) ---';

----------------------------------------------------------------------
-- Step 1: Identify duplicate files per readset (same name + md5sum)
-- Step 1: Identify duplicate files globally (same name + md5sum)
----------------------------------------------------------------------
CREATE TEMP TABLE file_dupes_per_readset ON COMMIT DROP AS
SELECT
rf.readset_id,
f.name,
f.md5sum,
-- Prefer deliverable = true, then lowest id
(ARRAY_AGG(f.id ORDER BY f.deliverable DESC, f.id ASC))[1] AS keep_file_id,
ARRAY_AGG(f.id ORDER BY f.id) AS all_file_ids
FROM file f
JOIN readset_file rf ON rf.file_id = f.id
GROUP BY rf.readset_id, f.name, f.md5sum
HAVING COUNT(*) > 1;

SELECT COUNT(*) INTO dup_group_count FROM file_dupes_per_readset;
RAISE NOTICE 'Duplicate file groups found: %', dup_group_count;
CREATE TEMP TABLE file_dupes_global ON COMMIT DROP AS
SELECT
f.name,
f.md5sum,
-- Prefer deliverable = true, then lowest id
(ARRAY_AGG(f.id ORDER BY f.deliverable DESC, f.id ASC))[1] AS keep_file_id,
ARRAY_AGG(f.id ORDER BY f.id) AS all_file_ids,
COUNT(*) AS cnt
FROM file f
GROUP BY f.name, f.md5sum
HAVING COUNT(*) > 1;

SELECT COUNT(*) INTO dup_group_count FROM file_dupes_global;
RAISE NOTICE 'Duplicate file groups found (global): %', dup_group_count;

IF dup_group_count = 0 THEN
RAISE NOTICE 'No duplicate groups found; nothing to do.';
@@ -39,15 +38,15 @@

----------------------------------------------------------------------
-- Step 2: Delete conflicting readset_file links
-- If a readset already points to the keep_file_id, drop the others.
----------------------------------------------------------------------
DELETE FROM readset_file rf
USING file_dupes_per_readset fd
WHERE rf.readset_id = fd.readset_id
AND rf.file_id = ANY(fd.all_file_ids)
USING file_dupes_global fd
WHERE rf.file_id = ANY(fd.all_file_ids)
AND rf.file_id <> fd.keep_file_id
-- If the readset already has the keep_file_id, then this row is redundant
AND EXISTS (
SELECT 1
FROM readset_file rf2
SELECT 1 FROM readset_file rf2
WHERE rf2.readset_id = rf.readset_id
AND rf2.file_id = fd.keep_file_id
);
@@ -57,33 +56,51 @@
END IF;

----------------------------------------------------------------------
-- Step 3: Remap remaining readset_file rows to keep_file_id
----------------------------------------------------------------------
UPDATE readset_file rf
SET file_id = fd.keep_file_id
FROM file_dupes_per_readset fd
WHERE rf.readset_id = fd.readset_id
AND rf.file_id = ANY(fd.all_file_ids)
AND rf.file_id <> fd.keep_file_id;
GET DIAGNOSTICS remapped_rf = ROW_COUNT;
IF remapped_rf > 0 THEN
RAISE NOTICE 'Remapped readset_file rows: %', remapped_rf;
END IF;
-- Step 3: Remap remaining readset_file rows to keep_file_id (conflict-safe)
----------------------------------------------------------------------
RAISE NOTICE 'Remapping readset_file rows (conflict-safe)...';

LOOP
WITH one_update AS (
SELECT DISTINCT ON (rf.readset_id, fd.keep_file_id)
rf.ctid AS target_ctid,
fd.keep_file_id
FROM readset_file rf
JOIN file_dupes_global fd
ON rf.file_id = ANY(fd.all_file_ids)
AND rf.file_id <> fd.keep_file_id
WHERE NOT EXISTS (
SELECT 1
FROM readset_file rf2
WHERE rf2.readset_id = rf.readset_id
AND rf2.file_id = fd.keep_file_id
)
ORDER BY rf.readset_id, fd.keep_file_id
LIMIT 50000 -- batch size; tune if needed
)
UPDATE readset_file rf
SET file_id = ou.keep_file_id
FROM one_update ou
WHERE rf.ctid = ou.target_ctid;

GET DIAGNOSTICS remapped_rf = ROW_COUNT;
EXIT WHEN remapped_rf = 0;
END LOOP;

RAISE NOTICE 'Finished remapping readset_file rows (conflict-safe)';

----------------------------------------------------------------------
-- Step 4: Optimized safe remap for job_file
-- Step 4: Optimized safe remap for job_file (batched)
----------------------------------------------------------------------
RAISE NOTICE 'Starting safe remap for job_file (optimized)...';
RAISE NOTICE 'Starting safe remap for job_file (optimized, global)...';

-- Build mapping of (job_id, old_file_id) -> keep_file_id
CREATE TEMP TABLE job_file_remap ON COMMIT DROP AS
SELECT DISTINCT jf.job_id, jf.file_id AS old_file_id, fd.keep_file_id
FROM job_file jf
JOIN file_dupes_per_readset fd
JOIN file_dupes_global fd
ON jf.file_id = ANY(fd.all_file_ids)
AND jf.file_id <> fd.keep_file_id;

-- Indexes to speed joins (temporary)
CREATE INDEX idx_jfr_old_file_id ON job_file_remap (old_file_id);
CREATE INDEX idx_jfr_job_id_keep ON job_file_remap (job_id, keep_file_id);
CREATE INDEX IF NOT EXISTS idx_jf_job_file_id ON job_file (job_id, file_id);
@@ -130,14 +147,14 @@ BEGIN
EXIT WHEN remapped_jf = 0;
END LOOP;

RAISE NOTICE 'Finished job_file remap (optimized)';
RAISE NOTICE 'Finished job_file remap (optimized, global)';

----------------------------------------------------------------------
-- Step 5: Remap location
----------------------------------------------------------------------
UPDATE location l
SET file_id = fd.keep_file_id
FROM file_dupes_per_readset fd
FROM file_dupes_global fd
WHERE l.file_id = ANY(fd.all_file_ids)
AND l.file_id <> fd.keep_file_id;
GET DIAGNOSTICS remapped_loc = ROW_COUNT;
@@ -146,16 +163,17 @@
END IF;

----------------------------------------------------------------------
-- Step 5.5: Final sanity remap for job_file (conflict-safe)
-- Step 6: Final sanity remap for job_file (conflict-safe)
-- (catch anything remaining after batches)
----------------------------------------------------------------------
RAISE NOTICE 'Performing final sanity remap for job_file (conflict-safe)...';
RAISE NOTICE 'Performing final sanity remap for job_file (conflict-safe, global)...';

-- Delete job_file rows that would cause a duplicate during final remap
DELETE FROM job_file jf
USING (
SELECT jf1.job_id, fd.keep_file_id
FROM job_file jf1
JOIN file_dupes_per_readset fd
JOIN file_dupes_global fd
ON jf1.file_id = ANY(fd.all_file_ids)
AND jf1.file_id <> fd.keep_file_id
JOIN job_file jf2
Expand All @@ -165,18 +183,18 @@ BEGIN
) dup
WHERE jf.job_id = dup.job_id
AND jf.file_id IN (
SELECT unnest(all_file_ids[2:]) FROM file_dupes_per_readset
SELECT unnest(all_file_ids[2:]) FROM file_dupes_global
);
GET DIAGNOSTICS deleted_jf = ROW_COUNT;
IF deleted_jf > 0 THEN
RAISE NOTICE 'Deleted % conflicting job_file rows before final remap', deleted_jf;
END IF;

-- Now perform the final remap (non-batched) for remaining rows
-- Final remap for remaining rows
WITH to_fix AS (
SELECT jf.ctid AS target_ctid, fd.keep_file_id
FROM job_file jf
JOIN file_dupes_per_readset fd
JOIN file_dupes_global fd
ON jf.file_id = ANY(fd.all_file_ids)
AND jf.file_id <> fd.keep_file_id
)
Expand All @@ -192,34 +210,34 @@ BEGIN
END IF;

----------------------------------------------------------------------
-- Step 6: Delete redundant file rows (safe)
----------------------------------------------------------------------
WITH dup_files AS (
SELECT unnest(all_file_ids) AS file_id, keep_file_id
FROM file_dupes_per_readset
)
DELETE FROM file f
WHERE f.id IN (
SELECT df.file_id
FROM dup_files df
WHERE df.file_id <> df.keep_file_id
AND NOT EXISTS (SELECT 1 FROM readset_file rf WHERE rf.file_id = df.file_id)
AND NOT EXISTS (SELECT 1 FROM job_file jf WHERE jf.file_id = df.file_id)
);
GET DIAGNOSTICS deleted_files = ROW_COUNT;
IF deleted_files > 0 THEN
RAISE NOTICE 'Deleted % duplicate file rows safely', deleted_files;
ELSE
RAISE NOTICE 'No duplicate file rows could be deleted (still referenced)';
END IF;
-- Step 7: Delete redundant file rows (safe)
----------------------------------------------------------------------
WITH dup_files AS (
SELECT unnest(all_file_ids) AS file_id, keep_file_id
FROM file_dupes_global
)
DELETE FROM file f
WHERE f.id IN (
SELECT df.file_id
FROM dup_files df
WHERE df.file_id <> df.keep_file_id
AND NOT EXISTS (SELECT 1 FROM readset_file rf WHERE rf.file_id = df.file_id)
AND NOT EXISTS (SELECT 1 FROM job_file jf WHERE jf.file_id = df.file_id)
);
GET DIAGNOSTICS deleted_files = ROW_COUNT;
IF deleted_files > 0 THEN
RAISE NOTICE 'Deleted % duplicate file rows safely', deleted_files;
ELSE
RAISE NOTICE 'No duplicate file rows could be deleted (still referenced)';
END IF;

----------------------------------------------------------------------
-- Step 7: Summary
-- Step 8: Summary
----------------------------------------------------------------------
RAISE NOTICE '--- File deduplication complete ---';
RAISE NOTICE '--- GLOBAL File deduplication complete ---';
RAISE NOTICE 'Groups=% , readset_file deleted=% , readset_file remapped=% , job_file remapped (last batch)=% , location remapped=% , files deleted=%',
dup_group_count, deleted_rf_conflicts, remapped_rf, remapped_jf, remapped_loc, deleted_files;

END $$;

COMMIT;
COMMIT;
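
After the script commits, a quick sanity check that no (name, md5sum) group still maps to more than one file row can be worth running. A minimal Python/psycopg2 sketch is below; the connection DSN is a placeholder, and the check relies only on the file.name and file.md5sum columns that the deduplication above groups on.

import psycopg2

DSN = "dbname=project_tracking"  # assumption: adjust to your environment

CHECK_SQL = """
    SELECT name, md5sum, COUNT(*) AS cnt
    FROM file
    GROUP BY name, md5sum
    HAVING COUNT(*) > 1
"""

with psycopg2.connect(DSN) as conn:
    with conn.cursor() as cur:
        cur.execute(CHECK_SQL)
        leftovers = cur.fetchall()

if leftovers:
    for name, md5sum, cnt in leftovers:
        print(f"Still duplicated: name={name} md5sum={md5sum} count={cnt}")
else:
    print("No duplicate (name, md5sum) groups remain.")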