diff --git a/src/tiledb/cloud/vcf/__init__.py b/src/tiledb/cloud/vcf/__init__.py
index bb6559b11..5c3257c7f 100644
--- a/src/tiledb/cloud/vcf/__init__.py
+++ b/src/tiledb/cloud/vcf/__init__.py
@@ -3,6 +3,7 @@
 from .ingestion import create_dataset_udf as create_dataset
 from .ingestion import ingest
 from .ingestion import ingest_annotations
+from .ingestion import ingest_vcf
 from .ingestion import register_dataset_udf as register_dataset
 from .query import build_read_dag
 from .query import read
@@ -21,6 +22,7 @@
     "create_dataset",
     "ingest",
     "ingest_annotations",
+    "ingest_vcf",
     "register_dataset",
     "build_read_dag",
     "read",
diff --git a/src/tiledb/cloud/vcf/ingestion.py b/src/tiledb/cloud/vcf/ingestion.py
index 301838803..a107b4ded 100644
--- a/src/tiledb/cloud/vcf/ingestion.py
+++ b/src/tiledb/cloud/vcf/ingestion.py
@@ -87,6 +87,31 @@ class Contigs(enum.Enum):
     ALL_DISABLE_MERGE = enum.auto()


+class Status(str, enum.Enum):
+    """
+    The ingestion status of samples in the manifest.
+
+    READY = the VCF file can be ingested
+    OK = the VCF file was successfully ingested
+    MISSING_INDEX = the VCF file does not have a corresponding index file
+    MISSING_SAMPLE_NAME = the VCF file does not have a sample name
+    MULTIPLE_SAMPLES = the VCF file has multiple sample names
+    DUPLICATE_SAMPLE_NAME = one or more VCF files have duplicate sample names
+    BAD_INDEX = the VCF index file could not be properly read
+    """
+
+    READY = "ready"
+    OK = "ok"
+    MISSING_INDEX = "missing index"
+    MISSING_SAMPLE_NAME = "missing sample name"
+    MULTIPLE_SAMPLES = "multiple samples"
+    DUPLICATE_SAMPLE_NAME = "duplicate sample name"
+    BAD_INDEX = "bad index"
+
+    def __str__(self) -> str:
+        return self.value
+
+
 def get_logger_wrapper(
     verbose: bool = False,
 ) -> logging.Logger:
@@ -197,7 +222,7 @@ def create_dataset_udf(
     # Check if the dataset already exists
     with tiledb.scope_ctx(config):
         if tiledb.object_type(dataset_uri) != "group":
-            logger.info("Creating dataset: %r", dataset_uri)
+            logger.info(f"Creating dataset: {dataset_uri=}")

             # vcf_attrs overrides extra_attrs
             if vcf_attrs:
@@ -233,7 +258,7 @@ def create_dataset_udf(
             write_log_event(log_uri, "create_dataset_udf", "create", data=dataset_uri)

         else:
-            logger.info("Using existing dataset: %r", dataset_uri)
+            logger.info(f"Using existing dataset: {dataset_uri=}")

     return dataset_uri

@@ -274,17 +299,17 @@ def register_dataset_udf(
         )

     except Exception:
-        # tiledb.object_type raises an exception if the namespace does not
-        # exist
+        # tiledb.object_type raises an exception if the namespace does not exist
        logger.error(
-            "Error checking if %r is registered. Bad namespace?", tiledb_uri
+            f"Error when checking if dataset is registered: {tiledb_uri=}. "
+            "Bad namespace?"
         )
         raise

     if found:
-        logger.info("Dataset already registered at %r.", tiledb_uri)
+        logger.info(f"Dataset is already registered: {tiledb_uri=}")
     else:
-        logger.info("Registering dataset at %r.", tiledb_uri)
+        logger.info(f"Registering dataset: {tiledb_uri=}")
         tiledb.cloud.groups.register(
             dataset_uri,
             name=register_name,
@@ -322,15 +347,15 @@ def read_uris_udf(
         cmd = ("zcat", "-f")
         process_stream(uri=list_uri, cmd=cmd, output_uri=local_list)

-        result = []
+        vcf_uris = []
         for line in open(local_list):
-            result.append(line.strip())
-            if max_files and len(result) == max_files:
+            vcf_uris.append(line.strip())
+            if max_files and len(vcf_uris) == max_files:
                 break

-        logger.info("Found %d VCF files.", len(result))
+        logger.info(f"Reading VCF URIs from URI: {list_uri=}, {len(vcf_uris)=}")

-    return result
+    return vcf_uris


 def read_metadata_uris_udf(
@@ -360,15 +385,18 @@
     with Profiler(group_uri=dataset_uri, group_member=LOG_ARRAY) as prof:
         with tiledb.open(metadata_uri) as A:
             df = A.query(dims=[], attrs=[metadata_attr]).df[:]
-            results = df[metadata_attr].to_list()
+            vcf_uris = df[metadata_attr].to_list()
             if max_files:
-                results = results[:max_files]
+                vcf_uris = vcf_uris[:max_files]

-        logger.info("Read %d VCF URIs from the metadata array.", len(results))
-        prof.write("count", len(results))
+        logger.info(
+            "Reading VCF URIs from the metadata array: "
+            f"{metadata_uri=}, {metadata_attr=}, {len(vcf_uris)=}"
+        )
+        prof.write("count", len(vcf_uris))

-    return results
+    return vcf_uris


 def find_uris_udf(
@@ -452,7 +480,7 @@ def callback(uri, size_bytes):
         # Add one trailing slash to search_uri
         search_uri = search_uri.rstrip("/") + "/"

-        results = find(
+        vcf_uris = find(
             search_uri,
             config=config,
             include=include,
@@ -460,10 +488,13 @@
             max_count=max_files,
         )

-        logger.info("Found %d VCF files.", len(results))
-        prof.write("count", len(results))
+        logger.info(
+            "Searching for VCF URIs: "
+            f"{search_uri=}, {include=}, {exclude=}, {len(vcf_uris)=}"
+        )
+        prof.write("count", len(vcf_uris))

-        return results
+        return vcf_uris


 def find_uris_aws_udf(
@@ -547,16 +578,19 @@ def find_uris_aws_udf(
         # Build list of URIs from command output.
         # Example line from s3:
         #   (dryrun) download: s3://1000genomes-dragen-v3.7.6/foo to foo
-        result = []
+        vcf_uris = []
         if res_stdout:
             for line in res_stdout.splitlines():
                 line = line.split()[2] if use_s3 else line
-                result.append(line)
+                vcf_uris.append(line)

-        logger.info("Found %d VCF files.", len(result))
-        prof.write("count", len(result))
+        logger.info(
+            "Searching for VCF URIs with AWS CLI: "
+            f"{search_uri=}, {include=}, {exclude=}, {len(vcf_uris)=}"
+        )
+        prof.write("count", len(vcf_uris))

-        return result
+        return vcf_uris


 def filter_uris_udf(
@@ -589,19 +623,19 @@
         # Find URIs that are not in the manifest
         sample_uris_set = set(sample_uris)
         manifest_uris = set(manifest_df.vcf_uri)
-        result = sorted(list(sample_uris_set.difference(manifest_uris)))
+        new_uris = sorted(list(sample_uris_set.difference(manifest_uris)))

-        logger.info("%d URIs in the manifest.", len(manifest_uris))
-        logger.info("%d new URIs.", len(result))
-        prof.write("count", len(result))
+        logger.info(f"Filtering URIs: {len(manifest_uris)=}, {len(new_uris)=}")
+        prof.write("count", len(new_uris))

-        return result
+        return new_uris


 def filter_samples_udf(
     dataset_uri: str,
     *,
     config: Optional[Mapping[str, Any]] = None,
+    resume: bool = True,
     verbose: bool = False,
 ) -> Sequence[str]:
     """
@@ -609,6 +643,7 @@

     :param dataset_uri: dataset URI
     :param config: config dictionary, defaults to None
+    :param resume: enable resume ingestion mode, defaults to True
     :param verbose: verbose logging, defaults to False
     :return: sample URIs
     """
@@ -624,32 +659,43 @@
             dataset_uri,
             cfg=tiledbvcf.ReadConfig(tiledb_config=config),
         )
-        existing_samples = set(ds.samples())
+        dataset_samples = set(ds.samples())

-        # Read all samples in the manifest with status == "ok"
+        # Read all samples in the manifest and samples ready for ingestion
         group = tiledb.Group(dataset_uri)
         manifest_uri = group[MANIFEST_ARRAY].uri
+        cond = f"status == '{Status.READY}' or status == '{Status.MISSING_INDEX}'"
         with tiledb.open(manifest_uri) as A:
-            manifest_df = A.query(
-                cond="status == 'ok' or status == 'missing index'"
-            ).df[:]
-
-        # Sort manifest by sample_name
-        manifest_df = manifest_df.sort_values(by=["sample_name"])
-
-        # Find samples that have not already been ingested
+            manifest_df = A.df[:]
+            ready_df = A.query(cond=cond).df[:]
         manifest_samples = set(manifest_df.sample_name)
-        new_samples = manifest_samples.difference(existing_samples)
-        manifest_df = manifest_df[manifest_df.sample_name.isin(new_samples)]
-        result = manifest_df.vcf_uri.to_list()

-        logger.info("%d samples in the manifest.", len(manifest_samples))
-        logger.info("%d samples already ingested.", len(existing_samples))
-        logger.info("%d new samples to ingest.", len(result))
-        prof.write("count", len(result))
+        # Sort manifest by sample_name
+        ready_df = ready_df.sort_values(by=["sample_name"])
+        ready_samples = set(ready_df.sample_name)
+
+        # Find samples that have already been ingested or failed to ingest
+        ingested_samples = dataset_samples.difference(ready_samples)
+        incomplete_samples = dataset_samples.intersection(ready_samples)
+
+        # Finalize samples to ingest
+        if not resume:
+            ready_df = ready_df[~ready_df.sample_name.isin(incomplete_samples)]
+        queued_samples = ready_df.vcf_uri.to_list()
+
+        logger.info(
+            "Filtering samples: "
+            f"{len(dataset_samples)=}, "
+            f"{len(ingested_samples)=}, "
+            f"{len(incomplete_samples)=}, "
+            f"{len(manifest_samples)=}, "
+            f"{len(ready_samples)=}, "
+            f"{len(queued_samples)=}"
+        )
+        prof.write("count", len(queued_samples))

-    return result
+    return queued_samples


 def ingest_manifest_udf(
@@ -690,26 +736,23 @@ def file_size(uri: str) -> int:
     values = defaultdict(list)
     for vcf_uri in sample_uris:
-        status = "ok"
+        status = Status.READY

         # Check for sample name issues
         try:
             sample_name = get_sample_name(vcf_uri)
         except Exception:
-            logger.warning(
-                "Skipping invalid VCF file: %r",
-                vcf_uri,
-            )
+            logger.warning(f"Skipping invalid VCF file: {vcf_uri=}")
             continue

         if not sample_name:
-            status = "missing sample name"
+            status = Status.MISSING_SAMPLE_NAME
         elif len(sample_name.split(",")) > 1:
-            status = "multiple samples"
+            status = Status.MULTIPLE_SAMPLES
         elif sample_name in keys:
             # TODO: check for duplicate sample names across all
             # ingest_manifest_udf calls
-            status = "duplicate sample name"
+            status = Status.DUPLICATE_SAMPLE_NAME

             # Generate a unique sample name for the manifest
             sample_name_base = sample_name
             i = 0
@@ -720,14 +763,14 @@ def file_size(uri: str) -> int:
         # Check for index issues
         index_uri = find_index(vcf_uri)
         if not index_uri:
-            status = "" if status == "ok" else status + ","
-            status += "missing index"
+            status = "" if status == Status.READY else status + ","
+            status += Status.MISSING_INDEX
             records = 0
         else:
             records = get_record_count(vcf_uri, index_uri)
             if records is None:
-                status = "" if status == "ok" else status + ","
-                status += "bad index"
+                status = "" if status == Status.READY else status + ","
+                status += Status.BAD_INDEX
                 records = 0

         keys.append(sample_name)
@@ -760,6 +803,7 @@ def ingest_samples_udf(
     verbose: bool = False,
     trace_id: Optional[str] = None,
     use_remote_tmp: bool = False,
+    disable_manifest: bool = False,
 ) -> None:
     """
     Ingest samples into the dataset.
@@ -780,6 +824,7 @@ def ingest_samples_udf(
     :param trace_id: trace ID for logging, defaults to None
     :param use_remote_tmp: use remote tmp space if VCFs need to be bgzipped,
         defaults to False (preferred for small VCFs)
+    :param disable_manifest: disable manifest update, defaults to False
     """

     import tiledbvcf
@@ -810,12 +855,12 @@ def create_index_file_worker(uri: str) -> Optional[str]:
         """
         with tiledb.scope_ctx(config):
             if create_index or not find_index(uri):
-                logger.info("indexing %r", uri)
+                logger.info(f"Indexing VCF: {uri=}")
                 try:
                     create_index_file(uri)
                 except RuntimeError as exc:
                     logger.warning("%r: %s", uri, exc)
-                    logger.info("sort, bgzip, and index %r", uri)
+                    logger.info(f"sort, bgzip, and index: {uri=}")
                     try:
                         uri = sort_and_bgzip(uri, tmp_space=tmp_space)
                         tmp_uris.append(uri)
@@ -832,6 +877,7 @@ def create_index_file_worker(uri: str) -> Optional[str]:
     # Filter out failed index operations
     sample_uris = [uri for uri in sample_uris if uri]

+    # Ingest samples with indexes
     level = "debug" if verbose else "info"
     tiledbvcf.config_logging(level, "ingest.log")
     ds = tiledbvcf.Dataset(
@@ -852,14 +898,32 @@ def create_index_file_worker(uri: str) -> Optional[str]:

         prof.write("log", extra=read_file("ingest.log"))

+    # Update manifest status for ingested samples
+    if not disable_manifest:
+        group = tiledb.Group(dataset_uri)
+        manifest_uri = group[MANIFEST_ARRAY].uri
+
+        with tiledb.open(manifest_uri) as A:
+            manifest_df = A.query(cond=f"vcf_uri in {sample_uris}").df[:]
+            manifest_df["status"] = Status.OK
+
+        manifest_dict = manifest_df.to_dict(orient="list")
+        sample_names = manifest_dict["sample_name"]
+        with tiledb.open(manifest_uri, "d") as A:
+            A.query(cond=f"sample_name in {sample_names}").submit()
+
+        del manifest_dict["sample_name"]
+        with tiledb.open(manifest_uri, "w") as A:
+            A[sample_names] = manifest_dict
+
     # Cleanup tmp space
     if use_remote_tmp and tmp_uris:
         vfs = tiledb.VFS()
         for uri in tmp_uris:
-            logger.debug("removing %r", uri)
+            logger.debug(f"Removing tmp file: {uri=}")
             vfs.remove_file(uri)

-    logger.info("max memory usage: %.3f GiB", max_memory_usage() / (1 << 30))
+    logger.info("Max memory usage: %.3f GiB", max_memory_usage() / (1 << 30))


 def consolidate_dataset_udf(
@@ -918,7 +982,9 @@ def consolidate_dataset_udf(
                 modes += ["fragments"]

             for mode in modes:
-                logger.debug("Consolidating %r in %r (%s)", mode, uri, name)
+                logger.debug(
+                    f"Consolidating dataset array: {mode=}, {uri=}, {name=}"
+                )
                 config = tiledb.Config({"sm.consolidation.mode": mode})
                 try:
                     tiledb.consolidate(uri, config=config)
@@ -926,14 +992,14 @@ def consolidate_dataset_udf(
                     print(e)

             for mode in modes:
-                logger.debug("Vacuuming %r in %r (%s)", mode, uri, name)
+                logger.debug(f"Vacuuming dataset array: {mode=}, {uri=}, {name=}")
                 config = tiledb.Config({"sm.vacuum.mode": mode})
                 try:
                     tiledb.vacuum(uri, config=config)
                 except Exception as e:
                     print(e)

-    logger.info("max memory usage: %.3f GiB", max_memory_usage() / (1 << 30))
+    logger.info("Max memory usage: %.3f GiB", max_memory_usage() / (1 << 30))


 # --------------------------------------------------------------------
@@ -1090,10 +1156,10 @@
     sample_uris = filtered_sample_uris.result()

     if not sample_uris:
-        logger.info("All samples found are already in the manifest.")
+        logger.info("All samples found are already in the manifest")
         return

-    logger.info("Found %d new URIs.", len(sample_uris))
+    logger.info(f"Found new URIs: {len(sample_uris)=}")

     graph = dag.DAG(
         name="vcf-populate-manifest",
@@ -1152,7 +1218,7 @@
     if consolidate:
         consolidate.depends_on(ingest)

-    logger.info("Populating the manifest.")
+    logger.info("Populating the manifest")
     run_dag(graph)


@@ -1174,6 +1240,8 @@ def ingest_samples_dag(
     consolidate_stats: bool = False,
     use_remote_tmp: bool = False,
     sample_list_uri: Optional[str] = None,
+    disable_manifest: bool = False,
+    wait: bool = False,
     ingest_resources: Optional[Mapping[str, str]] = None,
     consolidate_resources: Optional[Mapping[str, str]] = CONSOLIDATE_RESOURCES,
     filter_samples_resources: Optional[Mapping[str, str]] = FILTER_SAMPLES_RESOURCES,
@@ -1205,6 +1273,8 @@
     :param use_remote_tmp: use remote tmp space if VCFs need to be bgzipped,
         defaults to False (preferred for small VCFs)
     :param sample_list_uri: URI with a list of VCF URIs, defaults to None
+    :param disable_manifest: disable manifest update, defaults to False
+    :param wait: wait for the ingestion to complete before returning, defaults to False
     :param ingest_resources: manual override for ingest UDF resources,
         defaults to None
     :param consolidate_resources: manual override for consolidate UDF resources,
@@ -1234,11 +1304,11 @@
     )

     # Get list of sample uris that have not been ingested yet
-    # TODO: handle second pass resume
     sample_uris = graph.submit(
         filter_samples_udf,
         dataset_uri,
         config=config,
+        resume=resume,
         verbose=verbose,
         name="Filter VCF samples",
         resources=filter_samples_resources,
@@ -1250,7 +1320,7 @@
     sample_uris = sample_uris.result()

     if not sample_uris:
-        logger.info("No new samples to ingest.")
+        logger.info("No samples to ingest")
         return None, []

     # Limit number of samples to ingest
@@ -1296,9 +1366,13 @@
     if ingest_resources is None:
         ingest_resources = {"cpu": f"{threads}", "memory": f"{node_memory_mb}Mi"}

-    logger.debug("partitions=%d, consolidates=%d", num_partitions, num_consolidates)
-    logger.debug("ingest_resources=%s", ingest_resources)
-    logger.debug("consolidate_resources=%s", consolidate_resources)
+    logger.debug(
+        "Ingesting samples DAG configuration: "
+        f"{num_partitions=}, "
+        f"{num_consolidates=}, "
+        f"{ingest_resources=}, "
+        f"{consolidate_resources=}"
+    )

     # This loop creates a DAG with the following structure:
     # - Submit N ingest tasks in parallel, where N is `workers` or less
@@ -1313,6 +1387,8 @@
             consolidate_dataset_udf,
             dataset_uri,
             config=config,
+            exclude=None,
+            include=[MANIFEST_ARRAY, LOG_ARRAY],
             id=f"vcf-consol-{i // workers}",
             verbose=verbose,
             resources=consolidate_resources,
@@ -1340,6 +1416,7 @@
             resources=ingest_resources,
             name=f"Ingest VCF {i + 1}/{num_partitions}",
             access_credentials_name=acn,
+            disable_manifest=disable_manifest,
         )

         if prev_consolidate:
@@ -1375,15 +1452,15 @@ def group_member_uri(group_uri, group_member, config):
         group_fragments_resources=group_fragments_resources,
     )

-    logger.info("Ingesting %d samples.", len(sample_uris))
-    run_dag(graph, wait=False)
+    logger.info(f"Ingesting samples: {len(sample_uris)=}")

     logger.info(
-        "VCF samples ingestion submitted -"
-        " https://cloud.tiledb.com/activity/taskgraphs/%s/%s",
+        "VCF samples ingestion submitted: "
+        "https://cloud.tiledb.com/activity/taskgraphs/%s/%s",
         graph.namespace,
         graph.server_graph_uuid,
     )
+    run_dag(graph, wait=wait)


 # --------------------------------------------------------------------
@@ -1448,7 +1525,7 @@ def ingest_vcf_annotations(
         raise ValueError("acn must be provided to register the dataset")

     logger = get_logger_wrapper(verbose)
-    logger.info("Ingesting annotation VCF into %r", dataset_uri)
+    logger.info(f"Ingesting annotation VCF: {dataset_uri=}")

     if search_uri:
         # Create and run DAG to find VCF URIs.
@@ -1536,6 +1613,7 @@ def ingest_vcf_annotations(
             resources=ingest_resources,
             name=f"Ingest annotations {i + 1}/{len(vcf_uris)}",
             access_credentials_name=acn,
+            disable_manifest=True,
         )

         # Set dependencies so that the ingest nodes run in parallel
@@ -1600,6 +1678,7 @@ def ingest_vcf(
     aws_find_mode: bool = False,
     use_remote_tmp: bool = False,
     disable_manifest: bool = False,
+    wait: bool = False,
     ingest_resources: Optional[Mapping[str, str]] = None,
     consolidate_resources: Optional[Mapping[str, str]] = CONSOLIDATE_RESOURCES,
     manifest_resources: Optional[Mapping[str, str]] = MANIFEST_RESOURCES,
@@ -1656,6 +1735,7 @@ def ingest_vcf(
     :param use_remote_tmp: use remote tmp space if VCFs need to be sorted and
         bgzipped, defaults to False (preferred for small VCFs)
     :param disable_manifest: disable manifest creation, defaults to False
+    :param wait: wait for the ingestion to complete before returning, defaults to False
     :param ingest_resources: manual override for ingest UDF resources, defaults to None
     :param consolidate_resources: manual override for consolidate UDF resources,
         defaults to CONSOLIDATE_RESOURCES
@@ -1692,7 +1772,7 @@ def ingest_vcf(
     dataset_uri = dataset_uri.rstrip("/")

     logger = get_logger_wrapper(verbose)
-    logger.info("Ingesting VCF samples into %r", dataset_uri)
+    logger.info(f"Ingesting VCF samples: {dataset_uri=}")

     # Add VCF URIs to the manifest
     ingest_manifest_dag(
@@ -1742,6 +1822,8 @@ def ingest_vcf(
         consolidate_stats=consolidate_stats,
         use_remote_tmp=use_remote_tmp,
         sample_list_uri=sample_list_uri if disable_manifest else None,
+        disable_manifest=disable_manifest,
+        wait=wait,
         consolidate_resources=consolidate_resources,
         filter_samples_resources=filter_samples_resources,
         group_fragments_resources=group_fragments_resources,
diff --git a/tests/vcf/test_ingest.py b/tests/vcf/test_ingest.py
new file mode 100644
index 000000000..fd2759c64
--- /dev/null
+++ b/tests/vcf/test_ingest.py
@@ -0,0 +1,315 @@
+import io
+import logging
+import re
+import unittest
+from contextlib import redirect_stdout
+
+import tiledb.cloud
+import tiledb.cloud.vcf
+from tiledb.cloud._vendor import cloudpickle
+from tiledb.cloud.utilities import get_logger
+
+
+def ingest_samples_with_failure_dag(*args, config={}, **kwargs) -> None:
+    """
+    Wraps the original `ingest_samples_dag` function and reads the config to
+    determine if an ingestion failure should be simulated, i.e. the manifest is
+    not updated after ingestion.
+    """
+    from tiledb.cloud.vcf.ingestion import _ingest_samples_dag
+
+    if config.get("test.vcf.fail_ingest", False):
+        kwargs["disable_manifest"] = True
+    return _ingest_samples_dag(*args, config=config, **kwargs)
+
+
+# Monkey patch the failure function
+tiledb.cloud.vcf.ingestion._ingest_samples_dag = (
+    tiledb.cloud.vcf.ingestion.ingest_samples_dag
+)
+tiledb.cloud.vcf.ingestion.ingest_samples_dag = ingest_samples_with_failure_dag
+
+# Pickle the vcf module by value, so tests run on the latest code.
+cloudpickle.register_pickle_by_value(tiledb.cloud.vcf)
+
+# Test data location
+S3_BUCKET = "s3://tiledb-unittest/vcf-ingestion-test"
+
+# Test VCF URIs
+READY_URI = f"{S3_BUCKET}/01-ready.vcf.gz"  # "OK" after ingestion
+MISSING_INDEX_URI = f"{S3_BUCKET}/02-missing-index.vcf.gz"  # "OK" after ingestion
+MISSING_SAMPLE_NAME_URI = f"{S3_BUCKET}/03-missing-sample-name.vcf.gz"
+MULTIPLE_SAMPLES_URI = f"{S3_BUCKET}/04-multiple-samples.vcf.gz"
+DUPLICATE_SAMPLE_NAME_URI = f"{S3_BUCKET}/05-duplicate-sample-name.vcf.gz"
+BAD_INDEX_URI = f"{S3_BUCKET}/06-bad-index.vcf.gz"
+
+# Test data samples
+READY_SAMPLE_NAME = "ready"
+MISSING_INDEX_SAMPLE_NAME = "missing index"
+
+# Array names
+LOG_ARRAY = "log"
+MANIFEST_ARRAY = "manifest"
+
+# Filter samples log message
+FILTER_SAMPLES_LOG = (
+    "Filtering samples: "
+    "len(dataset_samples)={}, "
+    "len(ingested_samples)={}, "
+    "len(incomplete_samples)={}, "
+    "len(manifest_samples)={}, "
+    "len(ready_samples)={}, "
+    "len(queued_samples)={}"
+)
+
+
+def outputs2msgs(outputs: list[str]) -> list[str]:
+    """
+    Extracts the messages from log outputs with the format
+    "[%(asctime)s] [%(module)s] [%(funcName)s] [%(levelname)s] %(message)s"
+    """
+    pattern = re.compile(r"\[.*\] \[.*\] \[.*\] \[.*\] ")
+    msgs = []
+    for o in outputs:
+        if pattern.match(o):
+            msg = pattern.sub("", o)
+            msgs.append(msg)
+        else:
+            msgs.append(o)
+    return msgs
+
+
+class TestVCFIngestionBase(unittest.TestCase):
+    __unittest_skip__ = True
+
+    @classmethod
+    def _setup(cls) -> None:
+        cls.data_uri = S3_BUCKET
+        (
+            cls.namespace,
+            cls.storage_path,
+            cls.acn,
+        ) = tiledb.cloud.groups._default_ns_path_cred()
+        cls.namespace = cls.namespace.rstrip("/")
+        cls.storage_path = cls.storage_path.rstrip("/")
+        cls.array_name = tiledb.cloud._common.testonly.random_name("vcf-test")
+        cls.dataset_uri = (
+            f"tiledb://{cls.namespace}/{cls.storage_path}/{cls.array_name}"
+        )
+
+    @classmethod
+    def _ingest(cls) -> None:
+        raise NotImplementedError("Implement VCF ingestion")
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls._setup()
+
+        # Capture local and cloud logs during ingestion
+        logger = get_logger(logging.INFO)
+        f = io.StringIO()  # cloud logs are printed
+        with cls.assertLogs(cls, logger=logger) as lg, redirect_stdout(f):
+            cls._ingest()
+        local_logs = list(map(lambda r: r.getMessage(), lg.records))
+        cloud_logs = outputs2msgs(f.getvalue().splitlines())
+        cls.logs = local_logs + cloud_logs
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        if tiledb.object_type(cls.dataset_uri):
+            tiledb.cloud.asset.delete(cls.dataset_uri, recursive=True)
+
+
+class TestVCFIngestionCommon(TestVCFIngestionBase):
+    __unittest_skip__ = True
+
+    def test_dataset_creation(self):
+        self.assertIn(f"Creating dataset: dataset_uri='{self.dataset_uri}'", self.logs)
+
+    def test_dataset_group_and_arrays(self):
+        self.assertEqual(tiledb.object_type(self.dataset_uri), "group")
+        group = tiledb.Group(self.dataset_uri)
+        manifest_uri = group[MANIFEST_ARRAY].uri
+        log_uri = group[LOG_ARRAY].uri
+        self.assertEqual(tiledb.object_type(manifest_uri), "array")
+        self.assertEqual(tiledb.object_type(log_uri), "array")
+
+    def test_filter_uris(self):
+        self.assertIn(
+            "Filtering URIs: len(manifest_uris)=0, len(new_uris)=6", self.logs
+        )
+
+    def test_filter_samples(self):
+        self.assertIn(FILTER_SAMPLES_LOG.format(0, 0, 0, 5, 2, 2), self.logs)
+
+    def test_manifest(self):
+        group = tiledb.Group(self.dataset_uri)
+        manifest_uri = group[MANIFEST_ARRAY].uri
+        with tiledb.open(manifest_uri) as A:
+            manifest_df = A.df[:]
+
+        ok_df = manifest_df[manifest_df["status"] == "ok"]
+        self.assertEqual(len(ok_df), 2)
+        ok_vcfs = ok_df["vcf_uri"].tolist()
+        ok_indexes = ok_df["index_uri"].tolist()
+        self.assertIn(READY_URI, ok_vcfs)
+        self.assertIn(READY_URI + ".tbi", ok_indexes)
+        self.assertIn(MISSING_INDEX_URI, ok_vcfs)
+        self.assertIn("None", ok_indexes)
+
+        # NOTE: This code path is currently unreachable; an error is logged instead
+        self.assertIn(
+            f"Skipping invalid VCF file: vcf_uri='{MISSING_SAMPLE_NAME_URI}'", self.logs
+        )
+
+        multiple_samples_df = manifest_df[manifest_df["status"] == "multiple samples"]
+        self.assertEqual(len(multiple_samples_df), 1)
+
+        duplicate_sample_name_df = manifest_df[
+            manifest_df["status"] == "duplicate sample name"
+        ]
+        self.assertEqual(len(duplicate_sample_name_df), 1)
+
+        bad_index_df = manifest_df[manifest_df["status"] == "bad index"]
+        self.assertEqual(len(bad_index_df), 1)
+
+    def test_dataset(self):
+        import tiledbvcf
+
+        ds = tiledbvcf.Dataset(
+            self.dataset_uri,
+            cfg=tiledbvcf.ReadConfig(tiledb_config=self.config),
+        )
+        samples = ds.samples()
+        self.assertEqual(len(samples), 2)
+        self.assertIn(READY_SAMPLE_NAME, samples)
+        self.assertIn(MISSING_INDEX_SAMPLE_NAME, samples)
+
+
+class TestVCFIngestionSearch(TestVCFIngestionCommon):
+    __unittest_skip__ = True
+
+    @classmethod
+    def _setup(cls):
+        super(TestVCFIngestionSearch, cls)._setup()
+        cls.search_uri = cls.data_uri + "/"
+        cls.search_pattern = "*.vcf.gz"
+
+    @classmethod
+    def _ingest(cls) -> None:
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            search_uri=cls.search_uri,
+            pattern=cls.search_pattern,
+            config=cls.config,
+            wait=True,
+        )
+
+    def test_find_uris_logs(self):
+        msg = (
+            "Searching for VCF URIs: "
+            f"search_uri='{self.search_uri}', "
+            f"include='{self.search_pattern}', "
+            "exclude=None, "
+            "len(vcf_uris)=6"
+        )
+        self.assertIn(msg, self.logs)
+
+
+class TestVCFIngestionSampleList(TestVCFIngestionCommon):
+    __unittest_skip__ = True
+
+    @classmethod
+    def _setup(cls):
+        super(TestVCFIngestionSampleList, cls)._setup()
+        cls.sample_list_uri = cls.data_uri + "/sample-list.txt"
+
+    @classmethod
+    def _ingest(cls) -> None:
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            sample_list_uri=cls.sample_list_uri,
+            config=cls.config,
+            wait=True,
+        )
+
+    def test_read_uris_logs(self):
+        msg = (
+            "Reading VCF URIs from URI: "
+            f"list_uri='{self.sample_list_uri}', len(vcf_uris)=6"
+        )
+        self.assertIn(msg, self.logs)
+
+
+class TestVCFIngestionMetadata(TestVCFIngestionCommon):
+    __unittest_skip__ = True
+
+    @classmethod
+    def _setup(cls):
+        super(TestVCFIngestionMetadata, cls)._setup()
+        cls.metadata_uri = cls.data_uri + "/metadata-array/"
+
+    @classmethod
+    def _ingest(cls) -> None:
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            metadata_uri=cls.metadata_uri,
+            config=cls.config,
+            wait=True,
+        )
+
+    def test_read_metadata_logs(self):
+        msg = (
+            "Reading VCF URIs from the metadata array: "
+            f"metadata_uri='{self.metadata_uri}', metadata_attr='uri', len(vcf_uris)=6"
+        )
+        self.assertIn(msg, self.logs)
+
+
+class TestVCFIngestionResume(TestVCFIngestionBase):
+    __unittest_skip__ = True
+
+    @classmethod
+    def _setup(cls):
+        super(TestVCFIngestionResume, cls)._setup()
+        cls.metadata_uri = cls.data_uri + "/metadata-array/"
+
+    @classmethod
+    def _ingest(cls) -> None:
+        # Ingest with "failure"
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            metadata_uri=cls.metadata_uri,
+            config=cls.config | {"test.vcf.fail_ingest": True},
+            wait=True,
+        )
+        # Re-ingest without resume
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            metadata_uri=cls.metadata_uri,
+            config=cls.config,
+            wait=True,
+            resume=False,
+        )
+        # Re-ingest with resume
+        tiledb.cloud.vcf.ingest_vcf(
+            dataset_uri=cls.dataset_uri,
+            metadata_uri=cls.metadata_uri,
+            config=cls.config,
+            wait=True,
+        )
+
+    def test_existing_dataset(self):
+        self.assertIn(
+            f"Using existing dataset: dataset_uri='{self.dataset_uri}'",
+            self.logs,
+        )
+
+    def test_failed_ingest(self):
+        self.assertIn(FILTER_SAMPLES_LOG.format(0, 0, 0, 5, 2, 2), self.logs)
+
+    def test_no_resume(self):
+        self.assertIn(FILTER_SAMPLES_LOG.format(2, 0, 2, 5, 2, 0), self.logs)
+
+    def test_resume(self):
+        self.assertIn(FILTER_SAMPLES_LOG.format(2, 0, 2, 5, 2, 2), self.logs)
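
Usage sketch (not part of the patch above): a minimal example of driving the new `wait` and `resume` behavior through `tiledb.cloud.vcf.ingest_vcf`. The parameter names come from the signatures changed in this diff; the dataset URI, bucket prefix, and config values below are hypothetical.

    import tiledb.cloud.vcf

    # First pass: find VCFs under a (hypothetical) prefix and block until the
    # ingestion task graph finishes instead of returning right after submission.
    tiledb.cloud.vcf.ingest_vcf(
        dataset_uri="tiledb://my-namespace/s3://my-bucket/vcf-dataset",
        search_uri="s3://my-bucket/vcfs/",
        pattern="*.vcf.gz",
        config={"vfs.s3.region": "us-east-1"},
        wait=True,
    )

    # Second pass: resume=False skips samples that are already in the dataset
    # but whose manifest status was never updated to "ok"; the default
    # resume=True re-queues those incomplete samples instead.
    tiledb.cloud.vcf.ingest_vcf(
        dataset_uri="tiledb://my-namespace/s3://my-bucket/vcf-dataset",
        search_uri="s3://my-bucket/vcfs/",
        pattern="*.vcf.gz",
        config={"vfs.s3.region": "us-east-1"},
        wait=True,
        resume=False,
    )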