From 2cfdcd584833b6fbb9fb5b23fded95d7e738a0aa Mon Sep 17 00:00:00 2001 From: wlu04 Date: Wed, 12 Apr 2023 18:43:38 -0400 Subject: [PATCH 1/7] new_mixing_pipeline --- mixing_samples.py | 241 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100644 mixing_samples.py diff --git a/mixing_samples.py b/mixing_samples.py new file mode 100644 index 0000000..5149a69 --- /dev/null +++ b/mixing_samples.py @@ -0,0 +1,241 @@ +import pysam +import pipes +import copy +import random +import hail as hl +import hailtop.batch as hb +import logging +import pandas as pd +import argparse +import numpy as np +from typing import List + +reference = "GRCh38" +CHROM_LENGTHS = hl.get_reference(reference).lengths +BILLING_PROJECT = "gnomad-production" +MY_BUCKET = "gs://gnomad-wenhan/charr_simulation" +TMP_DIR = "gs://gnomad-wenhan/tmp/contam_free_file/" +REF_FASTA_PATH = ( + "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta" +) +REF_FASTA_INDEX = ( + "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai" +) +REF_DICT = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict" +chromosomes = list(map(str, range(1, 23))) + ["X"] +CHROMOSOMES = [f"chr{chrom}" for chrom in chromosomes] +CONTAM_RATES = [0.005, 0.01, 0.02, 0.05, 0.1] +SAMTOOLS_IMAGE = "staphb/samtools:latest" + +logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s") +logger = logging.getLogger("CHARR simulation pipeline") +logger.setLevel(logging.INFO) + +def open_pipes_output(ref_fasta, output_name): + pipe = pipes.Template() + pipe.append( + f"samtools view -C -T {ref_fasta} -h -o {output_name} 2> /dev/null", + "-.", + ) + f = pipe.open( + f"temp.sam", "w" + ) + return f + +def get_read_groups(file): + read = next(file.fetch(until_eof=True)) + rg_ind = [i for i, tuple in enumerate(read.tags) if 'RG' in tuple] + main_rg = read.tags[rg_ind[0]][-1] + return main_rg + +def 
edit_read_group(read, rg_name): + rg_ind = [i for i, tuple in enumerate(read.tags) if 'RG' in tuple] + if len(rg_ind)>0: + temp_tag = [list(ele) for ele in read.tags] + temp_tag[rg_ind[0]][-1] = rg_name + read.tags = [tuple(ele) for ele in temp_tag] + return read + +def mixing_many_samples( + b: hb.Batch, + input_list: List, + output_list: List, + sample_list: List, + input_ref_fasta: str, + chromosome: str, + contam_rate: float, +): + inputs = [pysam.AlignmentFile(cram_input["cram"], mode="rc", reference_filename=input_ref_fasta) for + cram_input in input_list] + outputs = [open_pipes_output(ref_fasta=input_ref_fasta, output_name=output_list[sample_id]) for sample_id in + sample_list] + main_rgs = [get_read_groups(input) for input in inputs] + next_reads = [[] for input in inputs] + + for pos in range(CHROM_LENGTHS[chromosome]): + next_reads = [[read for read in reads if read.pos == pos] for reads in next_reads] + current_reads = copy.deepcopy(next_reads) + for sample in range(len(inputs)): + while True: + next_read = edit_read_group(next(inputs[sample]), main_rgs[sample]) + # print(f'current sample: {samples[sample]}') + # print(f'current read: {next_read}') + # print(f'current position: {pos}') + # print(f'current read position: {next_read.pos}') + if next_read.pos == pos: + current_reads[sample].append(next_read) + else: + next_reads[sample].append(next_read) + break + + # contamination + current_reads_actually_at_position = [[read for read in reads if read.pos == pos] for reads in current_reads] + for sample in range(len(inputs)): + for read in current_reads_actually_at_position[sample]: + contaminate = np.random.binomial(1, contam_rate) + if not contaminate: + outputs[sample].write(read.tostring() + "\n") + else: + random_ind = random.randint(0, len(inputs) - 1) + random_sample_reads = current_reads_actually_at_position[random_ind] + random_read_ind = random.randint(0, len(random_sample_reads) - 1) + random_read = random_sample_reads[random_read_ind] + 
outputs[sample].write(random_read.tostring() + "\n") + for output in outputs: + output.close() + + + +def main(): + backend = hb.ServiceBackend( + billing_project=BILLING_PROJECT, + remote_tmpdir=TMP_DIR, + ) + b = hb.Batch( + name=f"Decontam-Mixing-Cram-Files", + requester_pays_project="daly-ibd", + default_python_image="us-central1-docker.pkg.dev/broad-mpg-gnomad/wlu/hail/hail-pysam-samtools:4.16.0", + backend=backend, + ) + + input_ref_fasta = b.read_input_group( + **{ + "fasta": REF_FASTA_PATH, + "fasta.fai": REF_FASTA_INDEX, + "dict": REF_DICT, + } + ) + + logger.info("Loading sample info...") + sample_paths = pd.read_csv(args.input_sample_info, sep=",") + sample_ids = pd.read_csv(args.selected_samples, sep=",") + sample_paths = sample_paths[sample_paths["s"].isin(sample_ids["hgdp_id"])] + samples = [] + cram_files = {} + for sample in sample_paths.iterrows(): + samples.append(sample[1][0]) + cram_files[sample[1][0]] = (f"{sample[1][1]}", f"{sample[1][1]}.crai") + + if args.test: + samples = samples[:3] + print(samples) + + for sample in samples: + input_cram_file = b.read_input_group( + cram=cram_files[sample][0], index=cram_files[sample][1] + ) + j_header = b.new_job( + name=f"get_header_{sample}", + attributes={"sample_id": sample, "job_type": "get_header"}, + ) + j_header.image(SAMTOOLS_IMAGE).storage("15Gi").memory('15Gi') + j_header.command( + f"samtools view -H {input_cram_file['cram']} > {j_header[sample]}" + ) + + for contam_rate in CONTAM_RATES: + for chromosome in CHROMOSOMES: + j_mix = b.new_python_job( + name=f"Mix_all_samples_{chromosome}", + attributes={ + "contam_rate": f"{contam_rate * 100}\%", + "chromosome": chromosome, + "job_type": "mixing_samples", + }, + ) + j_mix.storage("50Gi").memory("10Gi") + cram_paths = [ + f"{MY_BUCKET}/contam_free/crams/cram_by_chrom/{sample_id}/{sample_id}_contam_free_{chromosome}.cram" + for sample_id in samples] + inputs = [b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) for path in + 
cram_paths] + outputs = [j_mix[f'{sample}_chromosome'] for sample in samples] + + j_mix.call( + mixing_many_samples, + b=b, + input_list=inputs, + outpu_list=outputs, + sample_list=samples, + input_ref_fasta=input_ref_fasta['fasta'], + chromosome=chromosome, + contam_rate=contam_rate + ) + + j_reheader = b.new_job( + name=f"Reheader_contam_free_file_{chromosome}", + attributes={ + "contam_rate": f"{contam_rate * 100}\%", + "chromosome": chromosome, + "job_type": "run_reheader", + }, + ) + j_reheader.image(SAMTOOLS_IMAGE).storage("20Gi").memory("3.75Gi") + + for sample in samples: + j_header = b.new_job( + name=f"get_header_{sample}", + attributes={"sample_id": sample, "job_type": "get_header"}, + ) + j_header.image(SAMTOOLS_IMAGE).storage("15Gi").memory('15Gi') + j_header.command( + f"samtools view -H {input_cram_file['cram']} > {j_header.ofile}" + ) + output_cram_path = f'{MY_BUCKET}/mixed_samples/v2/crams/cram_by_chrom/{sample}/contam_rate_{contam_rate*100}/{sample}_{chromosome}_{contam_rate*100}.cram' + b.write_output(j_mix[f'{sample}_{chromosome}'], output_cram_path) + + j_reheader.command( + f"samtools reheader {j_header[sample]} {j_mix.ofile} > {j_reheader.ofile1}" + ) + j_reheader.command( + f"samtools index {j_reheader.ofile1} -o {j_reheader.ofile2}" + ) + b.write_output(j_reheader.ofile1, output_cram_path) + b.write_output(j_reheader.ofile2, f"{output_cram_path}.crai") + if args.test: + break + if args.test: + break + b.run() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--input-sample-info", + help="Path to a csv of paths to cram and gvcf files with sample IDs", + nargs="?", + ) + parser.add_argument( + "--selected-samples", + help="Path to a list of sample IDs to run and mix", + nargs="?", + ) + parser.add_argument( + "--test", + help="Whether to run test", + action="store_true", + ) + args = parser.parse_args() + print(args) + main() \ No newline at end of file From 
e30ade485754f0e070f4e773ee6b1593c9a235c0 Mon Sep 17 00:00:00 2001 From: wlu04 Date: Wed, 12 Apr 2023 19:16:22 -0400 Subject: [PATCH 2/7] minor_fixes --- mixing_samples.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/mixing_samples.py b/mixing_samples.py index 5149a69..d2b13e0 100644 --- a/mixing_samples.py +++ b/mixing_samples.py @@ -57,7 +57,6 @@ def edit_read_group(read, rg_name): return read def mixing_many_samples( - b: hb.Batch, input_list: List, output_list: List, sample_list: List, @@ -138,7 +137,6 @@ def main(): if args.test: samples = samples[:3] - print(samples) for sample in samples: input_cram_file = b.read_input_group( @@ -169,13 +167,13 @@ def main(): for sample_id in samples] inputs = [b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) for path in cram_paths] - outputs = [j_mix[f'{sample}_chromosome'] for sample in samples] + outputs = [j_mix[f'{sample}_{chromosome}'] for sample in samples] + j_mix.call( mixing_many_samples, - b=b, input_list=inputs, - outpu_list=outputs, + output_list=outputs, sample_list=samples, input_ref_fasta=input_ref_fasta['fasta'], chromosome=chromosome, @@ -193,19 +191,11 @@ def main(): j_reheader.image(SAMTOOLS_IMAGE).storage("20Gi").memory("3.75Gi") for sample in samples: - j_header = b.new_job( - name=f"get_header_{sample}", - attributes={"sample_id": sample, "job_type": "get_header"}, - ) - j_header.image(SAMTOOLS_IMAGE).storage("15Gi").memory('15Gi') - j_header.command( - f"samtools view -H {input_cram_file['cram']} > {j_header.ofile}" - ) output_cram_path = f'{MY_BUCKET}/mixed_samples/v2/crams/cram_by_chrom/{sample}/contam_rate_{contam_rate*100}/{sample}_{chromosome}_{contam_rate*100}.cram' b.write_output(j_mix[f'{sample}_{chromosome}'], output_cram_path) j_reheader.command( - f"samtools reheader {j_header[sample]} {j_mix.ofile} > {j_reheader.ofile1}" + f"samtools reheader {j_header[sample]} {j_mix[f'{sample}_{chromosome}']} > {j_reheader.ofile1}" ) 
j_reheader.command( f"samtools index {j_reheader.ofile1} -o {j_reheader.ofile2}" @@ -238,4 +228,4 @@ def main(): ) args = parser.parse_args() print(args) - main() \ No newline at end of file + main() From 715770aed6e768e30a727081df4d9bc09b1c8236 Mon Sep 17 00:00:00 2001 From: wlu04 Date: Thu, 13 Apr 2023 18:07:04 -0400 Subject: [PATCH 3/7] fixed the logic --- mixing_samples.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/mixing_samples.py b/mixing_samples.py index d2b13e0..dccbe96 100644 --- a/mixing_samples.py +++ b/mixing_samples.py @@ -69,12 +69,17 @@ def mixing_many_samples( outputs = [open_pipes_output(ref_fasta=input_ref_fasta, output_name=output_list[sample_id]) for sample_id in sample_list] main_rgs = [get_read_groups(input) for input in inputs] - next_reads = [[] for input in inputs] + next_reads = [[next(input.fetch(chromosome, until_eof=True))] for input in inputs] + current_pos = [next_read[0].pos for next_read in next_reads] for pos in range(CHROM_LENGTHS[chromosome]): - next_reads = [[read for read in reads if read.pos == pos] for reads in next_reads] + if (not any(x == pos for x in current_pos)): + continue + next_reads = [[read for read in reads if read.pos >= pos] for reads in next_reads] current_reads = copy.deepcopy(next_reads) for sample in range(len(inputs)): + if len(next_reads[sample]) > 0 and next_reads[sample][0].pos != pos: + continue while True: next_read = edit_read_group(next(inputs[sample]), main_rgs[sample]) # print(f'current sample: {samples[sample]}') @@ -86,6 +91,7 @@ def mixing_many_samples( else: next_reads[sample].append(next_read) break + current_pos = [next_read[-1].pos for next_read in next_reads] # contamination current_reads_actually_at_position = [[read for read in reads if read.pos == pos] for reads in current_reads] @@ -102,6 +108,8 @@ def mixing_many_samples( outputs[sample].write(random_read.tostring() + "\n") for output in outputs: output.close() + for input in inputs: + 
input.close() From ac72ea8fd7bd94c0ebf5332db9f84aafa8f41792 Mon Sep 17 00:00:00 2001 From: wlu04 Date: Fri, 14 Apr 2023 14:58:45 -0400 Subject: [PATCH 4/7] minor fix --- mixing_samples.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mixing_samples.py b/mixing_samples.py index dccbe96..5e8a287 100644 --- a/mixing_samples.py +++ b/mixing_samples.py @@ -2,6 +2,7 @@ import pipes import copy import random +from random import choice import hail as hl import hailtop.batch as hb import logging @@ -101,7 +102,7 @@ def mixing_many_samples( if not contaminate: outputs[sample].write(read.tostring() + "\n") else: - random_ind = random.randint(0, len(inputs) - 1) + random_ind = choice([i for i in range(0,len(inputs) - 1) if i not in [sample]]) random_sample_reads = current_reads_actually_at_position[random_ind] random_read_ind = random.randint(0, len(random_sample_reads) - 1) random_read = random_sample_reads[random_read_ind] From 8c96aeb85379a31277d6b9a9e7a61ffe48c4a8a4 Mon Sep 17 00:00:00 2001 From: wlu04 Date: Wed, 19 Apr 2023 10:16:47 -0400 Subject: [PATCH 5/7] minor fixes --- mixing_samples.py | 139 ++++++++++++++++++++++++++++------------------ 1 file changed, 84 insertions(+), 55 deletions(-) diff --git a/mixing_samples.py b/mixing_samples.py index 5e8a287..94c7dd4 100644 --- a/mixing_samples.py +++ b/mixing_samples.py @@ -67,29 +67,37 @@ def mixing_many_samples( ): inputs = [pysam.AlignmentFile(cram_input["cram"], mode="rc", reference_filename=input_ref_fasta) for cram_input in input_list] - outputs = [open_pipes_output(ref_fasta=input_ref_fasta, output_name=output_list[sample_id]) for sample_id in - sample_list] + outputs = [open_pipes_output(ref_fasta=input_ref_fasta, output_name=output_list[i]) for i in + range(len(sample_list))] main_rgs = [get_read_groups(input) for input in inputs] next_reads = [[next(input.fetch(chromosome, until_eof=True))] for input in inputs] current_pos = [next_read[0].pos for next_read in next_reads] + last_reads = 
[False for input in inputs] for pos in range(CHROM_LENGTHS[chromosome]): if (not any(x == pos for x in current_pos)): continue + if any(last_reads): + break next_reads = [[read for read in reads if read.pos >= pos] for reads in next_reads] current_reads = copy.deepcopy(next_reads) for sample in range(len(inputs)): if len(next_reads[sample]) > 0 and next_reads[sample][0].pos != pos: continue while True: - next_read = edit_read_group(next(inputs[sample]), main_rgs[sample]) # print(f'current sample: {samples[sample]}') # print(f'current read: {next_read}') # print(f'current position: {pos}') # print(f'current read position: {next_read.pos}') - if next_read.pos == pos: + try: + next_read = edit_read_group(next(inputs[sample]), main_rgs[sample]) + except StopIteration: + last_reads[sample] = True + break + + if next_read.pos == pos and (not last_reads[sample]): current_reads[sample].append(next_read) - else: + elif not last_reads[sample]: next_reads[sample].append(next_read) break current_pos = [next_read[-1].pos for next_read in next_reads] @@ -102,7 +110,12 @@ def mixing_many_samples( if not contaminate: outputs[sample].write(read.tostring() + "\n") else: - random_ind = choice([i for i in range(0,len(inputs) - 1) if i not in [sample]]) + indexes = list( + filter(lambda x: x not in [sample] and len(current_reads_actually_at_position[x]) > 0, + range(0, len(inputs) - 1))) + if len(indexes)==0: + continue + random_ind = choice(indexes) random_sample_reads = current_reads_actually_at_position[random_ind] random_read_ind = random.randint(0, len(random_sample_reads) - 1) random_read = random_sample_reads[random_read_ind] @@ -147,7 +160,8 @@ def main(): if args.test: samples = samples[:3] - for sample in samples: + for i in range(len(samples)): + sample = samples[i] input_cram_file = b.read_input_group( cram=cram_files[sample][0], index=cram_files[sample][1] ) @@ -157,60 +171,75 @@ def main(): ) j_header.image(SAMTOOLS_IMAGE).storage("15Gi").memory('15Gi') j_header.command( 
- f"samtools view -H {input_cram_file['cram']} > {j_header[sample]}" + f"samtools view -H {input_cram_file['cram']} > {j_header[f'{sample}']}" ) for contam_rate in CONTAM_RATES: for chromosome in CHROMOSOMES: - j_mix = b.new_python_job( - name=f"Mix_all_samples_{chromosome}", - attributes={ - "contam_rate": f"{contam_rate * 100}\%", - "chromosome": chromosome, - "job_type": "mixing_samples", - }, - ) - j_mix.storage("50Gi").memory("10Gi") - cram_paths = [ - f"{MY_BUCKET}/contam_free/crams/cram_by_chrom/{sample_id}/{sample_id}_contam_free_{chromosome}.cram" - for sample_id in samples] - inputs = [b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) for path in - cram_paths] - outputs = [j_mix[f'{sample}_{chromosome}'] for sample in samples] - - - j_mix.call( - mixing_many_samples, - input_list=inputs, - output_list=outputs, - sample_list=samples, - input_ref_fasta=input_ref_fasta['fasta'], - chromosome=chromosome, - contam_rate=contam_rate - ) - - j_reheader = b.new_job( - name=f"Reheader_contam_free_file_{chromosome}", - attributes={ - "contam_rate": f"{contam_rate * 100}\%", - "chromosome": chromosome, - "job_type": "run_reheader", - }, - ) - j_reheader.image(SAMTOOLS_IMAGE).storage("20Gi").memory("3.75Gi") - - for sample in samples: - output_cram_path = f'{MY_BUCKET}/mixed_samples/v2/crams/cram_by_chrom/{sample}/contam_rate_{contam_rate*100}/{sample}_{chromosome}_{contam_rate*100}.cram' - b.write_output(j_mix[f'{sample}_{chromosome}'], output_cram_path) - - j_reheader.command( - f"samtools reheader {j_header[sample]} {j_mix[f'{sample}_{chromosome}']} > {j_reheader.ofile1}" + if args.test: + chromosome = 'chr21' + output_mix_cram_paths = [f'{MY_BUCKET}/mixed_samples/v2/crams/cram_by_chrom/{sample}/contam_rate_{contam_rate*100}/{sample}_{chromosome}_{contam_rate*100}.cram' for sample in samples] + if any([not hl.hadoop_exists(path) for path in output_mix_cram_paths]): + logger.info( + f"Mixing contamination free crams: 
{chromosome}_{contam_rate*100}_percent_contamination..." + ) + j_mix = b.new_python_job( + name=f"Mix_all_samples_{chromosome}", + attributes={ + "contam_rate": f"{contam_rate * 100}\%", + "chromosome": chromosome, + "job_type": "mixing_samples", + }, ) - j_reheader.command( - f"samtools index {j_reheader.ofile1} -o {j_reheader.ofile2}" + j_mix.storage("50Gi").memory("10Gi") + cram_paths = [ + f"{MY_BUCKET}/contam_free/crams/cram_by_chrom/{sample_id}/{sample_id}_contam_free_{chromosome}.cram" + for sample_id in samples] + inputs = [b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) for path in + cram_paths] + outputs = [j_mix[f'{sample}_{chromosome}'] for sample in samples] + + j_mix.call( + mixing_many_samples, + input_list=inputs, + output_list=outputs, + sample_list=samples, + input_ref_fasta=input_ref_fasta['fasta'], + chromosome=chromosome, + contam_rate=contam_rate + ) + + for i in range(len(samples)): + output_cram_path = output_mix_cram_paths[i] + b.write_output(outputs[i], output_cram_path) + + elif any([not hl.hadoop_exists(f"{path}.crai") for path in output_mix_cram_paths]): + logger.info( + f"Reheadering contamination free cram: {chromosome}_{contam_rate*100}_percent_contamination..." 
+ ) + j_reheader = b.new_job( + name=f"Reheader_contam_free_file_{chromosome}", + attributes={ + "contam_rate": f"{contam_rate * 100}\%", + "chromosome": chromosome, + "job_type": "run_reheader", + }, ) - b.write_output(j_reheader.ofile1, output_cram_path) - b.write_output(j_reheader.ofile2, f"{output_cram_path}.crai") + j_reheader.image(SAMTOOLS_IMAGE).storage("20Gi").memory("3.75Gi") + j_reheader.depends_on(j_header) + + for i in range(len(samples)): + if not hl.hadoop_exists(f"{output_mix_cram_paths[i]}.crai"): + tmp_mix_cram = b.read_input(output_mix_cram_paths[i]) + sample = samples[i] + j_reheader.command( + f"samtools reheader {j_header[f'{sample}']} {tmp_mix_cram} > {j_reheader.ofile1}" + ) + j_reheader.command( + f"samtools index {j_reheader.ofile1} -o {j_reheader.ofile2}" + ) + b.write_output(j_reheader.ofile1, output_mix_cram_paths[i]) + b.write_output(j_reheader.ofile2, f"{output_mix_cram_paths[i]}.crai") if args.test: break if args.test: From f858d02e87f8d926461e2073dbcf59999fe591b6 Mon Sep 17 00:00:00 2001 From: wlu04 Date: Wed, 19 Apr 2023 14:50:32 -0400 Subject: [PATCH 6/7] minor pipeline fixes --- mixing_samples.py | 280 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 242 insertions(+), 38 deletions(-) diff --git a/mixing_samples.py b/mixing_samples.py index 94c7dd4..905f035 100644 --- a/mixing_samples.py +++ b/mixing_samples.py @@ -57,6 +57,20 @@ def edit_read_group(read, rg_name): read.tags = [tuple(ele) for ele in temp_tag] return read +def get_main_rg_list( + b: hb.Batch, + cram_files: dict, + input_ref_fasta: str, + sample_list: list +): + full_inputs = [b.read_input_group(**{"cram": cram_files[sample][0], "cram.crai": f"{cram_files[sample][1]}.crai"}) for sample in + sample_list] + rg_inputs = [pysam.AlignmentFile(full_input, mode="rc", reference_filename=input_ref_fasta) for + full_input in full_inputs] + main_rgs = [get_read_groups(input) for input in rg_inputs] + print(main_rgs) + return(main_rgs) + def mixing_many_samples( 
input_list: List, output_list: List, @@ -70,7 +84,8 @@ def mixing_many_samples( outputs = [open_pipes_output(ref_fasta=input_ref_fasta, output_name=output_list[i]) for i in range(len(sample_list))] main_rgs = [get_read_groups(input) for input in inputs] - next_reads = [[next(input.fetch(chromosome, until_eof=True))] for input in inputs] + print(main_rgs) + next_reads = [[edit_read_group(next(inputs[i].fetch(chromosome, until_eof=True)), main_rgs[i])] for i in range(len(inputs))] current_pos = [next_read[0].pos for next_read in next_reads] last_reads = [False for input in inputs] @@ -160,25 +175,40 @@ def main(): if args.test: samples = samples[:3] - for i in range(len(samples)): - sample = samples[i] - input_cram_file = b.read_input_group( - cram=cram_files[sample][0], index=cram_files[sample][1] - ) - j_header = b.new_job( - name=f"get_header_{sample}", - attributes={"sample_id": sample, "job_type": "get_header"}, - ) - j_header.image(SAMTOOLS_IMAGE).storage("15Gi").memory('15Gi') - j_header.command( - f"samtools view -H {input_cram_file['cram']} > {j_header[f'{sample}']}" - ) + + + + # for i in range(len(samples)): + # sample = samples[i] + # input_cram_file = b.read_input_group( + # cram=cram_files[sample][0], index=cram_files[sample][1] + # ) + # j_header = b.new_job( + # name=f"get_header_{sample}", + # attributes={"sample_id": sample, "job_type": "get_header"}, + # ) + # j_header.image(SAMTOOLS_IMAGE).storage("15Gi").memory('15Gi') + # j_header.command( + # f"samtools view -H {input_cram_file['cram']} > {j_header[f'{sample}']}" + # ) for contam_rate in CONTAM_RATES: + mixed_crams = {} + mixed_gvcfs = {} + mix_gvcf_job_depend_list = [] + mix_cram_job_depend_list = [] + for sample in samples: + mixing_samples_label = f"{sample}_{contam_rate * 100}" + mixed_crams[mixing_samples_label] = {} + mixed_gvcfs[mixing_samples_label] = {} for chromosome in CHROMOSOMES: if args.test: chromosome = 'chr21' output_mix_cram_paths = 
[f'{MY_BUCKET}/mixed_samples/v2/crams/cram_by_chrom/{sample}/contam_rate_{contam_rate*100}/{sample}_{chromosome}_{contam_rate*100}.cram' for sample in samples] + output_mix_vcf_paths = [ + f'{MY_BUCKET}/mixed_samples/v2/variant-calling/{sample}/{sample}_{contam_rate * 100}/{sample}_{contam_rate * 100}_{chromosome}.g.vcf' + for sample in samples] + j_reheader_depend_on = None if any([not hl.hadoop_exists(path) for path in output_mix_cram_paths]): logger.info( f"Mixing contamination free crams: {chromosome}_{contam_rate*100}_percent_contamination..." @@ -208,42 +238,201 @@ def main(): chromosome=chromosome, contam_rate=contam_rate ) + j_reheader_depend_on = j_mix + mix_cram_job_depend_list.append(j_mix) for i in range(len(samples)): + mixing_samples_label = f"{samples[i]}_{contam_rate * 100}" output_cram_path = output_mix_cram_paths[i] b.write_output(outputs[i], output_cram_path) + mixed_crams[mixing_samples_label][chromosome] = outputs[i] - elif any([not hl.hadoop_exists(f"{path}.crai") for path in output_mix_cram_paths]): - logger.info( - f"Reheadering contamination free cram: {chromosome}_{contam_rate*100}_percent_contamination..." - ) - j_reheader = b.new_job( - name=f"Reheader_contam_free_file_{chromosome}", + + for i in range(len(samples)): + sample = samples[i] + mixing_samples_label = f"{sample}_{contam_rate * 100}" + j_gvcf_depend_on = None + if not hl.hadoop_exists(f"{output_mix_cram_paths[i]}.crai"): + j_reheader = b.new_job( + name=f"Reheader_contam_free_file_{mixing_samples_label}_{chromosome}", + attributes={ + "sample": sample, + "contam_rate": f"{contam_rate * 100}\%", + "chromosome": chromosome, + "job_type": "run_reheader", + }, + ) + j_reheader.image(SAMTOOLS_IMAGE).storage("40Gi").memory("15Gi") + + if j_reheader_depend_on is not None: + j_reheader.depends_on(j_reheader_depend_on) + logger.info( + f"Reheadering contamination free cram: {sample}_{chromosome}_{contam_rate * 100}_percent_contamination..." 
+ ) + input_cram_file = b.read_input_group( + cram=cram_files[sample][0], index=cram_files[sample][1] + ) + tmp_mix_cram = b.read_input(output_mix_cram_paths[i]) + j_reheader.command( + f"samtools view -H {input_cram_file['cram']} > {j_reheader.header}" + ) + j_reheader.command( + f"samtools reheader {j_reheader.header} {tmp_mix_cram} > {j_reheader.ofile1}" + ) + j_reheader.command( + f"samtools index {j_reheader.ofile1} -o {j_reheader.ofile2}" + ) + mix_cram_job_depend_list.append(j_reheader) + b.write_output(j_reheader.ofile1, output_mix_cram_paths[i]) + b.write_output(j_reheader.ofile2, f"{output_mix_cram_paths[i]}.crai") + mixed_crams[mixing_samples_label][chromosome] = j_reheader.ofile1 + j_gvcf_depend_on.append(j_reheader) + else: + path = output_mix_cram_paths[i] + input_mixed_cram = b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) + mixed_crams[mixing_samples_label][chromosome] = input_mixed_cram['cram'] + + if not hl.hadoop_exists(output_mix_vcf_paths[i]): + logger.info( + f"Running haplotype caller: {sample}_{chromosome}_{contam_rate*100}_percent_contamination..." 
+ ) + input_chrom_cram_file = b.read_input_group( + **{ + "cram": output_mix_cram_paths[i], + "cram.crai": f"{output_mix_cram_paths[i]}.crai", + } + ) + tmp_gvcf, tmp_job = haplotype_caller_gatk( + b=b, + depend_on=j_gvcf_depend_on, + input_bam=input_chrom_cram_file, + ref_fasta=input_ref_fasta, + interval_list_file=chromosome, + out_dir=f"{MY_BUCKET}/mixed_samples/v2", + contamination=0.0, + bam_filename_no_ext=f"{sample}_{contam_rate * 100}_{chromosome}", + storage=40, + interval_list_name=None, + memory=26, + ) + mix_gvcf_job_depend_list.append(tmp_job) + mixed_gvcfs[mixing_samples_label][chromosome] = tmp_gvcf + else: + tmp_gvcf = b.read_input(output_mix_vcf_paths[i]) + mixed_gvcfs[mixing_samples_label][chromosome] = tmp_gvcf + + if args.test: + break + + mixed_labels=[] + for sample in samples: + mixing_samples_label = f"{sample}_{contam_rate * 100}" + mixed_labels.append(mixing_samples_label) + output_mix_full_cram_path = ( + f"{MY_BUCKET}/mixed_samples/v2/crams/{mixing_samples_label}_mixed.cram" + ) + j_cat_depend_on = None + if not hl.hadoop_exists(output_mix_full_cram_path): + logger.info(f"Concatenating mixed crams: {mixing_samples_label}...") + j_cat = b.new_job( + name=f"Concatenate_mixed_file_{mixing_samples_label}", attributes={ + "sample": sample, "contam_rate": f"{contam_rate * 100}\%", - "chromosome": chromosome, - "job_type": "run_reheader", + "job_type": "concatenate_mixed_crams", }, ) - j_reheader.image(SAMTOOLS_IMAGE).storage("20Gi").memory("3.75Gi") - j_reheader.depends_on(j_header) + if len(mix_cram_job_depend_list) > 0: + j_cat.depends_on(*mix_cram_job_depend_list) + j_cat.image(SAMTOOLS_IMAGE).storage("60Gi").memory("10Gi") + tmp_cram_lst = reduce( + lambda x, y: x + " " + y, mixed_crams[mixing_samples_label].values() + ) + input_cram_file = b.read_input_group( + cram=cram_files[sample][0], index=cram_files[sample][1] + ) + j_cat.command( + f"samtools view -H {input_cram_file['cram']} > {j_cat.header}" + ) + j_cat.command( + f"samtools cat 
-h {j_cat.header} -o {j_cat.ofile1} {tmp_cram_lst}" + ) + j_cat.command(f"samtools index {j_cat.ofile1} -o {j_cat.ofile2}") + b.write_output(j_cat.ofile1, output_mix_full_cram_path) + b.write_output(j_cat.ofile2, f"{output_mix_full_cram_path}.crai") + j_cat_depend_on = j_cat - for i in range(len(samples)): - if not hl.hadoop_exists(f"{output_mix_cram_paths[i]}.crai"): - tmp_mix_cram = b.read_input(output_mix_cram_paths[i]) - sample = samples[i] - j_reheader.command( - f"samtools reheader {j_header[f'{sample}']} {tmp_mix_cram} > {j_reheader.ofile1}" - ) - j_reheader.command( - f"samtools index {j_reheader.ofile1} -o {j_reheader.ofile2}" - ) - b.write_output(j_reheader.ofile1, output_mix_cram_paths[i]) - b.write_output(j_reheader.ofile2, f"{output_mix_cram_paths[i]}.crai") - if args.test: - break + if hl.hadoop_exists(output_mix_full_cram_path) and not hl.hadoop_exists( + f"{MY_BUCKET}/mixed_samples/v2/verifybamid/{mixing_samples_label}.selfSM" + ): + logger.info(f"Running VerifyBam ID: {mixing_samples_label}...") + run_verifybamid( + b=b, + input_cram_path=output_mix_full_cram_path, + input_crai_path=f"{output_mix_full_cram_path}.crai", + cram_project_id="broad-mpg-gnomad", + ref_fasta_path=REF_FASTA, + contamination_sites_path=CONTAM_SITES, + output_path=f"{MY_BUCKET}/mixed_samples/v2/verifybamid/", + output_prefix=mixing_samples_label, + depend_on=j_cat_depend_on, + disable_sanity_check=args.disable_sanity_check, + ) + if not hl.hadoop_exists( + f"{MY_BUCKET}/mixed_samples/v2/merged-gvcf/{mixing_samples_label}.g.vcf.gz" + ) and hl.hadoop_exists(f'{MY_BUCKET}/mixed_samples/v2/variant-calling/{mixing_samples_label}/*.vcf'): + logger.info(f"Running merge gvcfs: {mixing_samples_label}...") + gvcfs_to_merge = hl.utils.hadoop_ls( + f'{MY_BUCKET}/mixed_samples/v2/variant-calling/{mixing_samples_label}/*.vcf') + gvcfs_list = [] + gvcfs_sizes_sum = 0 + for file in gvcfs_to_merge: + gvcfs_list.append(file['path']) + gvcfs_sizes_sum += bytes_to_gb(file['path']) + 
merge_disk_size = round(gvcfs_sizes_sum * 2.5) + 15 + merged_vcf, j_merge = merge_vcf( + b=b, + gvcf_list=gvcfs_list, + depend_on=mix_gvcf_job_depend_list, + storage=merge_disk_size, + output_vcf_name=f"{mixing_samples_label}", + out_dir=f"{MY_BUCKET}/mixed_samples/v2", + memory="50", + ) + else: + merged_vcf = b.read_input( + f"{MY_BUCKET}/mixed_samples/v2/merged-gvcf/{mixing_samples_label}.g.vcf.gz" + ) + + if not hl.hadoop_exists( + f"{MY_BUCKET}/mixed_samples/v2/merged-gvcf/{mixing_samples_label}.g.vcf.gz.tbi" + ) and hl.hadoop_exists( + f"{MY_BUCKET}/mixed_samples/v2/merged-gvcf/{mixing_samples_label}.g.vcf.gz" + ): + logger.info(f"Indexing gvcf: {mixing_samples_label}...") + gvcf_index_file = index_gvcf( + b=b, + input_vcf=merged_vcf, + output_vcf_ind_name=f"{mixing_samples_label}", + out_dir=f"{MY_BUCKET}/mixed_samples", + storage="15", + memory="15" + ) if args.test: break + if args.run_freemix_sum_table: + contam_est = [] + split_labels = pd.DataFrame( + [i.split("_") for i in mixed_labels], + columns=["original", "contam_rate"], + ) + for s in mixed_labels: + contam_est.append( + check_contam(1, f"{MY_BUCKET}/mixed_samples/v2/verifybamid/", s) + ) + split_labels["freemix_score"] = contam_est + mixed_table = hl.Table.from_pandas(split_labels) + mixed_table.write(f"{MY_BUCKET}/hgdp_n_way_mixed_sample_freemix_score.ht") b.run() @@ -264,6 +453,21 @@ def main(): help="Whether to run test", action="store_true", ) + parser.add_argument( + "--run-merge-gvcf-files", + help="Whether to run freemix summary table", + action="store_true", + ) + parser.add_argument( + "--run-freemix-sum-table", + help="Whether to run freemix summary table", + action="store_true", + ) + parser.add_argument( + "--disable-sanity-check", + help="Whether to use sanity check in verifybamID", + action="store_true", + ) args = parser.parse_args() print(args) main() From a7cecd091c47c5683e7baf66400761bec0517c26 Mon Sep 17 00:00:00 2001 From: wlu04 Date: Fri, 28 Apr 2023 11:35:55 -0400 Subject: 
[PATCH 7/7] latest version --- mixing_samples.py | 408 ++++++++++++++++++++++++-------------------- run_vds_combiner.py | 143 +++++++++++++--- 2 files changed, 349 insertions(+), 202 deletions(-) diff --git a/mixing_samples.py b/mixing_samples.py index 905f035..272da2a 100644 --- a/mixing_samples.py +++ b/mixing_samples.py @@ -10,6 +10,14 @@ import argparse import numpy as np from typing import List +from functools import reduce + +from variant_calling.haplotype_caller import haplotype_caller_gatk +from variant_calling.merge_gvcfs import merge_vcf +from variant_calling.gvcf_index import index_gvcf +from variant_calling.variant_calling_pipeline import var_call_pipeline +from variant_calling.get_file_size import bytes_to_gb +from batch_verifybamid import * reference = "GRCh38" CHROM_LENGTHS = hl.get_reference(reference).lengths @@ -49,7 +57,7 @@ def get_read_groups(file): main_rg = read.tags[rg_ind[0]][-1] return main_rg -def edit_read_group(read, rg_name): +def edit_read_group(read, rg_name='ERR1349727'): rg_ind = [i for i, tuple in enumerate(read.tags) if 'RG' in tuple] if len(rg_ind)>0: temp_tag = [list(ele) for ele in read.tags] @@ -57,19 +65,87 @@ def edit_read_group(read, rg_name): read.tags = [tuple(ele) for ele in temp_tag] return read -def get_main_rg_list( - b: hb.Batch, - cram_files: dict, - input_ref_fasta: str, - sample_list: list -): - full_inputs = [b.read_input_group(**{"cram": cram_files[sample][0], "cram.crai": f"{cram_files[sample][1]}.crai"}) for sample in - sample_list] - rg_inputs = [pysam.AlignmentFile(full_input, mode="rc", reference_filename=input_ref_fasta) for - full_input in full_inputs] - main_rgs = [get_read_groups(input) for input in rg_inputs] - print(main_rgs) - return(main_rgs) +# def mixing_many_samples( +# input_list: List, +# output_list: List, +# sample_list: List, +# input_ref_fasta: str, +# chromosome: str, +# contam_rate: float, +# ): +# save_pos=[] +# save_next_reads_len=[] +# save_current_reads_len=[] +# 
save_current_reads_actually_at_position_len=[] +# +# inputs = [pysam.AlignmentFile(cram_input["cram"], mode="rc", reference_filename=input_ref_fasta) for +# cram_input in input_list] +# outputs = [open_pipes_output(ref_fasta=input_ref_fasta, output_name=output_list[i]) for i in +# range(len(sample_list))] +# # main_rgs = [get_read_groups(input) for input in inputs] +# # print(main_rgs) +# next_reads = [[edit_read_group(next(inputs[i].fetch(chromosome, until_eof=True)))] for i in range(len(inputs))] +# current_pos = [next_read[0].pos for next_read in next_reads] +# last_reads = [False for input in inputs] +# +# for pos in range(CHROM_LENGTHS[chromosome]): +# save_pos.append(pos) +# if (not any(x == pos for x in current_pos)): +# continue +# if any(last_reads): +# break +# next_reads = [[read for read in reads if read.pos >= pos] for reads in next_reads] +# save_next_reads_len.append(np.mean([len(x) for x in next_reads])) +# current_reads = copy.deepcopy(next_reads) +# for sample in range(len(inputs)): +# if len(next_reads[sample]) > 0 and next_reads[sample][0].pos != pos: +# continue +# while True: +# # print(f'current sample: {samples[sample]}') +# # print(f'current read: {next_read}') +# # print(f'current position: {pos}') +# # print(f'current read position: {next_read.pos}') +# try: +# next_read = edit_read_group(next(inputs[sample])) +# except StopIteration: +# last_reads[sample] = True +# break +# +# if next_read.pos == pos and (not last_reads[sample]): +# current_reads[sample].append(next_read) +# elif not last_reads[sample]: +# next_reads[sample].append(next_read) +# break +# save_current_reads_len.append(np.mean([len(x) for x in current_reads])) +# current_pos = [next_read[-1].pos for next_read in next_reads] +# +# # contamination +# current_reads_actually_at_position = [[read for read in reads if read.pos == pos] for reads in current_reads] +# save_current_reads_actually_at_position_len.append(np.mean([len(x) for x in current_reads_actually_at_position])) 
+# for sample in range(len(inputs)): +# for read in current_reads_actually_at_position[sample]: +# contaminate = np.random.binomial(1, contam_rate) +# if not contaminate: +# outputs[sample].write(read.tostring() + "\n") +# else: +# indexes = list( +# filter(lambda x: x not in [sample] and len(current_reads_actually_at_position[x]) > 0, +# range(0, len(inputs)))) +# if len(indexes)==0: +# continue +# random_ind = choice(indexes) +# random_sample_reads = current_reads_actually_at_position[random_ind] +# random_read_ind = random.randint(0, len(random_sample_reads) - 1) +# random_read = random_sample_reads[random_read_ind] +# outputs[sample].write(random_read.tostring() + "\n") +# for output in outputs: +# output.close() +# for input in inputs: +# input.close() +# df = pd.DataFrame(data=zip(save_pos, save_current_reads_len, save_next_reads_len, save_current_reads_actually_at_position_len), +# columns=['Position', 'Current_reads_length', 'Next_reads_length', 'Current_reads_actually_at_position_length']) +# with hl.hadoop_open(f'{MY_BUCKET}/mixed_samples/v2/reads_summary_{chromosome}.csv', 'w') as f: +# df.to_csv(f) def mixing_many_samples( input_list: List, @@ -83,9 +159,9 @@ def mixing_many_samples( cram_input in input_list] outputs = [open_pipes_output(ref_fasta=input_ref_fasta, output_name=output_list[i]) for i in range(len(sample_list))] - main_rgs = [get_read_groups(input) for input in inputs] - print(main_rgs) - next_reads = [[edit_read_group(next(inputs[i].fetch(chromosome, until_eof=True)), main_rgs[i])] for i in range(len(inputs))] + # main_rgs = [get_read_groups(input) for input in inputs] + # print(main_rgs) + next_reads = [[edit_read_group(next(inputs[i].fetch(chromosome, until_eof=True)))] for i in range(len(inputs))] current_pos = [next_read[0].pos for next_read in next_reads] last_reads = [False for input in inputs] @@ -105,7 +181,7 @@ def mixing_many_samples( # print(f'current position: {pos}') # print(f'current read position: {next_read.pos}') try: - 
next_read = edit_read_group(next(inputs[sample]), main_rgs[sample]) + next_read = edit_read_group(next(inputs[sample])) except StopIteration: last_reads[sample] = True break @@ -127,7 +203,7 @@ def mixing_many_samples( else: indexes = list( filter(lambda x: x not in [sample] and len(current_reads_actually_at_position[x]) > 0, - range(0, len(inputs) - 1))) + range(0, len(inputs)))) if len(indexes)==0: continue random_ind = choice(indexes) @@ -141,7 +217,6 @@ def mixing_many_samples( input.close() - def main(): backend = hb.ServiceBackend( billing_project=BILLING_PROJECT, @@ -172,27 +247,12 @@ def main(): samples.append(sample[1][0]) cram_files[sample[1][0]] = (f"{sample[1][1]}", f"{sample[1][1]}.crai") - if args.test: - samples = samples[:3] - - - - - # for i in range(len(samples)): - # sample = samples[i] - # input_cram_file = b.read_input_group( - # cram=cram_files[sample][0], index=cram_files[sample][1] - # ) - # j_header = b.new_job( - # name=f"get_header_{sample}", - # attributes={"sample_id": sample, "job_type": "get_header"}, - # ) - # j_header.image(SAMTOOLS_IMAGE).storage("15Gi").memory('15Gi') - # j_header.command( - # f"samtools view -H {input_cram_file['cram']} > {j_header[f'{sample}']}" - # ) + # if args.test: + # samples = samples[:4] + mixed_labels = [] for contam_rate in CONTAM_RATES: + print(contam_rate) mixed_crams = {} mixed_gvcfs = {} mix_gvcf_job_depend_list = [] @@ -201,130 +261,134 @@ def main(): mixing_samples_label = f"{sample}_{contam_rate * 100}" mixed_crams[mixing_samples_label] = {} mixed_gvcfs[mixing_samples_label] = {} - for chromosome in CHROMOSOMES: - if args.test: - chromosome = 'chr21' - output_mix_cram_paths = [f'{MY_BUCKET}/mixed_samples/v2/crams/cram_by_chrom/{sample}/contam_rate_{contam_rate*100}/{sample}_{chromosome}_{contam_rate*100}.cram' for sample in samples] - output_mix_vcf_paths = [ - f'{MY_BUCKET}/mixed_samples/v2/variant-calling/{sample}/{sample}_{contam_rate * 100}/{sample}_{contam_rate * 100}_{chromosome}.g.vcf' - 
for sample in samples] - j_reheader_depend_on = None - if any([not hl.hadoop_exists(path) for path in output_mix_cram_paths]): - logger.info( - f"Mixing contamination free crams: {chromosome}_{contam_rate*100}_percent_contamination..." - ) - j_mix = b.new_python_job( - name=f"Mix_all_samples_{chromosome}", - attributes={ - "contam_rate": f"{contam_rate * 100}\%", - "chromosome": chromosome, - "job_type": "mixing_samples", - }, - ) - j_mix.storage("50Gi").memory("10Gi") - cram_paths = [ - f"{MY_BUCKET}/contam_free/crams/cram_by_chrom/{sample_id}/{sample_id}_contam_free_{chromosome}.cram" - for sample_id in samples] - inputs = [b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) for path in - cram_paths] - outputs = [j_mix[f'{sample}_{chromosome}'] for sample in samples] - - j_mix.call( - mixing_many_samples, - input_list=inputs, - output_list=outputs, - sample_list=samples, - input_ref_fasta=input_ref_fasta['fasta'], - chromosome=chromosome, - contam_rate=contam_rate - ) - j_reheader_depend_on = j_mix - mix_cram_job_depend_list.append(j_mix) - - for i in range(len(samples)): - mixing_samples_label = f"{samples[i]}_{contam_rate * 100}" - output_cram_path = output_mix_cram_paths[i] - b.write_output(outputs[i], output_cram_path) - mixed_crams[mixing_samples_label][chromosome] = outputs[i] - - for i in range(len(samples)): - sample = samples[i] - mixing_samples_label = f"{sample}_{contam_rate * 100}" - j_gvcf_depend_on = None - if not hl.hadoop_exists(f"{output_mix_cram_paths[i]}.crai"): - j_reheader = b.new_job( - name=f"Reheader_contam_free_file_{mixing_samples_label}_{chromosome}", + if args.run_mixing: + for chromosome in CHROMOSOMES: + if args.test: + chromosome = 'chr21' + output_mix_cram_paths = [f'{MY_BUCKET}/mixed_samples/v2/crams/cram_by_chrom/{sample}/contam_rate_{contam_rate*100}/{sample}_{chromosome}_{contam_rate*100}.cram' for sample in samples] + output_mix_vcf_paths = [ + f'{MY_BUCKET}/mixed_samples/v2/variant-calling/{sample}_{contam_rate 
* 100}/{sample}_{contam_rate * 100}_{chromosome}.g.vcf' + for sample in samples] + j_reheader_depend_on = None + if any([not hl.hadoop_exists(path) for path in output_mix_cram_paths]): + logger.info( + f"Mixing contamination free crams: {chromosome}_{contam_rate*100}_percent_contamination..." + ) + j_mix = b.new_python_job( + name=f"Mix_all_samples_{chromosome}_{contam_rate*100}", attributes={ - "sample": sample, "contam_rate": f"{contam_rate * 100}\%", "chromosome": chromosome, - "job_type": "run_reheader", + "job_type": "mixing_samples", }, ) - j_reheader.image(SAMTOOLS_IMAGE).storage("40Gi").memory("15Gi") - - if j_reheader_depend_on is not None: - j_reheader.depends_on(j_reheader_depend_on) - logger.info( - f"Reheadering contamination free cram: {sample}_{chromosome}_{contam_rate * 100}_percent_contamination..." + if chromosome == 'chr2': + j_mix.storage("100Gi").memory("50Gi") + else: + j_mix.storage("100Gi").memory("15Gi") + cram_paths = [ + f"{MY_BUCKET}/contam_free/crams/cram_by_chrom/{sample_id}/{sample_id}_contam_free_{chromosome}.cram" + for sample_id in samples] + inputs = [b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) for path in + cram_paths] + outputs = [j_mix[f'{sample}_{chromosome}'] for sample in samples] + + j_mix.call( + mixing_many_samples, + input_list=inputs, + output_list=outputs, + sample_list=samples, + input_ref_fasta=input_ref_fasta['fasta'], + chromosome=chromosome, + contam_rate=contam_rate ) - input_cram_file = b.read_input_group( - cram=cram_files[sample][0], index=cram_files[sample][1] - ) - tmp_mix_cram = b.read_input(output_mix_cram_paths[i]) - j_reheader.command( - f"samtools view -H {input_cram_file['cram']} > {j_reheader.header}" - ) - j_reheader.command( - f"samtools reheader {j_reheader.header} {tmp_mix_cram} > {j_reheader.ofile1}" - ) - j_reheader.command( - f"samtools index {j_reheader.ofile1} -o {j_reheader.ofile2}" - ) - mix_cram_job_depend_list.append(j_reheader) - b.write_output(j_reheader.ofile1, 
output_mix_cram_paths[i]) - b.write_output(j_reheader.ofile2, f"{output_mix_cram_paths[i]}.crai") - mixed_crams[mixing_samples_label][chromosome] = j_reheader.ofile1 - j_gvcf_depend_on.append(j_reheader) - else: - path = output_mix_cram_paths[i] - input_mixed_cram = b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) - mixed_crams[mixing_samples_label][chromosome] = input_mixed_cram['cram'] + j_reheader_depend_on = j_mix + mix_cram_job_depend_list.append(j_mix) - if not hl.hadoop_exists(output_mix_vcf_paths[i]): - logger.info( - f"Running haplotype caller: {sample}_{chromosome}_{contam_rate*100}_percent_contamination..." - ) - input_chrom_cram_file = b.read_input_group( - **{ - "cram": output_mix_cram_paths[i], - "cram.crai": f"{output_mix_cram_paths[i]}.crai", - } - ) - tmp_gvcf, tmp_job = haplotype_caller_gatk( - b=b, - depend_on=j_gvcf_depend_on, - input_bam=input_chrom_cram_file, - ref_fasta=input_ref_fasta, - interval_list_file=chromosome, - out_dir=f"{MY_BUCKET}/mixed_samples/v2", - contamination=0.0, - bam_filename_no_ext=f"{sample}_{contam_rate * 100}_{chromosome}", - storage=40, - interval_list_name=None, - memory=26, - ) - mix_gvcf_job_depend_list.append(tmp_job) - mixed_gvcfs[mixing_samples_label][chromosome] = tmp_gvcf - else: - tmp_gvcf = b.read_input(output_mix_vcf_paths[i]) - mixed_gvcfs[mixing_samples_label][chromosome] = tmp_gvcf + for i in range(len(samples)): + mixing_samples_label = f"{samples[i]}_{contam_rate * 100}" + output_cram_path = output_mix_cram_paths[i] + b.write_output(outputs[i], output_cram_path) + mixed_crams[mixing_samples_label][chromosome] = outputs[i] - if args.test: - break - mixed_labels=[] + for i in range(len(samples)): + sample = samples[i] + mixing_samples_label = f"{sample}_{contam_rate * 100}" + j_gvcf_depend_on = None + if not hl.hadoop_exists(f"{output_mix_cram_paths[i]}.crai"): + j_reheader = b.new_job( + name=f"Reheader_contam_free_file_{mixing_samples_label}_{chromosome}", + attributes={ + "sample": 
sample, + "contam_rate": f"{contam_rate * 100}\%", + "chromosome": chromosome, + "job_type": "run_reheader", + }, + ) + j_reheader.image(SAMTOOLS_IMAGE).storage("40Gi").memory("15Gi") + + if j_reheader_depend_on is not None: + j_reheader.depends_on(j_reheader_depend_on) + logger.info( + f"Reheadering contamination free cram: {sample}_{chromosome}_{contam_rate * 100}_percent_contamination..." + ) + header_cram_file = b.read_input_group( + cram=cram_files['HGDP00021'][0], index=cram_files['HGDP00021'][1] + ) + tmp_mix_cram = b.read_input(output_mix_cram_paths[i]) + j_reheader.command( + f"samtools view -H {header_cram_file['cram']} > {j_reheader.header}" + ) + j_reheader.command( + f"samtools reheader {j_reheader.header} {tmp_mix_cram} > {j_reheader.ofile1}" + ) + j_reheader.command( + f"samtools index {j_reheader.ofile1} -o {j_reheader.ofile2}" + ) + mix_cram_job_depend_list.append(j_reheader) + b.write_output(j_reheader.ofile1, output_mix_cram_paths[i]) + b.write_output(j_reheader.ofile2, f"{output_mix_cram_paths[i]}.crai") + mixed_crams[mixing_samples_label][chromosome] = j_reheader.ofile1 + j_gvcf_depend_on = j_reheader + else: + path = output_mix_cram_paths[i] + input_mixed_cram = b.read_input_group(**{"cram": path, "cram.crai": f"{path}.crai"}) + mixed_crams[mixing_samples_label][chromosome] = input_mixed_cram['cram'] + + if not hl.hadoop_exists(output_mix_vcf_paths[i]): + logger.info( + f"Running haplotype caller: {sample}_{chromosome}_{contam_rate*100}_percent_contamination..." 
+ ) + input_chrom_cram_file = b.read_input_group( + **{ + "cram": output_mix_cram_paths[i], + "cram.crai": f"{output_mix_cram_paths[i]}.crai", + } + ) + tmp_gvcf, tmp_job = haplotype_caller_gatk( + b=b, + depend_on=j_gvcf_depend_on, + input_bam=input_chrom_cram_file, + ref_fasta=input_ref_fasta, + interval_list_file=chromosome, + out_dir=f"{MY_BUCKET}/mixed_samples/v2", + contamination=0.0, + bam_filename_no_ext=f"{sample}_{contam_rate * 100}_{chromosome}", + storage=40, + interval_list_name=None, + memory=26, + ) + mix_gvcf_job_depend_list.append(tmp_job) + mixed_gvcfs[mixing_samples_label][chromosome] = tmp_gvcf + else: + tmp_gvcf = b.read_input(output_mix_vcf_paths[i]) + mixed_gvcfs[mixing_samples_label][chromosome] = tmp_gvcf + + if args.test: + break + for sample in samples: mixing_samples_label = f"{sample}_{contam_rate * 100}" mixed_labels.append(mixing_samples_label) @@ -348,11 +412,11 @@ def main(): tmp_cram_lst = reduce( lambda x, y: x + " " + y, mixed_crams[mixing_samples_label].values() ) - input_cram_file = b.read_input_group( - cram=cram_files[sample][0], index=cram_files[sample][1] + header_cram_file = b.read_input_group( + cram=cram_files['HGDP00021'][0], index=cram_files['HGDP00021'][1] ) j_cat.command( - f"samtools view -H {input_cram_file['cram']} > {j_cat.header}" + f"samtools view -H {header_cram_file['cram']} > {j_cat.header}" ) j_cat.command( f"samtools cat -h {j_cat.header} -o {j_cat.ofile1} {tmp_cram_lst}" @@ -380,7 +444,7 @@ def main(): ) if not hl.hadoop_exists( f"{MY_BUCKET}/mixed_samples/v2/merged-gvcf/{mixing_samples_label}.g.vcf.gz" - ) and hl.hadoop_exists(f'{MY_BUCKET}/mixed_samples/v2/variant-calling/{mixing_samples_label}/*.vcf'): + ) and hl.hadoop_exists(f'{MY_BUCKET}/mixed_samples/v2/variant-calling/{sample}_{contam_rate * 100}/{sample}_{contam_rate * 100}_chr1.g.vcf'): logger.info(f"Running merge gvcfs: {mixing_samples_label}...") gvcfs_to_merge = hl.utils.hadoop_ls( 
f'{MY_BUCKET}/mixed_samples/v2/variant-calling/{mixing_samples_label}/*.vcf') @@ -410,29 +474,16 @@ def main(): f"{MY_BUCKET}/mixed_samples/v2/merged-gvcf/{mixing_samples_label}.g.vcf.gz" ): logger.info(f"Indexing gvcf: {mixing_samples_label}...") - gvcf_index_file = index_gvcf( + index_gvcf( b=b, input_vcf=merged_vcf, output_vcf_ind_name=f"{mixing_samples_label}", - out_dir=f"{MY_BUCKET}/mixed_samples", + out_dir=f"{MY_BUCKET}/mixed_samples/v2", storage="15", memory="15" ) if args.test: break - if args.run_freemix_sum_table: - contam_est = [] - split_labels = pd.DataFrame( - [i.split("_") for i in mixed_labels], - columns=["original", "contam_rate"], - ) - for s in mixed_labels: - contam_est.append( - check_contam(1, f"{MY_BUCKET}/mixed_samples/v2/verifybamid/", s) - ) - split_labels["freemix_score"] = contam_est - mixed_table = hl.Table.from_pandas(split_labels) - mixed_table.write(f"{MY_BUCKET}/hgdp_n_way_mixed_sample_freemix_score.ht") b.run() @@ -454,20 +505,15 @@ def main(): action="store_true", ) parser.add_argument( - "--run-merge-gvcf-files", - help="Whether to run freemix summary table", + "--run_mixing", + help="Whether to run mixing chromosomes", action="store_true", ) parser.add_argument( - "--run-freemix-sum-table", + "--run-merge-gvcf-files", help="Whether to run freemix summary table", action="store_true", ) - parser.add_argument( - "--disable-sanity-check", - help="Whether to use sanity check in verifybamID", - action="store_true", - ) args = parser.parse_args() print(args) - main() + main() \ No newline at end of file diff --git a/run_vds_combiner.py b/run_vds_combiner.py index 238711f..3e34488 100644 --- a/run_vds_combiner.py +++ b/run_vds_combiner.py @@ -1,5 +1,6 @@ import hail as hl import argparse +import pandas as pd BILLING_PROJECT = "gnomad-production" MY_BUCKET = "gs://gnomad-wenhan/charr_simulation" @@ -33,38 +34,137 @@ POPs = ["afr", "amr", "eas", "mid", "nfe", "sas"] +def check_contam( + contamination_underestimation_factor: float, + 
output_path: str, + output_prefix: str, +): + # used to read from the selfSM file and calculate contamination, which gets printed out + import csv + import sys + import pickle + import subprocess + local = subprocess.check_output("pwd", shell=True) + local = local.decode("utf-8").split("\n")[0] + hl.hadoop_copy( + f"{output_path}{output_prefix}.selfSM", + f"file://{local}/contam_temp.selfSM", + ) + with open(f"{local}/contam_temp.selfSM", 'rt') as selfSM: + reader = csv.DictReader(selfSM, delimiter='\t') + i = 0 + for row in reader: + if float(row["FREELK0"]) == 0 and float(row["FREELK1"]) == 0: + # a zero value for the likelihoods implies no data. This usually indicates a problem rather than a real event. + # if the bam isn't really empty, this is probably due to the use of a incompatible reference build between + # vcf and bam. + sys.stderr.write( + "Found zero likelihoods. Bam is either very-very shallow, or aligned to the wrong reference (relative to the vcf).") + sys.exit(1) + print(f'{output_prefix} contamination rate: {float(row["FREEMIX"]) /contamination_underestimation_factor}') + c = float(row["FREEMIX"]) /contamination_underestimation_factor + i = i + 1 + # there should be exactly one row, and if this isn't the case the format of the output is unexpectedly different + # and the results are not reliable. + if i != 1: + sys.stderr.write("Found %d rows in .selfSM file. Was expecting exactly 1. 
This is an error" % (i)) + sys.exit(2) + return c + def main(): - gvcfs_to_combine = hl.utils.hadoop_ls(f'{args.input_gvcf_bucket}/merged-gvcf/*.vcf.gz') - gvcfs_list = [] - gvcf_names = [] - for file in gvcfs_to_combine: - if hl.hadoop_exists(f"{file['path']}.tbi"): - gvcfs_list.append(file['path']) - gvcf_names.append(file['path'].split("/")[-1][:-9]) - print(gvcf_names) - - combiner = hl.vds.new_combiner( - output_path=f"{MY_BUCKET}/{args.output_vds_name}.vds", - temp_path=TMP_DIR, - gvcf_paths=gvcfs_list, - gvcf_sample_names=gvcf_names, - gvcf_external_header="gs://gnomad-wenhan/charr_simulation/header.txt", - use_genome_default_intervals=True, - reference_genome="GRCh38", - ) - ## Run combiner - combiner.run() + if args.run_freemix_sum_table: + mixed_labels = ['HGDP00021_0.5', 'HGDP00105_0.5', 'HGDP00118_0.5', 'HGDP00281_0.5', 'HGDP00341_0.5', + 'HGDP00529_0.5', 'HGDP00610_0.5', 'HGDP00669_0.5', 'HGDP00688_0.5', 'HGDP00689_0.5', + 'HGDP00690_0.5', 'HGDP00703_0.5', 'HGDP00714_0.5', 'HGDP00726_0.5', 'HGDP00768_0.5', + 'HGDP00845_0.5', 'HGDP00875_0.5', 'HGDP00905_0.5', 'HGDP00931_0.5', 'HGDP00944_0.5', + 'HGDP01009_0.5', 'HGDP01050_0.5', 'HGDP01092_0.5', 'HGDP01096_0.5', 'HGDP01190_0.5', + 'HGDP01243_0.5', 'HGDP01371_0.5', 'HGDP01385_0.5', 'HGDP01396_0.5', 'HGDP01406_0.5', + 'HGDP00021_1.0', 'HGDP00105_1.0', 'HGDP00118_1.0', 'HGDP00281_1.0', 'HGDP00341_1.0', + 'HGDP00529_1.0', 'HGDP00610_1.0', 'HGDP00669_1.0', 'HGDP00688_1.0', 'HGDP00689_1.0', + 'HGDP00690_1.0', 'HGDP00703_1.0', 'HGDP00714_1.0', 'HGDP00726_1.0', 'HGDP00768_1.0', + 'HGDP00845_1.0', 'HGDP00875_1.0', 'HGDP00905_1.0', 'HGDP00931_1.0', 'HGDP00944_1.0', + 'HGDP01009_1.0', 'HGDP01050_1.0', 'HGDP01092_1.0', 'HGDP01096_1.0', 'HGDP01190_1.0', + 'HGDP01243_1.0', 'HGDP01371_1.0', 'HGDP01385_1.0', 'HGDP01396_1.0', 'HGDP01406_1.0', + 'HGDP00021_2.0', 'HGDP00105_2.0', 'HGDP00118_2.0', 'HGDP00281_2.0', 'HGDP00341_2.0', + 'HGDP00529_2.0', 'HGDP00610_2.0', 'HGDP00669_2.0', 'HGDP00688_2.0', 'HGDP00689_2.0', + 
'HGDP00690_2.0', 'HGDP00703_2.0', 'HGDP00714_2.0', 'HGDP00726_2.0', 'HGDP00768_2.0', + 'HGDP00845_2.0', 'HGDP00875_2.0', 'HGDP00905_2.0', 'HGDP00931_2.0', 'HGDP00944_2.0', + 'HGDP01009_2.0', 'HGDP01050_2.0', 'HGDP01092_2.0', 'HGDP01096_2.0', 'HGDP01190_2.0', + 'HGDP01243_2.0', 'HGDP01371_2.0', 'HGDP01385_2.0', 'HGDP01396_2.0', 'HGDP01406_2.0', + 'HGDP00021_5.0', 'HGDP00105_5.0', 'HGDP00118_5.0', 'HGDP00281_5.0', 'HGDP00341_5.0', + 'HGDP00529_5.0', 'HGDP00610_5.0', 'HGDP00669_5.0', 'HGDP00688_5.0', 'HGDP00689_5.0', + 'HGDP00690_5.0', 'HGDP00703_5.0', 'HGDP00714_5.0', 'HGDP00726_5.0', 'HGDP00768_5.0', + 'HGDP00845_5.0', 'HGDP00875_5.0', 'HGDP00905_5.0', 'HGDP00931_5.0', 'HGDP00944_5.0', + 'HGDP01009_5.0', 'HGDP01050_5.0', 'HGDP01092_5.0', 'HGDP01096_5.0', 'HGDP01190_5.0', + 'HGDP01243_5.0', 'HGDP01371_5.0', 'HGDP01385_5.0', 'HGDP01396_5.0', 'HGDP01406_5.0', + 'HGDP00021_10.0', 'HGDP00105_10.0', 'HGDP00118_10.0', 'HGDP00281_10.0', 'HGDP00341_10.0', + 'HGDP00529_10.0', 'HGDP00610_10.0', 'HGDP00669_10.0', 'HGDP00688_10.0', 'HGDP00689_10.0', + 'HGDP00690_10.0', 'HGDP00703_10.0', 'HGDP00714_10.0', 'HGDP00726_10.0', 'HGDP00768_10.0', + 'HGDP00845_10.0', 'HGDP00875_10.0', 'HGDP00905_10.0', 'HGDP00931_10.0', 'HGDP00944_10.0', + 'HGDP01009_10.0', 'HGDP01050_10.0', 'HGDP01092_10.0', 'HGDP01096_10.0', 'HGDP01190_10.0', + 'HGDP01243_10.0', 'HGDP01371_10.0', 'HGDP01385_10.0', 'HGDP01396_10.0', 'HGDP01406_10.0'] + + contam_est = [] + split_labels = pd.DataFrame( + [i.split("_") for i in mixed_labels], + columns=["original", "contam_rate"], + ) + for s in mixed_labels: + contam_est.append( + check_contam(1, f"{MY_BUCKET}/mixed_samples/v2/verifybamid/", s) + ) + split_labels["freemix_score"] = contam_est + mixed_table = hl.Table.from_pandas(split_labels) + mixed_table.write(f"{MY_BUCKET}/hgdp_n_way_mixed_sample_freemix_score.ht") + + if args.run_vds_combiner: + gvcfs_to_combine = hl.utils.hadoop_ls(f'{args.input_gvcf_bucket}/merged-gvcf/*.vcf.gz') + gvcfs_list = [] + gvcf_names = [] 
+        for file in gvcfs_to_combine:
+            if hl.hadoop_exists(f"{file['path']}.tbi"):
+                gvcfs_list.append(file['path'])
+                gvcf_names.append(file['path'].split("/")[-1][:-9])
+        print(gvcf_names)
+
+        combiner = hl.vds.new_combiner(
+            output_path=f"{MY_BUCKET}/{args.output_vds_name}.vds",
+            temp_path=TMP_DIR,
+            gvcf_paths=gvcfs_list,
+            gvcf_sample_names=gvcf_names,
+            gvcf_external_header="gs://gnomad-wenhan/charr_simulation/header.txt",
+            use_genome_default_intervals=True,
+            reference_genome="GRCh38",
+        )
+
+        ## Run combiner
+        combiner.run()


 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--run-freemix-sum-table",
+        help="Whether to run freemix summary table",
+        action="store_true",
+    )
+    parser.add_argument(
+        "--disable-sanity-check",
+        help="Whether to disable the sanity check in verifybamID",
+        action="store_true",
+    )
     parser.add_argument(
         "--sample_ids",
         help="Path to a list of sample IDs to merge",
         nargs="?",
     )
+    parser.add_argument(
+        "--run-vds-combiner",
+        help="Whether to run the VDS combiner on the merged gvcfs",
+        action="store_true",
+    )
     parser.add_argument(
         "--input-gvcf-bucket",
         help="Input gvcf bucket",
@@ -81,4 +181,5 @@

     main()

 # hailctl dataproc submit wlu run_vds_combiner.py --output-vds-name hgdp_decontaminated_28_samples
-# hailctl dataproc submit wlu run_vds_combiner.py --input-gvcf-bucket gs://gnomad-wenhan/charr_simulation/mixed_samples/ --output-vds-name hgdp_mixed_samples_full
\ No newline at end of file
+# hailctl dataproc submit wlu run_vds_combiner.py --input-gvcf-bucket gs://gnomad-wenhan/charr_simulation/mixed_samples/ --output-vds-name hgdp_mixed_samples_full
+# hailctl dataproc submit wlu run_vds_combiner.py --input-gvcf-bucket gs://gnomad-wenhan/charr_simulation/mixed_samples/v2/ --output-vds-name hgdp_n_way_mixed_samples_115
\ No newline at end of file