From 3232528e1a38df02daf89435c0dc0be0ebcdefee Mon Sep 17 00:00:00 2001
From: Alan Cleary
Date: Thu, 12 Jun 2025 12:16:45 -0600
Subject: [PATCH 01/17] Added Status enum

This replaces hard-coded strings used when reading/writing the manifest
array.
---
 src/tiledb/cloud/vcf/ingestion.py | 40 ++++++++++++++++++++++++-------
 1 file changed, 31 insertions(+), 9 deletions(-)

diff --git a/src/tiledb/cloud/vcf/ingestion.py b/src/tiledb/cloud/vcf/ingestion.py
index 3f54872ac..c14192299 100644
--- a/src/tiledb/cloud/vcf/ingestion.py
+++ b/src/tiledb/cloud/vcf/ingestion.py
@@ -75,6 +75,28 @@ class Contigs(enum.Enum):
     ALL_DISABLE_MERGE = enum.auto()
 
 
+
+class Status(str, enum.Enum):
+    """
+    The ingestion status of samples in the manifest.
+
+    OK = the VCF file can be ingested
+    MISSING_INDEX = the VCF file does not have a corresponding index file
+    MISSING_SAMPLE_NAME = the VCF file does not have a sample name
+    MULTIPLE_SAMPLES = the VCF file has multiple sample names
+    DUPLICATE_SAMPLE_NAME = one or more VCF files being ingested have duplicate sample names
+    BAD_INDEX = the VCF index file could not be properly read
+    """
+
+    OK = "ok"
+    MISSING_INDEX = "missing index"
+    MISSING_SAMPLE_NAME = "missing sample name"
+    MULTIPLE_SAMPLES = "multiple samples"
+    DUPLICATE_SAMPLE_NAME = "duplicate sample name"
+    BAD_INDEX = "bad index"
+
+
+
 def get_logger_wrapper(
     verbose: bool = False,
 ) -> logging.Logger:
@@ -619,7 +641,7 @@ def filter_samples_udf(
 
         with tiledb.open(manifest_uri) as A:
             manifest_df = A.query(
-                cond="status == 'ok' or status == 'missing index'"
+                cond=f"status == '{Status.OK}' or status == '{Status.MISSING_INDEX}'"
             ).df[:]
 
         # Sort manifest by sample_name
@@ -677,7 +699,7 @@ def file_size(uri: str) -> int:
         values = defaultdict(list)
 
         for vcf_uri in sample_uris:
-            status = "ok"
+            status = Status.OK
 
             # Check for sample name issues
             try:
@@ -690,13 +712,13 @@ def file_size(uri: str) -> int:
                 continue
 
             if not sample_name:
-                status = "missing sample name"
+                status = Status.MISSING_SAMPLE_NAME
             elif len(sample_name.split()) > 1:
-                status = "multiple samples"
+                status = Status.MULTIPLE_SAMPLES
             elif sample_name in keys:
                 # TODO: check for duplicate sample names across all
                 # ingest_manifest_udf calls
-                status = "duplicate sample name"
+                status = Status.DUPLICATE_SAMPLE_NAME
                 # Generate a unique sample name for the manifest
                 sample_name_base = sample_name
                 i = 0
@@ -707,14 +729,14 @@ def file_size(uri: str) -> int:
             # Check for index issues
             index_uri = find_index(vcf_uri)
             if not index_uri:
-                status = "" if status == "ok" else status + ","
-                status += "missing index"
+                status = "" if status == Status.OK else status + ","
+                status += Status.MISSING_INDEX
                 records = 0
             else:
                 records = get_record_count(vcf_uri, index_uri)
                 if records is None:
-                    status = "" if status == "ok" else status + ","
-                    status += "bad index"
+                    status = "" if status == Status.OK else status + ","
+                    status += Status.BAD_INDEX
 
             keys.append(sample_name)
             values["status"].append(status)

From 7cdd7943221722aaf88bd052072dd207c128deb5 Mon Sep 17 00:00:00 2001
From: Alan Cleary
Date: Fri, 13 Jun 2025 17:46:57 -0600
Subject: [PATCH 02/17] Updated how VCF ingestion uses the manifest array

Ingest now distinguishes between samples that are ready to load and
samples that have been loaded using a new "ready" status. The status of
a sample is changed from "ready" / "missing index" to "ok" upon
successful ingestion, allowing failed ingestions to be resumed.
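In set terms, the resume behavior amounts to the following sketch
(illustrative only, not code from this change; the variable names mirror
the sets computed in filter_samples_udf and the sample names are made up):

    resume = True
    dataset_samples = {"s1", "s2"}  # samples already written to the dataset
    ready_samples = {"s2", "s3"}    # manifest status "ready" or "missing index"

    # "s2" is in both sets: it was written to the dataset but its manifest
    # row was never flipped to "ok", so it counts as partially ingested.
    incomplete_samples = dataset_samples & ready_samples

    # resume retries the partial samples; otherwise they are skipped
    queued = ready_samples if resume else ready_samples - incomplete_samples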
--- src/tiledb/cloud/vcf/ingestion.py | 76 ++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 17 deletions(-) diff --git a/src/tiledb/cloud/vcf/ingestion.py b/src/tiledb/cloud/vcf/ingestion.py index c14192299..0e1be429a 100644 --- a/src/tiledb/cloud/vcf/ingestion.py +++ b/src/tiledb/cloud/vcf/ingestion.py @@ -80,7 +80,8 @@ class Status(str, enum.Enum): """ The ingestion status of samples in the manifest. - OK = the VCF file can be ingested + READY = the VCF file can be ingested + OK = the VCF file was successfully ingested MISSING_INDEX = the VCF file does not have a corresponding index file MISSING_SAMPLE_NAME = the VCF file does not have a sample name MULTIPLE_SAMPLES = the VCF file has multiple sample names @@ -88,6 +89,7 @@ class Status(str, enum.Enum): BAD_INDEX = the VCF index file could not be properly read """ + READY = "ready" OK = "ok" MISSING_INDEX = "missing index" MISSING_SAMPLE_NAME = "missing sample name" @@ -611,6 +613,7 @@ def filter_samples_udf( dataset_uri: str, *, config: Optional[Mapping[str, Any]] = None, + resume: bool = True, verbose: bool = False, ) -> Sequence[str]: """ @@ -618,6 +621,7 @@ def filter_samples_udf( :param dataset_uri: dataset URI :param config: config dictionary, defaults to None + :param resume: enable resume ingestion mode, defaults to True :param verbose: verbose logging, defaults to False :return: sample URIs """ @@ -633,29 +637,38 @@ def filter_samples_udf( dataset_uri, cfg=tiledbvcf.ReadConfig(tiledb_config=config), ) - existing_samples = set(ds.samples()) + dataset_samples = set(ds.samples()) - # Read all samples in the manifest with status == "ok" + # Read all samples in the manifest and samples ready for ingestion group = tiledb.Group(dataset_uri) manifest_uri = group[MANIFEST_ARRAY].uri with tiledb.open(manifest_uri) as A: - manifest_df = A.query( - cond=f"status == '{Status.OK}' or status == '{Status.MISSING_INDEX}'" + manifest_df = A.df[:] + ingest_df = A.query( + cond=f"status == '{Status.READY}' or status == '{Status.MISSING_INDEX}'" ).df[:] + manifest_samples = set(manifest_df.sample_name) # Sort manifest by sample_name - manifest_df = manifest_df.sort_values(by=["sample_name"]) + ingest_df = ingest_df.sort_values(by=["sample_name"]) + ingest_samples = set(ingest_df.sample_name) - # Find samples that have not already been ingested - manifest_samples = set(manifest_df.sample_name) - new_samples = manifest_samples.difference(existing_samples) - manifest_df = manifest_df[manifest_df.sample_name.isin(new_samples)] - result = manifest_df.vcf_uri.to_list() + # Find samples that have already been ingested or failed to ingest + existing_samples = dataset_samples.difference(ingest_samples) + failed_samples = dataset_samples.intersection(ingest_samples) + + # Finalize samples to ingest + if not resume: + ingest_df = ingest_df[~ingest_df.sample_name.isin(failed_samples)] + result = ingest_df.vcf_uri.to_list() + logger.info("%d samples in the dataset.", len(dataset_samples)) + logger.info("%d samples fully ingested.", len(existing_samples)) + logger.info("%d samples partially ingested.", len(failed_samples)) logger.info("%d samples in the manifest.", len(manifest_samples)) - logger.info("%d samples already ingested.", len(existing_samples)) - logger.info("%d new samples to ingest.", len(result)) + logger.info("%d samples ready to be ingested.", len(ingest_samples)) + logger.info("%d samples will be ingested.", len(result)) prof.write("count", len(result)) return result @@ -699,7 +712,7 @@ def file_size(uri: str) -> int: values = 
defaultdict(list) for vcf_uri in sample_uris: - status = Status.OK + status = Status.READY # Check for sample name issues try: @@ -729,13 +742,13 @@ def file_size(uri: str) -> int: # Check for index issues index_uri = find_index(vcf_uri) if not index_uri: - status = "" if status == Status.OK else status + "," + status = "" if status == Status.READY else status + "," status += Status.MISSING_INDEX records = 0 else: records = get_record_count(vcf_uri, index_uri) if records is None: - status = "" if status == Status.OK else status + "," + status = "" if status == Status.READY else status + "," status += Status.BAD_INDEX keys.append(sample_name) @@ -768,6 +781,7 @@ def ingest_samples_udf( verbose: bool = False, trace_id: Optional[str] = None, use_remote_tmp: bool = False, + disable_manifest: bool = False, ) -> None: """ Ingest samples into the dataset. @@ -788,6 +802,7 @@ def ingest_samples_udf( :param trace_id: trace ID for logging, defaults to None :param use_remote_tmp: use remote tmp space if VCFs need to be bgzipped, defaults to False (preferred for small VCFs) + :param disable_manifest: disable manifest update, defaults to False """ import tiledbvcf @@ -840,6 +855,7 @@ def create_index_file_worker(uri: str) -> Optional[str]: # Filter out failed index operations sample_uris = [uri for uri in sample_uris if uri] + # Ingest samples with indexes level = "debug" if verbose else "info" tiledbvcf.config_logging(level, "ingest.log") ds = tiledbvcf.Dataset( @@ -860,6 +876,24 @@ def create_index_file_worker(uri: str) -> Optional[str]: prof.write("log", extra=read_file("ingest.log")) + # Update manifest status for ingested samples + if not disable_manifest: + group = tiledb.Group(dataset_uri) + manifest_uri = group[MANIFEST_ARRAY].uri + + with tiledb.open(manifest_uri) as A: + manifest_df = A.query(cond=f"vcf_uri in {sample_uris}").df[:] + manifest_df["status"] = Status.OK + + manifest_dict = manifest_df.to_dict(orient="list") + sample_names = manifest_dict["sample_name"] + with tiledb.open(manifest_uri, "d") as A: + A.query(cond=f"sample_name in {sample_names}").submit() + + del manifest_dict["sample_name"] + with tiledb.open(manifest_uri, "w") as A: + A[sample_names] = manifest_dict + # Cleanup tmp space if use_remote_tmp and tmp_uris: vfs = tiledb.VFS() @@ -1162,6 +1196,7 @@ def ingest_samples_dag( consolidate_stats: bool = False, use_remote_tmp: bool = False, sample_list_uri: Optional[str] = None, + disable_manifest: bool = False, ) -> None: """ Create a DAG to ingest samples into the dataset. 
@@ -1190,6 +1225,7 @@ def ingest_samples_dag( :param use_remote_tmp: use remote tmp space if VCFs need to be bgzipped, defaults to False (preferred for small VCFs) :param sample_list_uri: URI with a list of VCF URIs, defaults to None + :param disable_manifest: disable manifest update, defaults to False """ logger = get_logger_wrapper(verbose) @@ -1216,6 +1252,7 @@ def ingest_samples_dag( filter_samples_udf, dataset_uri, config=config, + resume=resume, verbose=verbose, name="Filter VCF samples", resource_class="large", @@ -1227,7 +1264,7 @@ def ingest_samples_dag( sample_uris = sample_uris.result() if not sample_uris: - logger.info("No new samples to ingest.") + logger.info("No samples to ingest.") return None, [] # Limit number of samples to ingest @@ -1290,6 +1327,8 @@ def ingest_samples_dag( consolidate_dataset_udf, dataset_uri, config=config, + exclude=None, + include=[MANIFEST_ARRAY, LOG_ARRAY], id=f"vcf-consol-{i//workers}", verbose=verbose, resources=CONSOLIDATE_RESOURCES, @@ -1317,6 +1356,7 @@ def ingest_samples_dag( resources=ingest_resources, name=f"Ingest VCF {i+1}/{num_partitions}", access_credentials_name=acn, + disable_manifest=disable_manifest, ) if prev_consolidate: @@ -1498,6 +1538,7 @@ def ingest_vcf_annotations( resources=ingest_resources, name=f"Ingest annotations {i+1}/{len(vcf_uris)}", access_credentials_name=acn, + disable_manifest=True, ) # Set dependencies so that the ingest nodes run in parallel @@ -1678,6 +1719,7 @@ def ingest_vcf( consolidate_stats=consolidate_stats, use_remote_tmp=use_remote_tmp, sample_list_uri=sample_list_uri if disable_manifest else None, + disable_manifest=disable_manifest, ) # Register the dataset on TileDB Cloud From ae3d84be8910abf4ec85aa910b267a960f361f3a Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Mon, 16 Jun 2025 16:04:37 -0600 Subject: [PATCH 03/17] Removed resume TODO comment --- src/tiledb/cloud/vcf/ingestion.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/tiledb/cloud/vcf/ingestion.py b/src/tiledb/cloud/vcf/ingestion.py index 0e1be429a..0267e2f98 100644 --- a/src/tiledb/cloud/vcf/ingestion.py +++ b/src/tiledb/cloud/vcf/ingestion.py @@ -1247,7 +1247,6 @@ def ingest_samples_dag( ) # Get list of sample uris that have not been ingested yet - # TODO: handle second pass resume sample_uris = graph.submit( filter_samples_udf, dataset_uri, From 1cdbb7834aa005d1dbb59b8585d6ee4b89f4426f Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Mon, 16 Jun 2025 16:26:18 -0600 Subject: [PATCH 04/17] Code formatting --- src/tiledb/cloud/vcf/ingestion.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/tiledb/cloud/vcf/ingestion.py b/src/tiledb/cloud/vcf/ingestion.py index 0267e2f98..8e6d26228 100644 --- a/src/tiledb/cloud/vcf/ingestion.py +++ b/src/tiledb/cloud/vcf/ingestion.py @@ -75,7 +75,6 @@ class Contigs(enum.Enum): ALL_DISABLE_MERGE = enum.auto() - class Status(str, enum.Enum): """ The ingestion status of samples in the manifest. 
@@ -85,7 +84,7 @@ class Status(str, enum.Enum): MISSING_INDEX = the VCF file does not have a corresponding index file MISSING_SAMPLE_NAME = the VCF file does not have a sample name MULTIPLE_SAMPLES = the VCF file has multiple sample names - DUPLICATE_SAMPLE_NAME = one or more VCF files being ingested have duplicate sample names + DUPLICATE_SAMPLE_NAME = one or more VCF files have duplicate sample names BAD_INDEX = the VCF index file could not be properly read """ @@ -98,7 +97,6 @@ class Status(str, enum.Enum): BAD_INDEX = "bad index" - def get_logger_wrapper( verbose: bool = False, ) -> logging.Logger: @@ -643,11 +641,10 @@ def filter_samples_udf( group = tiledb.Group(dataset_uri) manifest_uri = group[MANIFEST_ARRAY].uri + cond = f"status == '{Status.READY}' or status == '{Status.MISSING_INDEX}'" with tiledb.open(manifest_uri) as A: manifest_df = A.df[:] - ingest_df = A.query( - cond=f"status == '{Status.READY}' or status == '{Status.MISSING_INDEX}'" - ).df[:] + ingest_df = A.query(cond=cond).df[:] manifest_samples = set(manifest_df.sample_name) # Sort manifest by sample_name From 04b1832952293960edcfd7c75fec2e06051228e1 Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Fri, 20 Jun 2025 11:21:35 -0600 Subject: [PATCH 05/17] Updated VCF ingest log messages to be more structured This includes renaming some variables to add context to the messages. --- src/tiledb/cloud/vcf/ingestion.py | 152 ++++++++++++++++-------------- 1 file changed, 82 insertions(+), 70 deletions(-) diff --git a/src/tiledb/cloud/vcf/ingestion.py b/src/tiledb/cloud/vcf/ingestion.py index 8e6d26228..bc5fbd28e 100644 --- a/src/tiledb/cloud/vcf/ingestion.py +++ b/src/tiledb/cloud/vcf/ingestion.py @@ -207,7 +207,7 @@ def create_dataset_udf( # Check if the dataset already exists with tiledb.scope_ctx(config): if tiledb.object_type(dataset_uri) != "group": - logger.info("Creating dataset: %r", dataset_uri) + logger.info(f"Creating dataset: {dataset_uri=}") # vcf_attrs overrides extra_attrs if vcf_attrs: @@ -243,7 +243,7 @@ def create_dataset_udf( write_log_event(log_uri, "create_dataset_udf", "create", data=dataset_uri) else: - logger.info("Using existing dataset: %r", dataset_uri) + logger.info(f"Using existing dataset: {dataset_uri=}") return dataset_uri @@ -285,15 +285,13 @@ def register_dataset_udf( except Exception: # tiledb.object_type raises an exception if the namespace does not exist - logger.error( - "Error checking if %r is registered. 
Bad namespace?", tiledb_uri - ) + logger.error(f"Error when checking if dataset is registered: {tiledb_uri=}") raise if found: - logger.info("Dataset already registered at %r.", tiledb_uri) + logger.info(f"Dataset is already registered: {tiledb_uri=}") else: - logger.info("Registering dataset at %r.", tiledb_uri) + logger.info(f"Registering dataset: {tiledb_uri=}") tiledb.cloud.groups.register( dataset_uri, name=register_name, @@ -331,15 +329,15 @@ def read_uris_udf( cmd = ("zcat", "-f") process_stream(uri=list_uri, cmd=cmd, output_uri=local_list) - result = [] + vcf_uris = [] for line in open(local_list): - result.append(line.strip()) - if max_files and len(result) == max_files: + vcf_uris.append(line.strip()) + if max_files and len(vcf_uris) == max_files: break - logger.info("Found %d VCF files.", len(result)) + logger.info(f"Reading VCF URIs from URI: {list_uri=}, {len(vcf_uris)=}") - return result + return vcf_uris def read_metadata_uris_udf( @@ -369,15 +367,18 @@ def read_metadata_uris_udf( with Profiler(group_uri=dataset_uri, group_member=LOG_ARRAY) as prof: with tiledb.open(metadata_uri) as A: df = A.query(dims=[], attrs=[metadata_attr]).df[:] - results = df[metadata_attr].to_list() + vcf_uris = df[metadata_attr].to_list() if max_files: - results = results[:max_files] + vcf_uris = vcf_uris[:max_files] - logger.info("Read %d VCF URIs from the metadata array.", len(results)) - prof.write("count", len(results)) + logger.info( + "Reading VCF URIs from the metadata array: " + f"{metadata_uri=}, {metadata_attr=}, {len(vcf_uris)=}" + ) + prof.write("count", len(vcf_uris)) - return results + return vcf_uris def find_uris_udf( @@ -461,7 +462,7 @@ def callback(uri, size_bytes): # Add one trailing slash to search_uri search_uri = search_uri.rstrip("/") + "/" - results = find( + vcf_uris = find( search_uri, config=config, include=include, @@ -469,10 +470,13 @@ def callback(uri, size_bytes): max_count=max_files, ) - logger.info("Found %d VCF files.", len(results)) - prof.write("count", len(results)) + logger.info( + "Searching for VCF URIs: " + f"{search_uri=}, {include=}, {exclude=}, {len(vcf_uris)=}" + ) + prof.write("count", len(vcf_uris)) - return results + return vcf_uris def find_uris_aws_udf( @@ -556,16 +560,19 @@ def find_uris_aws_udf( # Build list of URIs from command output. 
# Example line from s3: # (dryrun) download: s3://1000genomes-dragen-v3.7.6/foo to foo - result = [] + vcf_uris = [] if res_stdout: for line in res_stdout.splitlines(): line = line.split()[2] if use_s3 else line - result.append(line) + vcf_uris.append(line) - logger.info("Found %d VCF files.", len(result)) - prof.write("count", len(result)) + logger.info( + "Searching for VCF URIs with AWS CLI: " + f"{search_uri=}, {include=}, {exclude=}, {len(vcf_uris)=}" + ) + prof.write("count", len(vcf_uris)) - return result + return vcf_uris def filter_uris_udf( @@ -598,13 +605,12 @@ def filter_uris_udf( # Find URIs that are not in the manifest sample_uris_set = set(sample_uris) manifest_uris = set(manifest_df.vcf_uri) - result = sorted(list(sample_uris_set.difference(manifest_uris))) + new_uris = sorted(list(sample_uris_set.difference(manifest_uris))) - logger.info("%d URIs in the manifest.", len(manifest_uris)) - logger.info("%d new URIs.", len(result)) - prof.write("count", len(result)) + logger.info(f"Filtering URIs: {len(manifest_uris)=}, {len(new_uris)=}") + prof.write("count", len(new_uris)) - return result + return new_uris def filter_samples_udf( @@ -644,31 +650,34 @@ def filter_samples_udf( cond = f"status == '{Status.READY}' or status == '{Status.MISSING_INDEX}'" with tiledb.open(manifest_uri) as A: manifest_df = A.df[:] - ingest_df = A.query(cond=cond).df[:] + ready_df = A.query(cond=cond).df[:] manifest_samples = set(manifest_df.sample_name) # Sort manifest by sample_name - ingest_df = ingest_df.sort_values(by=["sample_name"]) - ingest_samples = set(ingest_df.sample_name) + ready_df = ready_df.sort_values(by=["sample_name"]) + ready_samples = set(ready_df.sample_name) # Find samples that have already been ingested or failed to ingest - existing_samples = dataset_samples.difference(ingest_samples) - failed_samples = dataset_samples.intersection(ingest_samples) + ingested_samples = dataset_samples.difference(ready_samples) + incomplete_samples = dataset_samples.intersection(ready_samples) # Finalize samples to ingest if not resume: - ingest_df = ingest_df[~ingest_df.sample_name.isin(failed_samples)] - result = ingest_df.vcf_uri.to_list() - - logger.info("%d samples in the dataset.", len(dataset_samples)) - logger.info("%d samples fully ingested.", len(existing_samples)) - logger.info("%d samples partially ingested.", len(failed_samples)) - logger.info("%d samples in the manifest.", len(manifest_samples)) - logger.info("%d samples ready to be ingested.", len(ingest_samples)) - logger.info("%d samples will be ingested.", len(result)) - prof.write("count", len(result)) + ready_df = ready_df[~ready_df.sample_name.isin(incomplete_samples)] + queued_samples = ready_df.vcf_uri.to_list() + + logger.info( + "Filtering samples: " + f"{len(dataset_samples)=}, " + f"{len(ingested_samples)=}, " + f"{len(incomplete_samples)=}, " + f"{len(manifest_samples)=}, " + f"{len(ready_samples)=}, " + f"{len(queued_samples)=}" + ) + prof.write("count", len(queued_samples)) - return result + return queued_samples def ingest_manifest_udf( @@ -715,10 +724,7 @@ def file_size(uri: str) -> int: try: sample_name = get_sample_name(vcf_uri) except Exception: - logger.warning( - "Skipping invalid VCF file: %r", - vcf_uri, - ) + logger.warning(f"Skipping invalid VCF file: {vcf_uri=}") continue if not sample_name: @@ -830,12 +836,12 @@ def create_index_file_worker(uri: str) -> Optional[str]: """ with tiledb.scope_ctx(config): if create_index or not find_index(uri): - logger.info("indexing %r", uri) + logger.info(f"Indexing 
VCF: {uri=}") try: create_index_file(uri) except RuntimeError as exc: logger.warning("%r: %s", uri, exc) - logger.info("sort, bgzip, and index %r", uri) + logger.info(f"sort, bgzip, and index: {uri=}") try: uri = sort_and_bgzip(uri, tmp_space=tmp_space) tmp_uris.append(uri) @@ -895,10 +901,10 @@ def create_index_file_worker(uri: str) -> Optional[str]: if use_remote_tmp and tmp_uris: vfs = tiledb.VFS() for uri in tmp_uris: - logger.debug("removing %r", uri) + logger.debug(f"Removing tmp file: {uri=}") vfs.remove_file(uri) - logger.info("max memory usage: %.3f GiB", max_memory_usage() / (1 << 30)) + logger.info("Max memory usage: %.3f GiB", max_memory_usage() / (1 << 30)) def consolidate_dataset_udf( @@ -956,7 +962,9 @@ def consolidate_dataset_udf( modes += ["fragments"] for mode in modes: - logger.debug("Consolidating %r in %r (%s)", mode, uri, name) + logger.debug( + f"Consolidating dataset array: {mode=}, {uri=}, {name=}" + ) config = tiledb.Config({"sm.consolidation.mode": mode}) try: tiledb.consolidate(uri, config=config) @@ -964,14 +972,14 @@ def consolidate_dataset_udf( print(e) for mode in modes: - logger.debug("Vacuuming %r in %r (%s)", mode, uri, name) + logger.debug(f"Vacuuming dataset array: {mode=}, {uri=}, {name=}") config = tiledb.Config({"sm.vacuum.mode": mode}) try: tiledb.vacuum(uri, config=config) except Exception as e: print(e) - logger.info("max memory usage: %.3f GiB", max_memory_usage() / (1 << 30)) + logger.info("Max memory usage: %.3f GiB", max_memory_usage() / (1 << 30)) # -------------------------------------------------------------------- @@ -1108,10 +1116,10 @@ def ingest_manifest_dag( sample_uris = filtered_sample_uris.result() if not sample_uris: - logger.info("All samples found are already in the manifest.") + logger.info("All samples found are already in the manifest") return - logger.info("Found %d new URIs.", len(sample_uris)) + logger.info(f"Found new URIs: {len(sample_uris)=}") graph = dag.DAG( name="vcf-populate-manifest", @@ -1170,7 +1178,7 @@ def ingest_manifest_dag( if consolidate: consolidate.depends_on(ingest) - logger.info("Populating the manifest.") + logger.info("Populating the manifest") run_dag(graph) @@ -1260,7 +1268,7 @@ def ingest_samples_dag( sample_uris = sample_uris.result() if not sample_uris: - logger.info("No samples to ingest.") + logger.info("No samples to ingest") return None, [] # Limit number of samples to ingest @@ -1306,9 +1314,13 @@ def ingest_samples_dag( if ingest_resources is None: ingest_resources = {"cpu": f"{threads}", "memory": f"{node_memory_mb}Mi"} - logger.debug("partitions=%d, consolidates=%d", num_partitions, num_consolidates) - logger.debug("ingest_resources=%s", ingest_resources) - logger.debug("consolidate_resources=%s", CONSOLIDATE_RESOURCES) + logger.debug( + "Ingesting samples DAG configuration: " + f"{num_partitions=}, " + f"{num_consolidates=}, " + f"{ingest_resources=}, " + f"{CONSOLIDATE_RESOURCES=}" + ) # This loop creates a DAG with the following structure: # - Submit N ingest tasks in parallel, where N is `workers` or less @@ -1386,12 +1398,12 @@ def group_member_uri(group_uri, group_member, config): dependencies=[consolidate], ) - logger.info("Ingesting %d samples.", len(sample_uris)) + logger.info(f"Ingesting samples: {len(sample_uris)=}") run_dag(graph, wait=False) logger.info( - "VCF samples ingestion submitted -" - " https://cloud.tiledb.com/activity/taskgraphs/%s/%s", + "VCF samples ingestion submitted: " + "https://cloud.tiledb.com/activity/taskgraphs/%s/%s", graph.namespace, 
        graph.server_graph_uuid,
     )
@@ -1448,7 +1460,7 @@ def ingest_vcf_annotations(
         raise ValueError("acn must be provided to register the dataset")
 
     logger = get_logger_wrapper(verbose)
-    logger.info("Ingesting annotation VCF into %r", dataset_uri)
+    logger.info(f"Ingesting annotation VCF: {dataset_uri=}")
 
     if search_uri:
         # Create and run DAG to find VCF URIs.
@@ -1670,7 +1682,7 @@ def ingest_vcf(
     dataset_uri = dataset_uri.rstrip("/")
 
     logger = get_logger_wrapper(verbose)
-    logger.info("Ingesting VCF samples into %r", dataset_uri)
+    logger.info(f"Ingesting VCF samples: {dataset_uri=}")
 
     # Add VCF URIs to the manifest
     ingest_manifest_dag(

From 5d4884aea03becfbbea1cb0f2939813dabcce225 Mon Sep 17 00:00:00 2001
From: Alan Cleary
Date: Mon, 23 Jun 2025 13:55:01 -0600
Subject: [PATCH 06/17] Added __str__() special method to VCF Status enum

This ensures that a member's value is saved to the ingestion manifest,
rather than the member's name.
---
 src/tiledb/cloud/vcf/ingestion.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/tiledb/cloud/vcf/ingestion.py b/src/tiledb/cloud/vcf/ingestion.py
index 0f90fab04..e47444c02 100644
--- a/src/tiledb/cloud/vcf/ingestion.py
+++ b/src/tiledb/cloud/vcf/ingestion.py
@@ -96,6 +96,9 @@ class Status(str, enum.Enum):
     DUPLICATE_SAMPLE_NAME = "duplicate sample name"
     BAD_INDEX = "bad index"
 
+    def __str__(self) -> str:
+        return self.value
+
 
 def get_logger_wrapper(
     verbose: bool = False,

From 320d028c7dd71f77623a40a15b9c1f27a7740c82 Mon Sep 17 00:00:00 2001
From: Alan Cleary
Date: Thu, 3 Jul 2025 16:39:13 -0600
Subject: [PATCH 07/17] Added wait parameter to ingest_vcf

This allows the function to be blocking, if need be. The default value
is False to preserve the previous unparameterized behavior.
---
 src/tiledb/cloud/vcf/ingestion.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/tiledb/cloud/vcf/ingestion.py b/src/tiledb/cloud/vcf/ingestion.py
index ea6d0da88..dec0f8b1f 100644
--- a/src/tiledb/cloud/vcf/ingestion.py
+++ b/src/tiledb/cloud/vcf/ingestion.py
@@ -1206,6 +1206,7 @@ def ingest_samples_dag(
     use_remote_tmp: bool = False,
     sample_list_uri: Optional[str] = None,
     disable_manifest: bool = False,
+    wait: bool = False,
 ) -> None:
     """
     Create a DAG to ingest samples into the dataset.
@@ -1235,6 +1236,7 @@ def ingest_samples_dag(
         defaults to False (preferred for small VCFs)
     :param sample_list_uri: URI with a list of VCF URIs, defaults to None
     :param disable_manifest: disable manifest update, defaults to False
+    :param wait: wait for the ingestion to complete before returning, defaults to False
     """
 
     logger = get_logger_wrapper(verbose)
@@ -1403,7 +1405,6 @@ def group_member_uri(group_uri, group_member, config):
     )
 
     logger.info(f"Ingesting samples: {len(sample_uris)=}")
-    run_dag(graph, wait=False)
 
     logger.info(
         "VCF samples ingestion submitted: "
@@ -1411,6 +1412,7 @@ def group_member_uri(group_uri, group_member, config):
         graph.namespace,
         graph.server_graph_uuid,
     )
+    run_dag(graph, wait=wait)
 
 
 # --------------------------------------------------------------------
@@ -1615,6 +1617,7 @@
     aws_find_mode: bool = False,
     use_remote_tmp: bool = False,
     disable_manifest: bool = False,
+    wait: bool = False,
 ) -> None:
     """
     Ingest samples into a dataset.
@@ -1664,6 +1667,7 @@ def ingest_vcf( :param use_remote_tmp: use remote tmp space if VCFs need to be sorted and bgzipped, defaults to False (preferred for small VCFs) :param disable_manifest: disable manifest creation, defaults to False + :param wait: wait for the ingestion to complete before returning, defaults to False """ # Validate user input @@ -1732,6 +1736,7 @@ def ingest_vcf( use_remote_tmp=use_remote_tmp, sample_list_uri=sample_list_uri if disable_manifest else None, disable_manifest=disable_manifest, + wait=wait, ) # Register the dataset on TileDB Cloud From 00d1f88562fd509ccdaef5ca9bd6a1107b92738a Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Thu, 3 Jul 2025 16:43:07 -0600 Subject: [PATCH 08/17] Added ingest_vcf to vcf module export This allows the function to be used externally. --- src/tiledb/cloud/vcf/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/tiledb/cloud/vcf/__init__.py b/src/tiledb/cloud/vcf/__init__.py index bb6559b11..5c3257c7f 100644 --- a/src/tiledb/cloud/vcf/__init__.py +++ b/src/tiledb/cloud/vcf/__init__.py @@ -3,6 +3,7 @@ from .ingestion import create_dataset_udf as create_dataset from .ingestion import ingest from .ingestion import ingest_annotations +from .ingestion import ingest_vcf from .ingestion import register_dataset_udf as register_dataset from .query import build_read_dag from .query import read @@ -21,6 +22,7 @@ "create_dataset", "ingest", "ingest_annotations", + "ingest_vcf", "register_dataset", "build_read_dag", "read", From 9b5207330be21549de1d048d65f255d16f158b9c Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Thu, 3 Jul 2025 17:02:44 -0600 Subject: [PATCH 09/17] Added base class for testing VCF ingestion The class provides common setup, log capturing during ingestion, and tests common to all ingestion types. --- tests/vcf/test_ingest.py | 168 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 tests/vcf/test_ingest.py diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py new file mode 100644 index 000000000..934448db9 --- /dev/null +++ b/tests/vcf/test_ingest.py @@ -0,0 +1,168 @@ +import io +import logging +import re +import unittest +from contextlib import redirect_stdout + +import tiledbvcf + +import tiledb.cloud +import tiledb.cloud.vcf +from tiledb.cloud._vendor import cloudpickle +from tiledb.cloud.utilities import get_logger + +# Pickle the vcf module by value, so tests run on the latest code. 
+cloudpickle.register_pickle_by_value(tiledb.cloud.vcf) + +# Test data location +S3_BUCKET = "s3://tiledb-unittest/vcf-ingestion-test" + +# Test VCF URIs +READY_URI = f"{S3_BUCKET}/01-ready.vcf.gz" # "OK" after ingestion +MISSING_INDEX_URI = f"{S3_BUCKET}/02-missing-index.vcf.gz" # "OK" after ingestion +MISSING_SAMPLE_NAME_URI = f"{S3_BUCKET}/03-missing-sample-name.vcf.gz" +MULTIPLE_SAMPLES_URI = f"{S3_BUCKET}/04-multiple-samples.vcf.gz" +DUPLICATE_SAMPLE_NAME_URI = f"{S3_BUCKET}/05-duplicate-sample-name.vcf.gz" +BAD_INDEX_URI = f"{S3_BUCKET}/06-bad-index.vcf.gz" + +# Test data samples +READY_SAMPLE_NAME = "ready" +MISSING_INDEX_SAMPLE_NAME = "missing index" + +# Array names +LOG_ARRAY = "log" +MANIFEST_ARRAY = "manifest" + +# Filter samples log message +FILTER_SAMPLES_LOG = ( + "Filtering samples: " + "len(dataset_samples)={}, " + "len(ingested_samples)={}, " + "len(incomplete_samples)={}, " + "len(manifest_samples)={}, " + "len(ready_samples)={}, " + "len(queued_samples)={}" +) + + +def outputs2msgs(outputs: list[str]) -> list[str]: + """ + Extracts the messages from log outputs with the format + "[%(asctime)s] [%(module)s] [%(funcName)s] [%(levelname)s] %(message)s" + """ + pattern = re.compile(r"\[.*\] \[.*\] \[.*\] \[.*\] ") + msgs = [] + for o in outputs: + if pattern.match(o): + msg = pattern.sub("", o) + msgs.append(msg) + else: + msgs.append(o) + return msgs + + +class TestVCFIngestionBase(unittest.TestCase): + __unittest_skip__ = True + + @classmethod + def _setup(cls) -> None: + cls.data_uri = S3_BUCKET + ( + cls.namespace, + cls.storage_path, + cls.acn, + ) = tiledb.cloud.groups._default_ns_path_cred() + cls.namespace = cls.namespace.rstrip("/") + cls.storage_path = cls.storage_path.rstrip("/") + cls.array_name = tiledb.cloud._common.testonly.random_name("vcf-test") + cls.dataset_uri = ( + f"tiledb://{cls.namespace}/{cls.storage_path}/{cls.array_name}" + ) + + @classmethod + def _ingest(cls) -> None: + raise NotImplementedError("Implement VCF ingestion") + + @classmethod + def setUpClass(cls) -> None: + cls._setup() + + # Capture local and cloud logs during ingestion + logger = get_logger(logging.INFO) + f = io.StringIO() # cloud logs are printed + with cls.assertLogs(cls, logger=logger) as lg, redirect_stdout(f): + cls._ingest() + local_logs = list(map(lambda r: r.getMessage(), lg.records)) + cloud_logs = outputs2msgs(f.getvalue().splitlines()) + cls.logs = local_logs + cloud_logs + + @classmethod + def tearDownClass(cls) -> None: + if tiledb.object_type(cls.dataset_uri): + tiledb.cloud.asset.delete(cls.dataset_uri, recursive=True) + + def test_dataset_creation(self): + self.assertIn(f"Creating dataset: dataset_uri='{self.dataset_uri}'", self.logs) + + def test_dataset_group_and_arrays(self): + self.assertEqual(tiledb.object_type(self.dataset_uri), "group") + group = tiledb.Group(self.dataset_uri) + manifest_uri = group[MANIFEST_ARRAY].uri + self.assertEqual(tiledb.object_type(manifest_uri), "array") + self.assertEqual(tiledb.object_type(group[LOG_ARRAY].uri), "array") + + def test_filter_uris(self): + self.assertIn( + "Filtering URIs: len(manifest_uris)=0, len(new_uris)=6", self.logs + ) + + def test_filter_samples(self): + self.assertIn(FILTER_SAMPLES_LOG.format(0, 0, 0, 5, 2, 2), self.logs) + + def test_manifest(self): + group = tiledb.Group(self.dataset_uri) + manifest_uri = group[MANIFEST_ARRAY].uri + with tiledb.open(manifest_uri) as A: + manifest_df = A.df[:] + + ok_df = manifest_df[manifest_df["status"] == "ok"] + self.assertEqual(len(ok_df), 2) + 
self.assertIn(READY_URI, ok_df["vcf_uri"].tolist()) + self.assertIn(READY_URI + ".tbi", ok_df["index_uri"].tolist()) + self.assertIn(MISSING_INDEX_URI, ok_df["vcf_uri"].tolist()) + self.assertIn("None", ok_df["index_uri"].tolist()) + + # NOTE: This code path is currently unreachable; an error is logged instead + self.assertIn( + f"Skipping invalid VCF file: vcf_uri='{MISSING_SAMPLE_NAME_URI}'", self.logs + ) + + multiple_samples_df = manifest_df[manifest_df["status"] == "multiple samples"] + self.assertEqual(len(multiple_samples_df), 1) + + duplicate_sample_name_df = manifest_df[ + manifest_df["status"] == "duplicate sample name" + ] + self.assertEqual(len(duplicate_sample_name_df), 1) + + bad_index_df = manifest_df[manifest_df["status"] == "bad index"] + self.assertEqual(len(bad_index_df), 1) + + def test_dataset(self): + ds = tiledbvcf.Dataset( + self.dataset_uri, + cfg=tiledbvcf.ReadConfig(tiledb_config=self.config), + ) + samples = ds.samples() + self.assertEqual(len(samples), 2) + self.assertIn(READY_SAMPLE_NAME, samples) + self.assertIn(MISSING_INDEX_SAMPLE_NAME, samples) + + +# TODO: search_uri, pattern, ignore + +# TODO: sample_list_uri, disable_manifest + +# TODO: metadata_uri, metadata_attr + +# TODO: test resume From 377bb0c4fe2458711bebfa63f234af62683f2029 Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Thu, 3 Jul 2025 17:03:46 -0600 Subject: [PATCH 10/17] Added class for testing VCF ingestion via search URI This class extends the base class for VCF ingestion, implementing ingestion via search URI and search-specific test cases. --- tests/vcf/test_ingest.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py index 934448db9..5205bed48 100644 --- a/tests/vcf/test_ingest.py +++ b/tests/vcf/test_ingest.py @@ -159,7 +159,35 @@ def test_dataset(self): self.assertIn(MISSING_INDEX_SAMPLE_NAME, samples) -# TODO: search_uri, pattern, ignore +class TestVCFIngestionSearch(TestVCFIngestionBase): + __unittest_skip__ = False + + @classmethod + def _setup(cls): + super(TestVCFIngestionSearch, cls)._setup() + cls.search_uri = cls.data_uri + "/" + cls.search_pattern = "*.vcf.gz" + + @classmethod + def _ingest(cls) -> None: + tiledb.cloud.vcf.ingest_vcf( + dataset_uri=cls.dataset_uri, + search_uri=cls.search_uri, + pattern=cls.search_pattern, + config=cls.config, + wait=True, + ) + + def test_find_uris_logs(self): + msg = ( + "Searching for VCF URIs: " + f"search_uri='{self.search_uri}', " + f"include='{self.search_pattern}', " + "exclude=None, " + "len(vcf_uris)=6" + ) + self.assertIn(msg, self.logs) + # TODO: sample_list_uri, disable_manifest From 5950f584b910d8c539deb6cda1355c5a6a53be1c Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Mon, 14 Jul 2025 12:23:49 -0600 Subject: [PATCH 11/17] Added class for testing VCF ingestion via a sample list This class extends the base class for VCF ingestion, implementing ingestion via sample list and list-specific test cases. 
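For reference, the list file consumed by read_uris_udf is plain text with
one VCF URI per line, optionally gzipped (the reader pipes it through
zcat -f). A sketch of producing such a file, with made-up bucket and file
names:

    uris = [
        "s3://my-bucket/01-ready.vcf.gz",
        "s3://my-bucket/02-missing-index.vcf.gz",
    ]
    with open("sample-list.txt", "w") as f:
        f.write("\n".join(uris) + "\n")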
---
 tests/vcf/test_ingest.py | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py
index 5205bed48..eef81808f 100644
--- a/tests/vcf/test_ingest.py
+++ b/tests/vcf/test_ingest.py
@@ -189,7 +189,30 @@ def test_find_uris_logs(self):
         self.assertIn(msg, self.logs)
 
 
-# TODO: sample_list_uri, disable_manifest
+class TestVCFIngestionSampleList(TestVCFIngestionBase):
+    __unittest_skip__ = False
+
+    @classmethod
+    def _setup(cls):
+        super(TestVCFIngestionSampleList, cls)._setup()
+        cls.sample_list_uri = cls.data_uri + "/sample-list.txt"
+
+    @classmethod
+    def _ingest(cls) -> None:
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            sample_list_uri=cls.sample_list_uri,
+            config=cls.config,
+            wait=True,
+        )
+
+    def test_read_uris_logs(self):
+        msg = (
+            "Reading VCF URIs from URI: "
+            f"list_uri='{self.sample_list_uri}', len(vcf_uris)=6"
+        )
+        self.assertIn(msg, self.logs)
+
 
 
 # TODO: metadata_uri, metadata_attr

From a1869d461f92ec5aa45f425bf6c094cb42c43fa1 Mon Sep 17 00:00:00 2001
From: Alan Cleary
Date: Mon, 14 Jul 2025 12:27:36 -0600
Subject: [PATCH 12/17] Added class for testing VCF ingestion via a metadata array

This class extends the base class for VCF ingestion, implementing
ingestion via metadata array and metadata-specific test cases.
---
 tests/vcf/test_ingest.py | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py
index eef81808f..765855580 100644
--- a/tests/vcf/test_ingest.py
+++ b/tests/vcf/test_ingest.py
@@ -214,6 +214,29 @@ def test_read_uris_logs(self):
         self.assertIn(msg, self.logs)
 
 
-# TODO: metadata_uri, metadata_attr
+class TestVCFIngestionMetadata(TestVCFIngestionBase):
+    __unittest_skip__ = False
+
+    @classmethod
+    def _setup(cls):
+        super(TestVCFIngestionMetadata, cls)._setup()
+        cls.metadata_uri = cls.data_uri + "/metadata-array/"
+
+    @classmethod
+    def _ingest(cls) -> None:
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            metadata_uri=cls.metadata_uri,
+            config=cls.config,
+            wait=True,
+        )
+
+    def test_read_metadata_logs(self):
+        msg = (
+            "Reading VCF URIs from the metadata array: "
+            f"metadata_uri='{self.metadata_uri}', metadata_attr='uri', len(vcf_uris)=6"
+        )
+        self.assertIn(msg, self.logs)
+
 
 # TODO: test resume

From 50f5faf6edbfcab6a9bf96686aeeba36d4b10e99 Mon Sep 17 00:00:00 2001
From: Alan Cleary
Date: Mon, 14 Jul 2025 12:33:23 -0600
Subject: [PATCH 13/17] Moved common VCF ingestion tests into a separate class from the base class

This allows the base class's configuration, setup, and teardown to be
leveraged by classes that don't need to run the common tests.
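The hierarchy relies on the __unittest_skip__ attribute that unittest's
skip decorators set; a minimal sketch of the pattern, with generic class
names:

    import unittest

    class Base(unittest.TestCase):
        __unittest_skip__ = True  # setup/teardown only; reported as skipped

    class CommonTests(Base):
        __unittest_skip__ = True  # shared test methods, still skipped

        def test_shared(self):
            self.assertTrue(True)

    class ConcreteMode(CommonTests):
        __unittest_skip__ = False  # clears the flag so test_shared runs here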
--- tests/vcf/test_ingest.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py index 765855580..fee9b79e3 100644 --- a/tests/vcf/test_ingest.py +++ b/tests/vcf/test_ingest.py @@ -101,6 +101,10 @@ def tearDownClass(cls) -> None: if tiledb.object_type(cls.dataset_uri): tiledb.cloud.asset.delete(cls.dataset_uri, recursive=True) + +class TestVCFIngestionCommon(TestVCFIngestionBase): + __unittest_skip__ = True + def test_dataset_creation(self): self.assertIn(f"Creating dataset: dataset_uri='{self.dataset_uri}'", self.logs) @@ -159,7 +163,7 @@ def test_dataset(self): self.assertIn(MISSING_INDEX_SAMPLE_NAME, samples) -class TestVCFIngestionSearch(TestVCFIngestionBase): +class TestVCFIngestionSearch(TestVCFIngestionCommon): __unittest_skip__ = False @classmethod @@ -189,7 +193,7 @@ def test_find_uris_logs(self): self.assertIn(msg, self.logs) -class TestVCFIngestionSampleList(TestVCFIngestionBase): +class TestVCFIngestionSampleList(TestVCFIngestionCommon): __unittest_skip__ = False @classmethod @@ -214,7 +218,7 @@ def test_read_uris_logs(self): self.assertIn(msg, self.logs) -class TestVCFIngestionMetadata(TestVCFIngestionBase): +class TestVCFIngestionMetadata(TestVCFIngestionCommon): __unittest_skip__ = False @classmethod From eff4d97c236e48a62b4d705d1aeebc7230ccb62e Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Mon, 14 Jul 2025 12:41:21 -0600 Subject: [PATCH 14/17] Minor code formatting revisions in VCF ingestion common tests --- tests/vcf/test_ingest.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py index fee9b79e3..7100d614c 100644 --- a/tests/vcf/test_ingest.py +++ b/tests/vcf/test_ingest.py @@ -112,8 +112,9 @@ def test_dataset_group_and_arrays(self): self.assertEqual(tiledb.object_type(self.dataset_uri), "group") group = tiledb.Group(self.dataset_uri) manifest_uri = group[MANIFEST_ARRAY].uri + log_uri = group[LOG_ARRAY].uri self.assertEqual(tiledb.object_type(manifest_uri), "array") - self.assertEqual(tiledb.object_type(group[LOG_ARRAY].uri), "array") + self.assertEqual(tiledb.object_type(log_uri), "array") def test_filter_uris(self): self.assertIn( @@ -131,10 +132,12 @@ def test_manifest(self): ok_df = manifest_df[manifest_df["status"] == "ok"] self.assertEqual(len(ok_df), 2) - self.assertIn(READY_URI, ok_df["vcf_uri"].tolist()) - self.assertIn(READY_URI + ".tbi", ok_df["index_uri"].tolist()) - self.assertIn(MISSING_INDEX_URI, ok_df["vcf_uri"].tolist()) - self.assertIn("None", ok_df["index_uri"].tolist()) + ok_vcfs = ok_df["vcf_uri"].tolist() + ok_indexes = ok_df["index_uri"].tolist() + self.assertIn(READY_URI, ok_vcfs) + self.assertIn(READY_URI + ".tbi", ok_indexes) + self.assertIn(MISSING_INDEX_URI, ok_vcfs) + self.assertIn("None", ok_indexes) # NOTE: This code path is currently unreachable; an error is logged instead self.assertIn( From 7831b58e62f66d3d903474633d275a370eed0c21 Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Mon, 14 Jul 2025 13:00:53 -0600 Subject: [PATCH 15/17] Added class for testing resuming failed VCF ingestions This class extends the base class for VCF ingestion, implementing ingestions that simulate a failed ingestion with subsequent (non-)resume ingestions, as well as resume-specific test cases. 
Note that causing an ingestion to fail programmatically is not supported
by VCF; ingestion failure is enabled for this class by monkey patching
the sample ingestion DAG function, allowing failures to be simulated
deterministically.
---
 tests/vcf/test_ingest.py | 68 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 67 insertions(+), 1 deletion(-)

diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py
index 7100d614c..11fac8539 100644
--- a/tests/vcf/test_ingest.py
+++ b/tests/vcf/test_ingest.py
@@ -11,6 +11,26 @@
 from tiledb.cloud._vendor import cloudpickle
 from tiledb.cloud.utilities import get_logger
 
+
+def ingest_samples_with_failure_dag(*args, config={}, **kwargs) -> None:
+    """
+    Wraps the original `ingest_samples_dag` function and reads the config to determine
+    if an ingestion failure should be simulated, i.e. the manifest is not updated after
+    ingestion.
+    """
+    from tiledb.cloud.vcf.ingestion import _ingest_samples_dag
+
+    if config.get("test.vcf.fail_ingest", False):
+        kwargs["disable_manifest"] = True
+    return _ingest_samples_dag(*args, config=config, **kwargs)
+
+
+# Monkey patch the failure function
+tiledb.cloud.vcf.ingestion._ingest_samples_dag = (
+    tiledb.cloud.vcf.ingestion.ingest_samples_dag
+)
+tiledb.cloud.vcf.ingestion.ingest_samples_dag = ingest_samples_with_failure_dag
+
 # Pickle the vcf module by value, so tests run on the latest code.
 cloudpickle.register_pickle_by_value(tiledb.cloud.vcf)
 
@@ -246,4 +266,50 @@ def test_read_metadata_logs(self):
         self.assertIn(msg, self.logs)
 
 
-# TODO: test resume
+class TestVCFIngestionResume(TestVCFIngestionBase):
+    __unittest_skip__ = False
+
+    @classmethod
+    def _setup(cls):
+        super(TestVCFIngestionResume, cls)._setup()
+        cls.metadata_uri = cls.data_uri + "/metadata-array/"
+
+    @classmethod
+    def _ingest(cls) -> None:
+        # Ingest with "failure"
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            metadata_uri=cls.metadata_uri,
+            config=cls.config | {"test.vcf.fail_ingest": True},
+            wait=True,
+        )
+        # Re-ingest without resume
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            metadata_uri=cls.metadata_uri,
+            config=cls.config,
+            wait=True,
+            resume=False,
+        )
+        # Re-ingest with resume
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            metadata_uri=cls.metadata_uri,
+            config=cls.config,
+            wait=True,
+        )
+
+    def test_existing_dataset(self):
+        self.assertIn(
+            f"Using existing dataset: dataset_uri='{self.dataset_uri}'",
+            self.logs,
+        )
+
+    def test_failed_ingest(self):
+        self.assertIn(FILTER_SAMPLES_LOG.format(0, 0, 0, 5, 2, 2), self.logs)
+
+    def test_no_resume(self):
+        self.assertIn(FILTER_SAMPLES_LOG.format(2, 0, 2, 5, 2, 0), self.logs)
+
+    def test_resume(self):
+        self.assertIn(FILTER_SAMPLES_LOG.format(2, 0, 2, 5, 2, 2), self.logs)

From 6ad2aa186d59c53356e38c34766188b2435b453f Mon Sep 17 00:00:00 2001
From: Alan Cleary
Date: Mon, 14 Jul 2025 13:53:04 -0600
Subject: [PATCH 16/17] Disabled all VCF ingestion tests

This is to prevent them from being run by CI.
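To run one of the suites locally anyway, the skip flag can be flipped
before loading the tests, along these lines (a sketch; it assumes the
repository root is on sys.path):

    import unittest

    import tests.vcf.test_ingest as test_ingest

    # Clear the flag that keeps the suite out of CI runs
    test_ingest.TestVCFIngestionSearch.__unittest_skip__ = False

    suite = unittest.defaultTestLoader.loadTestsFromTestCase(
        test_ingest.TestVCFIngestionSearch
    )
    unittest.TextTestRunner(verbosity=2).run(suite)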
--- tests/vcf/test_ingest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py index 11fac8539..e089477cc 100644 --- a/tests/vcf/test_ingest.py +++ b/tests/vcf/test_ingest.py @@ -187,7 +187,7 @@ def test_dataset(self): class TestVCFIngestionSearch(TestVCFIngestionCommon): - __unittest_skip__ = False + __unittest_skip__ = True @classmethod def _setup(cls): @@ -217,7 +217,7 @@ def test_find_uris_logs(self): class TestVCFIngestionSampleList(TestVCFIngestionCommon): - __unittest_skip__ = False + __unittest_skip__ = True @classmethod def _setup(cls): @@ -242,7 +242,7 @@ def test_read_uris_logs(self): class TestVCFIngestionMetadata(TestVCFIngestionCommon): - __unittest_skip__ = False + __unittest_skip__ = True @classmethod def _setup(cls): @@ -267,7 +267,7 @@ def test_read_metadata_logs(self): class TestVCFIngestionResume(TestVCFIngestionBase): - __unittest_skip__ = False + __unittest_skip__ = True @classmethod def _setup(cls): From 9a7f936050a852f5988ae54fce22b7f437945c57 Mon Sep 17 00:00:00 2001 From: Alan Cleary Date: Mon, 14 Jul 2025 13:55:40 -0600 Subject: [PATCH 17/17] Moved tiledbvcf import into the single test case that uses it This is to prevent an import error during CI since TileDB-VCF is not installed in the CI environment. --- tests/vcf/test_ingest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py index e089477cc..fd2759c64 100644 --- a/tests/vcf/test_ingest.py +++ b/tests/vcf/test_ingest.py @@ -4,8 +4,6 @@ import unittest from contextlib import redirect_stdout -import tiledbvcf - import tiledb.cloud import tiledb.cloud.vcf from tiledb.cloud._vendor import cloudpickle @@ -176,6 +174,8 @@ def test_manifest(self): self.assertEqual(len(bad_index_df), 1) def test_dataset(self): + import tiledbvcf + ds = tiledbvcf.Dataset( self.dataset_uri, cfg=tiledbvcf.ReadConfig(tiledb_config=self.config),