Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 101 additions & 45 deletions scripts/extract_tracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,38 +45,51 @@
--compress-output Compress output dosage, hapcount, and VCF files (default: False).
"""


def parse_vcf_filepath(vcf_file_path):

if not os.path.exists(vcf_file_path):
raise FileNotFoundError(f"File not found: {vcf_file_path}")

if os.path.getsize(vcf_file_path) == 0:
err_msg = f"The provided vcf file, {vcf_file_path}, was empty"
logger.critical(err_msg)
raise ValueError(err_msg)

zipped = False
path, filename = os.path.split(vcf_file_path)

# check if the file is compressed (ends with '.gz')
if filename.endswith('.vcf.gz'):
if filename.endswith(".vcf.gz"):
zipped = True
prefix = os.path.splitext(os.path.splitext(filename)[0])[0] # remove '.vcf.gz' extension
elif filename.endswith('.vcf'):
prefix = os.path.splitext(filename)[0] # remove '.vcf' extension
prefix = os.path.splitext(os.path.splitext(filename)[0])[
0
] # remove '.vcf.gz' extension
elif filename.endswith(".vcf"):
prefix = os.path.splitext(filename)[0] # remove '.vcf' extension
else:
# File has an unexpected extension
raise ValueError(f"Unexpected file extension: {filename}. VCF file must end in .vcf or .vcf.gz")
raise ValueError(
f"Unexpected file extension: {filename}. VCF file must end in .vcf or .vcf.gz"
)

return {
'path': path,
'prefix': prefix,
'zipped': zipped
}
return {"path": path, "prefix": prefix, "zipped": zipped}


def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=None, compress_output=None):
def extract_tracts(
vcf=str,
msp=str,
num_ancs=int,
output_dir=None,
output_vcf=None,
compress_output=None,
):

# Housekeeping checks.
if not os.path.exists(vcf):
raise ValueError(f"The path '{vcf}' does not exist.")
if not os.path.exists(msp):
raise ValueError(f"The path '{msp}' does not exist.")
raise ValueError(f"The path '{msp}' does not exist.")
if not isinstance(num_ancs, int):
raise ValueError("The 'num_ancs' must be an integer.")
if output_dir is not None:
Expand All @@ -85,13 +98,19 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N
if not isinstance(output_vcf, bool):
raise ValueError("The 'output_vcf' must be a boolean value (True or False).")
if not isinstance(compress_output, bool):
raise ValueError("The 'compress_output' must be a boolean value (True or False).")
if compress_output==True:
raise ValueError(
"The 'compress_output' must be a boolean value (True or False)."
)
if compress_output == True:
logger.info("Files will be gzip compressed (not bgzip)")

vcf_info = parse_vcf_filepath(vcf)
vcf_path, vcf_prefix, zipped = vcf_info['path'], vcf_info['prefix'], vcf_info['zipped']

vcf_path, vcf_prefix, zipped = (
vcf_info["path"],
vcf_info["prefix"],
vcf_info["zipped"],
)

# Log version number
logger.info("# Running script version : %s", __version__)
logger.info("# VCF File : %s", vcf)
Expand All @@ -109,29 +128,42 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N
logger.info("Creating output files for %d ancestries", num_ancs)
for i in range(num_ancs):
output_files[f"dos{i}"] = f"{output_path}.anc{i}.dosage.txt{file_extension}"
output_files[f"ancdos{i}"] = f"{output_path}.anc{i}.hapcount.txt{file_extension}"
output_files[f"ancdos{i}"] = (
f"{output_path}.anc{i}.hapcount.txt{file_extension}"
)
if output_vcf:
output_files[f"vcf{i}"] = f"{output_path}.anc{i}.vcf{file_extension}"

with open(msp) as mspfile, gzip.open(vcf, "rt") if zipped else open(vcf) as vcf, contextlib.ExitStack() as stack:
with open(msp) as mspfile, (
gzip.open(vcf, "rt") if zipped else open(vcf)
) as vcf, contextlib.ExitStack() as stack:
files = {
fname: stack.enter_context(gzip.open(output_file, "wt") if compress_output else open(output_file, "w"))
fname: stack.enter_context(
gzip.open(output_file, "wt")
if compress_output
else open(output_file, "w")
)
for fname, output_file in output_files.items()
}
vcf_header = ""
window = ("", 0, 0) # initialize the current window to check

logger.info("Iterating through VCF file")
for line in vcf:
skip_line=False
skip_line = False
if line.startswith("##"):
if output_vcf:
vcf_header += line
continue
elif line.startswith("#"):
if output_vcf:
vcf_header += line
anc_header = "\t".join([line.strip("# ").split("\t", 9)[item] for item in [0, 1, 2, 3, 4, 9] ])
anc_header = "\t".join(
[
line.strip("# ").split("\t", 9)[item]
for item in [0, 1, 2, 3, 4, 9]
]
)

for i in range(num_ancs): # Write header into each output vcf
files[f"dos{i}"].write(anc_header)
Expand Down Expand Up @@ -164,9 +196,9 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N
# saves the current line until out of the window, then checks next line. VCF and window switches file should be in incremental order.

while not (row[0] == window[0] and (window[1] <= pos < window[2])):
if row[0] == window[0] and window[1]> pos:
skip_line=True #Skip VCF line
break #Break out of msp scanning because the VCF position is still before the windows start
if row[0] == window[0] and window[1] > pos:
skip_line = True # Skip VCF line
break # Break out of msp scanning because the VCF position is still before the windows start
ancs = mspfile.readline()
if ancs.startswith("#"): # skip the header lines
continue
Expand All @@ -176,21 +208,25 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N
ancs_entry = ancs.strip().split("\t", 6)
calls = ancs_entry[6].split("\t")
window = (ancs_entry[0], int(ancs_entry[1]), int(ancs_entry[2]))
if row[0] == window[0] and window[1]> pos:
skip_line=True #Skip VCF line
break #Break out of msp scanning because the VCF position is before the windows start
if row[0] == window[0] and window[1] > pos:
skip_line = True # Skip VCF line
break # Break out of msp scanning because the VCF position is before the windows start
if skip_line:
logger.info(f"VCF position, {pos} is not in an msp window, skipping site")
continue # Skip VCF site and continue to next site
logger.info(
f"VCF position, {pos} is not in an msp window, skipping site"
)
continue # Skip VCF site and continue to next site

# index by the number of individuals in the VCF file, should be half the number in the calls file
for i, geno in enumerate(genos):
geno_parts = geno.split(":")[0].split("|") # assert incase eagle leaves some genos unphased
geno_a,geno_b = map(str, geno_parts[:2])
call_a = str(calls[2*i])
call_b = str(calls[2*i + 1])

counts = {anc: 0 for anc in range(num_ancs)}
geno_parts = geno.split(":")[0].split(
"|"
) # assert incase eagle leaves some genos unphased
geno_a, geno_b = map(str, geno_parts[:2])
call_a = str(calls[2 * i])
call_b = str(calls[2 * i + 1])

counts = {anc: 0 for anc in range(num_ancs)}
anc_counts = {anc: 0 for anc in range(num_ancs)}
for j in range(num_ancs):
if output_vcf:
Expand Down Expand Up @@ -231,18 +267,38 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N

logger.info("Finished extracting tracts per %d ancestries", num_ancs)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--vcf",required=True,help="Path to phased genotypes, VCF file (*.vcf or *.vcf.gz)")
parser.add_argument("--msp",required=True,help="Path to ancestry calls, MSP file (*.msp or *.msp.tsv)")
parser.add_argument("--num-ancs",type=int,required=True,
help="Number of ancestral populations within the VCF file.")
parser.add_argument("--output-dir", help="Directory for output files. Directory must already exist.")
parser.add_argument("--output-vcf", help="Boolean. If ancestry-specific VCF files need to be generated",
action="store_true")
parser.add_argument("--compress-output", help="gzip (not bgzip) all output files",
action="store_true")
parser.add_argument(
"--vcf",
required=True,
help="Path to phased genotypes, VCF file (*.vcf or *.vcf.gz)",
)
parser.add_argument(
"--msp",
required=True,
help="Path to ancestry calls, MSP file (*.msp or *.msp.tsv)",
)
parser.add_argument(
"--num-ancs",
type=int,
required=True,
help="Number of ancestral populations within the VCF file.",
)
parser.add_argument(
"--output-dir", help="Directory for output files. Directory must already exist."
)
parser.add_argument(
"--output-vcf",
help="Boolean. If ancestry-specific VCF files need to be generated",
action="store_true",
)
parser.add_argument(
"--compress-output",
help="gzip (not bgzip) all output files",
action="store_true",
)

args = parser.parse_args()
extract_tracts(**vars(args))