From 4442d1c20a5cf2dc2c5ced4574fb4aabe86b5097 Mon Sep 17 00:00:00 2001 From: "J.T. Baker" Date: Wed, 21 Jan 2026 15:41:40 -0600 Subject: [PATCH] added a safety check to make sure the vcf file isn't empty --- scripts/extract_tracts.py | 146 ++++++++++++++++++++++++++------------ 1 file changed, 101 insertions(+), 45 deletions(-) diff --git a/scripts/extract_tracts.py b/scripts/extract_tracts.py index d7327e6..6487acc 100755 --- a/scripts/extract_tracts.py +++ b/scripts/extract_tracts.py @@ -45,38 +45,51 @@ --compress-output Compress output dosage, hapcount, and VCF files (default: False). """ + def parse_vcf_filepath(vcf_file_path): if not os.path.exists(vcf_file_path): raise FileNotFoundError(f"File not found: {vcf_file_path}") + if os.path.getsize(vcf_file_path) == 0: + err_msg = f"The provided vcf file, {vcf_file_path}, was empty" + logger.critical(err_msg) + raise ValueError(err_msg) + zipped = False path, filename = os.path.split(vcf_file_path) # check if the file is compressed (ends with '.gz') - if filename.endswith('.vcf.gz'): + if filename.endswith(".vcf.gz"): zipped = True - prefix = os.path.splitext(os.path.splitext(filename)[0])[0] # remove '.vcf.gz' extension - elif filename.endswith('.vcf'): - prefix = os.path.splitext(filename)[0] # remove '.vcf' extension + prefix = os.path.splitext(os.path.splitext(filename)[0])[ + 0 + ] # remove '.vcf.gz' extension + elif filename.endswith(".vcf"): + prefix = os.path.splitext(filename)[0] # remove '.vcf' extension else: # File has an unexpected extension - raise ValueError(f"Unexpected file extension: {filename}. VCF file must end in .vcf or .vcf.gz") + raise ValueError( + f"Unexpected file extension: {filename}. VCF file must end in .vcf or .vcf.gz" + ) - return { - 'path': path, - 'prefix': prefix, - 'zipped': zipped - } + return {"path": path, "prefix": prefix, "zipped": zipped} -def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=None, compress_output=None): +def extract_tracts( + vcf=str, + msp=str, + num_ancs=int, + output_dir=None, + output_vcf=None, + compress_output=None, +): # Housekeeping checks. if not os.path.exists(vcf): raise ValueError(f"The path '{vcf}' does not exist.") if not os.path.exists(msp): - raise ValueError(f"The path '{msp}' does not exist.") + raise ValueError(f"The path '{msp}' does not exist.") if not isinstance(num_ancs, int): raise ValueError("The 'num_ancs' must be an integer.") if output_dir is not None: @@ -85,13 +98,19 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N if not isinstance(output_vcf, bool): raise ValueError("The 'output_vcf' must be a boolean value (True or False).") if not isinstance(compress_output, bool): - raise ValueError("The 'compress_output' must be a boolean value (True or False).") - if compress_output==True: + raise ValueError( + "The 'compress_output' must be a boolean value (True or False)." + ) + if compress_output == True: logger.info("Files will be gzip compressed (not bgzip)") vcf_info = parse_vcf_filepath(vcf) - vcf_path, vcf_prefix, zipped = vcf_info['path'], vcf_info['prefix'], vcf_info['zipped'] - + vcf_path, vcf_prefix, zipped = ( + vcf_info["path"], + vcf_info["prefix"], + vcf_info["zipped"], + ) + # Log version number logger.info("# Running script version : %s", __version__) logger.info("# VCF File : %s", vcf) @@ -109,13 +128,21 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N logger.info("Creating output files for %d ancestries", num_ancs) for i in range(num_ancs): output_files[f"dos{i}"] = f"{output_path}.anc{i}.dosage.txt{file_extension}" - output_files[f"ancdos{i}"] = f"{output_path}.anc{i}.hapcount.txt{file_extension}" + output_files[f"ancdos{i}"] = ( + f"{output_path}.anc{i}.hapcount.txt{file_extension}" + ) if output_vcf: output_files[f"vcf{i}"] = f"{output_path}.anc{i}.vcf{file_extension}" - with open(msp) as mspfile, gzip.open(vcf, "rt") if zipped else open(vcf) as vcf, contextlib.ExitStack() as stack: + with open(msp) as mspfile, ( + gzip.open(vcf, "rt") if zipped else open(vcf) + ) as vcf, contextlib.ExitStack() as stack: files = { - fname: stack.enter_context(gzip.open(output_file, "wt") if compress_output else open(output_file, "w")) + fname: stack.enter_context( + gzip.open(output_file, "wt") + if compress_output + else open(output_file, "w") + ) for fname, output_file in output_files.items() } vcf_header = "" @@ -123,7 +150,7 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N logger.info("Iterating through VCF file") for line in vcf: - skip_line=False + skip_line = False if line.startswith("##"): if output_vcf: vcf_header += line @@ -131,7 +158,12 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N elif line.startswith("#"): if output_vcf: vcf_header += line - anc_header = "\t".join([line.strip("# ").split("\t", 9)[item] for item in [0, 1, 2, 3, 4, 9] ]) + anc_header = "\t".join( + [ + line.strip("# ").split("\t", 9)[item] + for item in [0, 1, 2, 3, 4, 9] + ] + ) for i in range(num_ancs): # Write header into each output vcf files[f"dos{i}"].write(anc_header) @@ -164,9 +196,9 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N # saves the current line until out of the window, then checks next line. VCF and window switches file should be in incremental order. while not (row[0] == window[0] and (window[1] <= pos < window[2])): - if row[0] == window[0] and window[1]> pos: - skip_line=True #Skip VCF line - break #Break out of msp scanning because the VCF position is still before the windows start + if row[0] == window[0] and window[1] > pos: + skip_line = True # Skip VCF line + break # Break out of msp scanning because the VCF position is still before the windows start ancs = mspfile.readline() if ancs.startswith("#"): # skip the header lines continue @@ -176,21 +208,25 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N ancs_entry = ancs.strip().split("\t", 6) calls = ancs_entry[6].split("\t") window = (ancs_entry[0], int(ancs_entry[1]), int(ancs_entry[2])) - if row[0] == window[0] and window[1]> pos: - skip_line=True #Skip VCF line - break #Break out of msp scanning because the VCF position is before the windows start + if row[0] == window[0] and window[1] > pos: + skip_line = True # Skip VCF line + break # Break out of msp scanning because the VCF position is before the windows start if skip_line: - logger.info(f"VCF position, {pos} is not in an msp window, skipping site") - continue # Skip VCF site and continue to next site + logger.info( + f"VCF position, {pos} is not in an msp window, skipping site" + ) + continue # Skip VCF site and continue to next site # index by the number of individuals in the VCF file, should be half the number in the calls file for i, geno in enumerate(genos): - geno_parts = geno.split(":")[0].split("|") # assert incase eagle leaves some genos unphased - geno_a,geno_b = map(str, geno_parts[:2]) - call_a = str(calls[2*i]) - call_b = str(calls[2*i + 1]) - - counts = {anc: 0 for anc in range(num_ancs)} + geno_parts = geno.split(":")[0].split( + "|" + ) # assert incase eagle leaves some genos unphased + geno_a, geno_b = map(str, geno_parts[:2]) + call_a = str(calls[2 * i]) + call_b = str(calls[2 * i + 1]) + + counts = {anc: 0 for anc in range(num_ancs)} anc_counts = {anc: 0 for anc in range(num_ancs)} for j in range(num_ancs): if output_vcf: @@ -231,18 +267,38 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N logger.info("Finished extracting tracts per %d ancestries", num_ancs) + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--vcf",required=True,help="Path to phased genotypes, VCF file (*.vcf or *.vcf.gz)") - parser.add_argument("--msp",required=True,help="Path to ancestry calls, MSP file (*.msp or *.msp.tsv)") - parser.add_argument("--num-ancs",type=int,required=True, - help="Number of ancestral populations within the VCF file.") - parser.add_argument("--output-dir", help="Directory for output files. Directory must already exist.") - parser.add_argument("--output-vcf", help="Boolean. If ancestry-specific VCF files need to be generated", - action="store_true") - parser.add_argument("--compress-output", help="gzip (not bgzip) all output files", - action="store_true") + parser.add_argument( + "--vcf", + required=True, + help="Path to phased genotypes, VCF file (*.vcf or *.vcf.gz)", + ) + parser.add_argument( + "--msp", + required=True, + help="Path to ancestry calls, MSP file (*.msp or *.msp.tsv)", + ) + parser.add_argument( + "--num-ancs", + type=int, + required=True, + help="Number of ancestral populations within the VCF file.", + ) + parser.add_argument( + "--output-dir", help="Directory for output files. Directory must already exist." + ) + parser.add_argument( + "--output-vcf", + help="Boolean. If ancestry-specific VCF files need to be generated", + action="store_true", + ) + parser.add_argument( + "--compress-output", + help="gzip (not bgzip) all output files", + action="store_true", + ) args = parser.parse_args() extract_tracts(**vars(args)) -