diff --git a/access_genbank.py b/access_genbank.py index 32ef2a8..d045394 100644 --- a/access_genbank.py +++ b/access_genbank.py @@ -9,19 +9,18 @@ #------------------------------- def build_search_term(gene_name, min_len=0, max_len=-1): - """ generate a search query for NCBI's nucleotide database (GenBank). Bio.Entrez does not provide filters such as sequence length and gene name + """ + Generate a search query for NCBI's nucleotide database (GenBank). Bio.Entrez does not provide filters such as sequence length and gene name so a search query keywords must be used instead. Used to make database queries in extract_database and download_database - Parameters - ---------- - gene_name: (str) name of the gene of interest - min_len: (int, optional) lower bound of the sequence length filter, by default 0 - max_len: (int, optional) upper bound of the sequence length filter, by default -1 means no limit + Parameters: + gene_name(str): name of the gene of interest + min_len(int, optional): lower bound of the sequence length filter, by default 0 + max_len(int, optional): upper bound of the sequence length filter, by default -1 means no limit - Returns - ---------- - term: (str) search query + Returns: + term(str): search query """ term = f"{gene_name}[Gene Name]" if max_len == -1: @@ -32,17 +31,16 @@ def build_search_term(gene_name, min_len=0, max_len=-1): def extract_database(gene_name, seqlen_start=0, seqlen_stop=-1): - '''Request data on GenBank from NCBI (with Entrez module), with filtering options + ''' + Request data on GenBank from NCBI (with Entrez module), with filtering options - Parameters - ---------- - gene_name: (str) name of the gene to extract from GenBank - seqlen_start: (int, optional) lower bound of the sequence length filter, by default 0 - seqlen_stop: (int, optional) upper bound of the sequence length filter, by default -1 means no limit + Parameters: + gene_name(str): name of the gene to extract from GenBank + seqlen_start(int, optional): lower bound of the sequence length filter, by default 0 + seqlen_stop(int, optional): upper bound of the sequence length filter, by default -1 means no limit - Returns - ---------- - seq_record: (str) raw data of all sequences + Returns: + seq_record(str): raw data of all sequences ''' handle = Entrez.esearch(db= "nucleotide", term= build_search_term(gene_name, seqlen_start, seqlen_stop), retmax = 10000) record = Entrez.read(handle) @@ -60,59 +58,94 @@ def extract_database(gene_name, seqlen_start=0, seqlen_stop=-1): def download_database(gene_name, seqlen_start=0, seqlen_stop=-1): - '''Download data to a .fasta file, with filtering options + ''' + Download data to a .fasta file, with filtering options Args: - gene_name: (str) name of the gene to extract from GenBank - seqlen_start: (int) lower bound of the sequence length filter - seqlen_stop: (int) upper bound of the sequence length filter + gene_name(str): name of the gene to extract from GenBank + seqlen_start(int): lower bound of the sequence length filter + seqlen_stop(int): upper bound of the sequence length filter Returns: - data_path: (str) path to downloaded data .fasta file + data_path(str): path to downloaded data .fasta file ''' data_path = f"{gene_name}[{seqlen_start},{seqlen_stop}].fasta" with open(data_path, mode="w") as file: file.write(extract_database(gene_name, seqlen_start, seqlen_stop)) return data_path -def download_sequence(species, gene_name, dst, start_length=None, stop_length= None, id = None, permissive_search = True): +def download_sequence_from_id(id: list, dst = None): + """ + download sequence from GenBank through the Entrez database using unique identifiers. + + Args: + id (list): list of IDs of sequences to download from GenBank + dst(str): destination file path + + Returns: + Bio.SeqIO.FastaIO.FastaIterator: iterator over the records corresponding to the input ids + + """ + handle = Entrez.efetch(db="nucleotide", id=id, retmode = "fasta", rettype = "fasta") + record_parser = SeqIO.parse(handle, "fasta") + if dst != None: + with open(dst, mode="a") as writer: + SeqIO.write(record_parser,writer,"fasta") + return SeqIO.parse(dst,"fasta") + return record_parser + +def get_species_from_ids(ids:list): """ - download sequence from GenBank through the Entrez database. + Return the species from GenBank entries with certain ids. Parameters: - ---------- - species(str): name of species - gene_name(str): name of gene - dst(str,Path-like): destination file path - start_length(int): minimum length of sequence - stop_length(int): maximum length of sequence - id(list): list of NCBi ids of sequences to download. If provided, overrides gene_name and species. - permissive_search(bool, default = True): when True, if Advanced NCBI query returns nothing, replace it with a less precise general query. + ids: list of ids to search + + Returns: + dictionary that matches ids to organism names + + """ + species_dict= dict() + for id in ids: + handle = Entrez.efetch(db="nucleotide", id=id, retmode = "gb", rettype = "gb") + records = SeqIO.parse(handle,"genbank") + one_record = next(records) + species_dict[id]=one_record.annotations["organism"] + return species_dict + +def download_sequence(species, gene_name, dst = None, start_length=None, stop_length= None, permissive_search = True): + """ + Download sequence from GenBank through the Entrez database. + + Parameters: + species(str): name of species + gene_name(str): name of gene + dst(str,Path-like): destination file path, if None does not write to a file, only returns + start_length(int): minimum length of sequence + stop_length(int): maximum length of sequence + id(list): list of NCBi ids of sequences to download. If provided, overrides gene_name and species. + permissive_search(bool, default = True): when True, if Advanced NCBI query returns nothing, replace it with a less precise general query. + + Returns: + Bio.SeqRecord.SeqRecord: sequence found from GenBank + """ - if id == None: - search_term = f"{gene_name}[Gene Name] AND {species}[Organism]" - if start_length!= None or stop_length!= None: - search_term += f" {start_length}:{stop_length}[Sequence Length]" + + search_term = f"{gene_name}[Gene Name] AND {species}[Organism]" + if start_length!= None or stop_length!= None: + search_term += f" {start_length}:{stop_length}[Sequence Length]" + handle = Entrez.esearch(db = "nucleotide", term = search_term, retmax = 1, idtype = "acc") + search_result = Entrez.read(handle) + handle.close() + id = search_result["IdList"] + if len(id)==0 and permissive_search: + search_term = f"{gene_name} {species} {start_length}:{stop_length}[Sequence Length]" handle = Entrez.esearch(db = "nucleotide", term = search_term, retmax = 1, idtype = "acc") search_result = Entrez.read(handle) handle.close() id = search_result["IdList"] - n=0 - for i in id: - n+=1 - if n==0 and permissive_search: - search_term = f"{gene_name} {species} {start_length}:{stop_length}[Sequence Length]" - handle = Entrez.esearch(db = "nucleotide", term = search_term, retmax = 1, idtype = "acc") - search_result = Entrez.read(handle) - handle.close() - id = search_result["IdList"] - n=0 - for i in id: - n+=1 - if n==1: - handle = Entrez.efetch(db="nucleotide", id=id, retmode = "fasta", rettype = "fasta") - sequence = handle.read() - handle.close() - with open(dst, mode="a") as writer: - writer.write(sequence) \ No newline at end of file + if len(id)==1: + return next(download_sequence_from_id(id,dst)) + else: + raise ValueError("Your search did not return any results") diff --git a/create_fake_dataset.py b/create_fake_dataset.py new file mode 100644 index 0000000..a343afe --- /dev/null +++ b/create_fake_dataset.py @@ -0,0 +1,34 @@ +from access_genbank import download_sequence_from_id +from generate_fastq import generate_fastq, multiplex +import os +import os.path as ospath +import argparse +from Bio import SeqIO + +def main(): + + destination_folder = "test_fake4" + ids= ["ON409953.1", "AF288129.1", "AF288126.1", "AF288125.1" ] + multiplexed = True + + + + destination_folder_path= ospath.join("data", destination_folder) + if not ospath.exists(destination_folder_path): + os.makedirs(destination_folder_path) + + record_parser = download_sequence_from_id(ids) + + for id, record in zip(ids,record_parser): + new_demultiplexed_fastq_path = ospath.join(destination_folder_path, str(id)+".fastq") + generate_fastq(record.seq, 100, new_demultiplexed_fastq_path) + print("left for loop") + if multiplexed: + multiplex_path=ospath.join(destination_folder_path,"multiplexed.fastq") + multiplex([destination_folder_path],multiplex_path) + + for record in SeqIO.parse(multiplex_path,"fastq"): + print(record) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/generate_fastq.py b/generate_fastq.py new file mode 100644 index 0000000..272665e --- /dev/null +++ b/generate_fastq.py @@ -0,0 +1,214 @@ +from Bio import SeqIO +import random +from Bio.Seq import Seq +import numpy as np +import os +import os.path as ospath +import numpy as np +import scipy.stats +import matplotlib.pyplot as plt + + +def generate_weighted_sequence_variant(sequence, weights=[0.25, 0.25, 0.25, 0.25]): + """ + Flips an initial sequence as it could happen when using the MinIon tool + + Arguments: + sequence (Seq): sequence of interest + weights(list of 4 float): probability of each sequence version (initial sequence, reverse complement, complement, reverse initial) + + Returns: + selected_variant(Seq): randomly reversed sequence + variant_description(str): string indicating which variant is selected + """ + variants = ["initial_sequence", "reverse_complement", "complement", "reverse_initial"] + variant_sequences = [sequence, sequence.reverse_complement(), sequence.complement(), sequence[::-1]] + + selected_variant_index = random.choices(range(len(variants)), weights=weights, k=1)[0] + selected_variant = variant_sequences[selected_variant_index] + variant_description = variants[selected_variant_index] + + return selected_variant, variant_description + + + +def break_prob_function(position, sequence_length): + """ + Example of probability function usable for the previous function + + Arguments: + position(int): position in sequence + sequence_length(int) + Returns: + probability at a specific position in sequence(float) + """ + max_prob = 0.5 # Max probability at start and end of sequence + return max_prob * (position / sequence_length) + +def bimodal_distribution(position,sequence_length): + """ + Probability density function of a bimodal normal distribution, highest chance is centered around positions at one or two thirds of sequence length + + Parameters + ---------- + position: position in the sequence for which probability of event is calculated + sequence_length: length of sequence + + Returns + --------- + probability of event, between 0 and 1 + """ + peaks = [sequence_length/3, 2*sequence_length/3] + return 0.5* scipy.stats.norm(peaks[0],sequence_length/15).pdf(position) + 0.5*scipy.stats.norm(peaks[1],sequence_length/15).pdf(position) + + +def break_sequence_with_probability(sequence, break_prob_function = bimodal_distribution, nbreaks = 2, take_longest= False): + """ + Simulates breakages in the sequence, which could happen using the MinIon tool + + Arguments: + sequence(Seq): sequence of interest + break_prob_funtion(function): probability function used as weight for the breakages + + Returns: + final_sequence (Seq): final sequence after breakages + break_info (dict): dictionary containing information about the breakages + """ + new_sequence=sequence + break_info = {'number_of_breaks': 0, 'part_taken': ''} + + for i, base in enumerate(sequence): + # Computes the breakage probability + break_prob = break_prob_function(i, len(sequence)) + + # Checks if sequence breaks at this iteration + if random.random() <= break_prob: + if nbreaks > 0: + new_sequence = new_sequence[:i+break_info["number_of_breaks"]] +"N" +new_sequence[i+break_info["number_of_breaks"]:] + nbreaks -= 1 # Breakage marker is 'N' + break_info['number_of_breaks'] += 1 + broken_parts = new_sequence.split("N") + if take_longest: + length_index = lambda index: len(broken_parts[index]) + final_seq_index = max(range(len(broken_parts)), key=length_index) + else: + final_seq_index = random.choice(range(len(broken_parts))) + + final_seq=broken_parts[final_seq_index] + break_info["part_taken"] = final_seq_index + + return final_seq, break_info + + +def mutate(base): + """ + Simulates substitution mutation + + Arguments: + base(char) + + Returns: + muted base (char) + """ + # Bases list + bases = ['A', 'T', 'C', 'G'] + if base not in bases: + raise ValueError("Invalid base provided. Base must be one of 'A', 'T', 'C', or 'G'.") + bases.remove(base) + # Randomly selected new base + return random.choice(bases) + + +def assign_quality_scores(sequence, mutation_probability=0.1,mutation_mean = 16,mutation_sd = 3,basic_mean = 48,basic_sd = 3): + """ + Assigns fake scores for each base, following a different normal law if base is mutated or not + + Arguments: + sequence(Seq): sequence of interest + mutation_probability(float) + mutation_mean(int): mean of normal law associated with mutation + mutation_sd(int): standard deviation of normal law associated with mutation + basic_mean(int): mean of normal law associated with no-change bases + mutation_mean(int): standard deviation of normal law associated with no-change bases + + Returns: + quality score of sequence (in base 64 characters -> ASCII) + nb_mutations(int): number of mutations + mutations_positions(array of ints): positions of mutations + """ + quality_scores = [] + base64_table = """!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~""" + nb_mutations=0 + mutations_positions= [] + for i, base in enumerate(sequence): + # Computes score for each base depending on mutation + if random.random() < mutation_probability: + # If mutation needed + base = mutate(base) + nb_mutations+=1 + mutations_positions.append(i+1) + quality_score = np.random.normal(mutation_mean, mutation_sd) + else: + # If no mutation needed + quality_score = np.random.normal(basic_mean, basic_sd) + # Limits the score between 0 and 93 (ASCII go from 33 to 126) + quality_score = max(min(quality_score, 93), 0) + base64_score = base64_table[int(round(quality_score))] + quality_scores.append(base64_score) + return ''.join(quality_scores), nb_mutations, mutations_positions + +def generate_fastq(input_seq, n_reads, dst, variants_weights= [0.25, 0.25,0.25,0.25], break_prob_function=break_prob_function, mutation_prob =0.1, quality_score_params= [10,2,50,5], nbreaks=2, take_longest=False): + + with open(dst, "a") as handle: # Operations on newly created or updated file + for i in range(n_reads) : + seq, orientation = generate_weighted_sequence_variant(input_seq,variants_weights) + broken_seq, break_info = break_sequence_with_probability(seq,break_prob_function, nbreaks,take_longest) + quality_string, n_mutations,mutations_positions = assign_quality_scores(broken_seq,mutation_prob,quality_score_params[0],quality_score_params[1],quality_score_params[2],quality_score_params[3]) + + variant_str = "Orientation_of_sequence="+orientation+" " + breakage_str = "Number_of_breakages="+str(break_info['number_of_breaks'])+" Part_taken="+str(break_info['part_taken'])+" " + mutations_str = "Number_of_mutations="+str(n_mutations)+" Positions="+str(mutations_positions) + + handle.write(f"@read{i} "+variant_str+breakage_str+mutations_str+'\n') #writes the sequence id + handle.write(str(broken_seq)+ '\n') #writes the sequence generated by transforming it into string + handle.write("+\n") #writes the + separator + handle.write(quality_string + '\n') #writes the quality score + +def multiplex(folder_paths: list, dst: str): + """ + merge multiple fastq files into one, while writing in the description the origin of each read + + Parameters + ---------- + folder_paths: list of folder paths to look for fastq files for merging + dst: path of the output merged fastq file + + """ + parsers=dict() + for folder_path in folder_paths: + for root,_,files in os.walk(folder_path): + for file in files: + if file.endswith(".fastq"): + file_path = ospath.join(root,file) + parsers[file_path] = list(SeqIO.parse(file_path, "fastq")) + + with open(dst, "a") as writer: + print("inside writing") + while parsers: + parent_file_path = random.choice(list(parsers.keys())) + current_list=parsers[parent_file_path] + index= random.randint(0, len(current_list)-1) + seqrecord = current_list[index] + seqrecord.description = seqrecord.description + " source=" + parent_file_path + SeqIO.write(seqrecord, writer, "fastq") + current_list.pop(index) + print("index popped") + if not current_list: + print("list popped") + parsers.pop(parent_file_path) + + + + + + diff --git a/process_fastq.py b/process_fastq.py index adbb759..c7fc7d6 100644 --- a/process_fastq.py +++ b/process_fastq.py @@ -2,6 +2,7 @@ import numpy as np from Bio import SeqIO, Align import os.path as ospath + import os import gzip import shutil @@ -33,8 +34,11 @@ def extract_gz(src_file, dst_file): def read_fastq(fastq_filepath=None): """ Read a fastq file and return the reads. - :param fastq_filepath: filepath of the .fastq file [str] - :return: reads from the fastq file [list of Bio.SeqRecord.SeqRecord] + + Args: + fastq_filepath (str): filepath of the .fastq file + Returns: + reads(list of Bio.SeqRecord.SeqRecord): reads from the fastq file """ if fastq_filepath is None: fastq_filepath = "data/rbcL_Qiagen_tomato.fastq" # default path (example) if fastq_filepath.lower().endswith('.gz'): @@ -82,9 +86,11 @@ def split_fastq(input_path, output_dir, base_name, percentile=20): def concatenate_fastq(src_folder=None, dst=None): """ Concatenate all .fastq from a folder into a single .fastq file. - :param folder: folder containing the .fastq files (usually fastq_pass folder) [str] - :param dst: destination file (.fastq) [str] - :return: None + Args: + folder (str): folder containing the .fastq files (usually fastq_pass folder) + dst (str): destination file (.fastq) + Returns: + None """ if src_folder is None: src_folder = "fastq_pass" # default folder (example) if dst is None: dst = f"{src_folder}/concatenation.fastq" # default destination (example) @@ -107,40 +113,42 @@ def get_file_iterator(filename): break -def extract_fastq(main_dir, barcode_nb, dst): +def extract_fastq(main_dir, sample_nb, dst): """ - extract all fastq files under a certain barcode/sample number from an expedition results folder. + Extract all fastq files under a certain barcode/sample number from an expedition results folder. - Parameters - ---------- - main_dir: expedition folder path - sample_nb: sample number for which to extract fastq. ie 6 to extract from folder barcode06 - dst: destination file path + Args: + main_dir(str): expedition folder path + sample_nb(int): sample number for which to extract fastq. ie 6 to extract from folder barcode06 + dst(str): destination file path + Returns: + None """ - search_dirs=[] for root, dirs, files in os.walk(main_dir): if "fastq_pass" in dirs: - search_dirs.append(ospath.join(root, "fastq_pass")) - if "barcoding" in dirs: - search_dirs.append(ospath.join(root, "barcoding")) - - for search_dir in search_dirs: - for root, dirs, files in os.walk(search_dir): - if f"barcode{barcode_nb}" in dirs: - fastq_dir = ospath.join(root, f"barcode{barcode_nb}") - print(fastq_dir) - concatenate_fastq(fastq_dir, dst) - elif f"barcode0{barcode_nb}" in dirs: - fastq_dir = ospath.join(root, f"barcode0{barcode_nb}") - print(fastq_dir) - concatenate_fastq(fastq_dir, dst) + pass_dir = ospath.join(root, "fastq_pass") + break + for root, dirs, files in os.walk(pass_dir): + if f"barcode{sample_nb}" in dirs: + fastq_dir = ospath.join(root, f"barcode{sample_nb}") + elif f"barcode0{sample_nb}" in dirs: + fastq_dir = ospath.join(root, f"barcode0{sample_nb}") + concatenate_fastq(fastq_dir, dst) #---------------------------------- #GET AND VISUALIZE QUALITY SCORES #---------------------------------- -def getContentFile(pathFastqFile: str): +def getContentFile(pathFastqFile): + """ + Open file and get all the reads. + + Args: + pathFastqFile(str): fastq file to read + Returns: + allReadsAsSring(str): all fastq faile reads + """ # open file and get all the reads allReadsAsString = [] @@ -151,7 +159,15 @@ def getContentFile(pathFastqFile: str): def extractReadQuality(fileContent): + """ + Extract the quality of the file reads. Note that the reads are concatenated in one list. + + Args: + fileContent(str): fastq file content as a string + Returns: + qualityOfReads(list of str): quality scores as a string list + """ # the fileContent must contain multiple reads, where each read is 4 lines: if not (len(fileContent) % 4 == 0): raise ValueError("fastq file must have reads composed of 4 lines") @@ -172,7 +188,15 @@ def extractReadQuality(fileContent): def convertQuality(allReadsQuality): + """ + Converts the qualities from fastq format to int. + Args: + allReadsQuality(list of str): quality scores in the original format + + Returns: + qualityOfReads(list of int): quality scores converted in int + """ # note that here we can't distinguish the reads from each other anymore # they are all in one list convertedQualities = [] @@ -190,7 +214,16 @@ def convertQuality(allReadsQuality): def visualiseQualities(pathFastqFile, readQualityConverted): + """ + Creates a graph representing the frequency of each score value. + + Args: + pathFastqFile(str): path of the fastq file in directory + readQualityConverted(list of int): quality scores converted in int + Returns: + None + """ plt.figure(figsize=(10, 6)) plt.hist(readQualityConverted, bins=range(0, 95)) @@ -214,22 +247,28 @@ def visualiseQualities(pathFastqFile, readQualityConverted): def create_random_sequence(length=500, seed=None): """ Generate a random sequence. - :param length: length of the sequence generated [int] - :param seed: random seed [int] - :return: random sequence [str] + Args: + length(int): length of the sequence generated + seed(int): random seed + Returns: + random sequence(str) """ if seed is not None: random.seed(seed) return "".join(["ATGC"[random.randint(0,3)] for i in range(length)]) def damage_sequence(sequence, mutation_rate=0.05, deletion_rate=0.05, insertion_rate=0.05): """ - Damage a sequence randomly with mutation (substitution), deletion and insertion - Warning: to avoid infinite loop, don't set the insertion_rate to 1.0 (or near) - :param sequence: original sequence to damage [str] - :param mutation_rate: mutation rate (between 0.0 and 1.0) [float] - :param deletion_rate: deletion_rate (between 0.0 and 1.0) [float] - :param insertion_rate: insertion_rate (between 0.0 and 1.0) [float] - :return: damaged sequence [str] + Damage a sequence randomly with mutation (substitution), deletion and insertion. + Warning: to avoid infinite loop, don't set the insertion_rate to 1.0 (or near). + + Args: + sequence(str): original sequence to damage + mutation_rate(float): mutation rate (between 0.0 and 1.0) + deletion_rate(float): deletion_rate (between 0.0 and 1.0) + insertion_rate(float): insertion_rate (between 0.0 and 1.0) + + Returns: + sequence(str): damaged sequence """ if mutation_rate < 0 or mutation_rate > 1: raise Exception("[damage_sequence] mutation_rate is incorrect (must be between 0.0 and 1.0)") diff --git a/rbcl_Qiagen_tomato_5000.fastq.png b/rbcl_Qiagen_tomato_5000.fastq.png new file mode 100644 index 0000000..7c45ef9 Binary files /dev/null and b/rbcl_Qiagen_tomato_5000.fastq.png differ diff --git a/test_material/rbcL_Qiagen_tomato_5000_test_file.fastq b/test_material/rbcL_Qiagen_tomato_5000_test_file.fastq new file mode 100644 index 0000000..c22edb2 --- /dev/null +++ b/test_material/rbcL_Qiagen_tomato_5000_test_file.fastq @@ -0,0 +1,8 @@ +@632f5334-2c26-4f91-a611-bbcc85d83ea1 runid=84dc383f16e4874203b6120bf223ea75d3815a54 read=14 ch=417 start_time=2021-11-26T13:56:48Z flow_cell_id=FAR79813 protocol_group_id=rbcL_Qiagen_tomato sample_id=no_sample +GTATGCTTCGTTCATTCAAATTTGGGTGTTTAGCAATTTTCGCATTTATCGTGAAACGCTTTCGCGTTTTTGGATCGGCAGGTGTTTAACCGTTTTCGCATTTATCGTGGCTTAACACCAGCGTTTTTCATTGTAACTTAAAATTATATAAGAGAAGAAGAATCTTTGATTTTTTTTTTTGAAAAAGGTAACCGAGCTTCTTTAGTAATAAGACTATTCAAATTACAATATTCGTGGAAAATCGTAATAAATATTGAAGGCATCTTTTAATAGCGAAGTTTGAACAAAATTTCCAA ++ +$$%&$$%00*268%%)($$#####%++]<;7-$$#$#$257166>>8;7820.(''''')668.0+,-0.,(()''(,,++55]<<*')10024=//.*,*(),/)+//00347%&'%#**&%()'&'(((()'***+*33373356*,+')),0,)& +@c4124d21-64a3-4b8e-ac5a-771ea99ff56f runid=84dc383f16e4874203b6120bf223ea75d3815a54 read=704 ch=203 start_time=2021-11-26T14:00:32Z flow_cell_id=FAR79813 protocol_group_id=rbcL_Qiagen_tomato sample_id=no_sample +GTTGTACTTCGTTCCAGTTATCAGATGTTGGGTGTTTAGCCGTTTTCGCATTTATCATTGAAACAACCGCGTTTTCGTGCGCCGCTTCACCTACAATGGAAGTAAACATATTGGTAACAGAACCTTTATGTAAAGGTCTAAAGGAGTGGCTACATAAGCAATATATTGATCTTTTCTCCAACAACGCGCTCGATGCGGTAGCATCGCACCTTTGTAACGATCAAGACTGGTAGAGTCCATCGGTCCATACAATTGTCCATGTACCAGTAGAAGATTCGGCTGCGGCCCCTGCTACTGGGTGGAACTCCAGGTTGAGGTTACTCGGAATGCTAATATATCAGTATCCTTGGTTTGGTACTCAGGAGTATAATAAGTCAATTTGTACTCTTTAACACCAGCTTTGAATCAACACTTTAGTCTCTG ++ +%84695532.,-.:+&''+'&(%'('&%$&&..]2233-,-58>DB]479;>9899)))..-)((-0'(7879=5;8?A>:7@67=<663/.-3-3'.5=;2++;9:;99&&()'%*%%+++,,/>@9?@HABA334+)*&)+-.0/25:750**)(()23581100/0-/44=8]2@3333101334214778743.+'(%%.45''':6:.,344095-+,*)())()(&&)--,.**,+.&%/7@>;2012441+*+))68123?=660-=&56')&522688<]1+ \ No newline at end of file diff --git a/test_utils.py b/test_utils.py index e69de29..8b85ef7 100644 --- a/test_utils.py +++ b/test_utils.py @@ -0,0 +1,47 @@ +import process_fastq +from Bio import SeqIO, Align + +#In order to run the tests, enter in terminal : pytest test_utils.py +#Warning : the 3 last tests have an extremely small chance to fail due to functions based on randomness, running the tests several times can confirm if it is the reason or not + +def test_read_fastq(): + filePath = "test_material/rbcL_Qiagen_tomato_5000_test_file.fastq" + reads = process_fastq.read_fastq(filePath) + assert len(reads) == 2 + assert str(reads[0].seq) == "GTATGCTTCGTTCATTCAAATTTGGGTGTTTAGCAATTTTCGCATTTATCGTGAAACGCTTTCGCGTTTTTGGATCGGCAGGTGTTTAACCGTTTTCGCATTTATCGTGGCTTAACACCAGCGTTTTTCATTGTAACTTAAAATTATATAAGAGAAGAAGAATCTTTGATTTTTTTTTTTGAAAAAGGTAACCGAGCTTCTTTAGTAATAAGACTATTCAAATTACAATATTCGTGGAAAATCGTAATAAATATTGAAGGCATCTTTTAATAGCGAAGTTTGAACAAAATTTCCAA" + assert str(reads[1].seq) == "GTTGTACTTCGTTCCAGTTATCAGATGTTGGGTGTTTAGCCGTTTTCGCATTTATCATTGAAACAACCGCGTTTTCGTGCGCCGCTTCACCTACAATGGAAGTAAACATATTGGTAACAGAACCTTTATGTAAAGGTCTAAAGGAGTGGCTACATAAGCAATATATTGATCTTTTCTCCAACAACGCGCTCGATGCGGTAGCATCGCACCTTTGTAACGATCAAGACTGGTAGAGTCCATCGGTCCATACAATTGTCCATGTACCAGTAGAAGATTCGGCTGCGGCCCCTGCTACTGGGTGGAACTCCAGGTTGAGGTTACTCGGAATGCTAATATATCAGTATCCTTGGTTTGGTACTCAGGAGTATAATAAGTCAATTTGTACTCTTTAACACCAGCTTTGAATCAACACTTTAGTCTCTG" + +def test_split_fastq(): + inputPath="test_material/rbcL_Qiagen_tomato_5000_test_file.fastq" + outputDir="test_material" + output_name="outputTest" + assert process_fastq.split_fastq(inputPath,outputDir,output_name) == ('test_material\\outputTest_top20.fastq', 'test_material\\outputTest_remaining80.fastq') + +def test_quality_scores(): + filePath = "test_material/rbcL_Qiagen_tomato_5000_test_file.fastq" + reads = process_fastq.getContentFile(filePath) + assert reads==['@632f5334-2c26-4f91-a611-bbcc85d83ea1 runid=84dc383f16e4874203b6120bf223ea75d3815a54 read=14 ch=417 start_time=2021-11-26T13:56:48Z flow_cell_id=FAR79813 protocol_group_id=rbcL_Qiagen_tomato sample_id=no_sample\n', 'GTATGCTTCGTTCATTCAAATTTGGGTGTTTAGCAATTTTCGCATTTATCGTGAAACGCTTTCGCGTTTTTGGATCGGCAGGTGTTTAACCGTTTTCGCATTTATCGTGGCTTAACACCAGCGTTTTTCATTGTAACTTAAAATTATATAAGAGAAGAAGAATCTTTGATTTTTTTTTTTGAAAAAGGTAACCGAGCTTCTTTAGTAATAAGACTATTCAAATTACAATATTCGTGGAAAATCGTAATAAATATTGAAGGCATCTTTTAATAGCGAAGTTTGAACAAAATTTCCAA\n', '+\n', "$$%&$$%00*268%%)($$#####%++]<;7-$$#$#$257166>>8;7820.(''''')668.0+,-0.,(()''(,,++55]<<*')10024=//.*,*(),/)+//00347%&'%#**&%()'&'(((()'***+*33373356*,+')),0,)&\n", '@c4124d21-64a3-4b8e-ac5a-771ea99ff56f runid=84dc383f16e4874203b6120bf223ea75d3815a54 read=704 ch=203 start_time=2021-11-26T14:00:32Z flow_cell_id=FAR79813 protocol_group_id=rbcL_Qiagen_tomato sample_id=no_sample\n', 'GTTGTACTTCGTTCCAGTTATCAGATGTTGGGTGTTTAGCCGTTTTCGCATTTATCATTGAAACAACCGCGTTTTCGTGCGCCGCTTCACCTACAATGGAAGTAAACATATTGGTAACAGAACCTTTATGTAAAGGTCTAAAGGAGTGGCTACATAAGCAATATATTGATCTTTTCTCCAACAACGCGCTCGATGCGGTAGCATCGCACCTTTGTAACGATCAAGACTGGTAGAGTCCATCGGTCCATACAATTGTCCATGTACCAGTAGAAGATTCGGCTGCGGCCCCTGCTACTGGGTGGAACTCCAGGTTGAGGTTACTCGGAATGCTAATATATCAGTATCCTTGGTTTGGTACTCAGGAGTATAATAAGTCAATTTGTACTCTTTAACACCAGCTTTGAATCAACACTTTAGTCTCTG\n', '+\n', "%84695532.,-.:+&''+'&(%'('&%$&&..]2233-,-58>DB]479;>9899)))..-)((-0'(7879=5;8?A>:7@67=<663/.-3-3'.5=;2++;9:;99&&()'%*%%+++,,/>@9?@HABA334+)*&)+-.0/25:750**)(()23581100/0-/44=8]2@3333101334214778743.+'(%%.45''':6:.,344095-+,*)())()(&&)--,.**,+.&%/7@>;2012441+*+))68123?=660-=&56')&522688<]1+"] + quality = process_fastq.extractReadQuality(reads) + assert quality==["$$%&$$%00*268%%)($$#####%++]<;7-$$#$#$257166>>8;7820.(''''')668.0+,-0.,(()''(,,++55]<<*')10024=//.*,*(),/)+//00347%&'%#**&%()'&'(((()'***+*33373356*,+')),0,)&", "%84695532.,-.:+&''+'&(%'('&%$&&..]2233-,-58>DB]479;>9899)))..-)((-0'(7879=5;8?A>:7@67=<663/.-3-3'.5=;2++;9:;99&&()'%*%%+++,,/>@9?@HABA334+)*&)+-.0/25:750**)(()23581100/0-/44=8]2@3333101334214778743.+'(%%.45''':6:.,344095-+,*)())()(&&)--,.**,+.&%/7@>;2012441+*+))68123?=660-=&56')&522688<]1"] + converted_quality = process_fastq.convertQuality(quality) + assert converted_quality==[3, 3, 4, 5, 3, 3, 4, 15, 15, 9, 17, 21, 23, 4, 4, 8, 7, 3, 3, 2, 2, 2, 2, 2, 4, 10, 10, 60, 27, 26, 22, 12, 3, 3, 2, 3, 2, 3, 17, 20, 22, 16, 21, 21, 29, 29, 23, 26, 22, 23, 17, 15, 13, 7, 6, 6, 6, 6, 6, 8, 21, 21, 23, 13, 15, 10, 11, 12, 15, 13, 11, 7, 7, 8, 6, 6, 7, 11, 11, 10, 10, 20, 20, 60, 27, 27, 9, 6, 8, 16, 15, 15, 17, 19, 27, 14, 23, 5, 13, 19, 22, 24, 26, 15, 15, 15, 17, 18, 8, 7, 7, 8, 8, 5, 4, 4, 3, 4, 2, 3, 2, 4, 6, 15, 17, 52, 37, 37, 28, 8, 7, 7, 5, 3, 2, 7, 11, 11, 9, 4, 5, 17, 15, 20, 22, 22, 34, 60, 60, 14, 12, 13, 60, 60, 12, 24, 22, 15, 11, 10, 15, 20, 9, 10, 7, 13, 13, 16, 15, 19, 24, 26, 25, 24, 23, 20, 18, 19, 20, 18, 10, 9, 14, 13, 13, 12, 13, 9, 5, 9, 4, 4, 5, 4, 6, 14, 9, 22, 30, 27, 24, 28, 26, 5, 5, 5, 9, 14, 16, 15, 12, 13, 16, 14, 16, 25, 30, 32, 25, 24, 25, 22, 16, 15, 15, 23, 25, 25, 33, 60, 60, 29, 28, 14, 14, 13, 9, 11, 9, 7, 8, 11, 14, 8, 10, 14, 14, 15, 15, 18, 19, 22, 4, 5, 6, 4, 2, 9, 9, 5, 4, 7, 8, 6, 5, 6, 7, 7, 7, 7, 8, 6, 9, 9, 9, 10, 9, 18, 18, 18, 22, 18, 18, 20, 21, 9, 11, 10, 6, 8, 8, 11, 15, 11, 8, 5, 4, 23, 19, 21, 24, 20, 20, 18, 17, 13, 11, 12, 13, 25, 10, 5, 6, 6, 10, 6, 5, 7, 4, 6, 7, 6, 5, 4, 3, 5, 5, 13, 13, 60, 17, 17, 18, 18, 12, 11, 12, 20, 23, 29, 35, 33, 60, 19, 22, 24, 26, 29, 24, 23, 24, 24, 8, 8, 8, 13, 13, 12, 8, 7, 7, 12, 15, 6, 7, 22, 23, 22, 24, 27, 30, 27, 11, 10, 6, 8, 60, 21, 22, 22, 21, 22, 28, 25, 14, 13, 14, 14, 12, 13, 14, 17, 17, 19, 18, 21, 18, 17, 17, 17, 16, 14, 6, 5, 6, 11, 13, 11, 5, 5, 7, 7, 4, 5, 6, 7, 8, 13, 11, 10, 6, 8, 6, 3, 6, 3, 2, 2, 11, 15, 19, 19, 18, 19, 12, 21, 17, 15, 21, 15, 12, 7, 6, 5, 3, 3, 7, 8, 8, 10, 22, 25, 22, 24, 21, 22, 27, 43, 60, 60, 7, 9, 7, 29, 28, 20, 26, 23, 30, 32, 29, 25, 22, 31, 21, 22, 28, 27, 21, 21, 18, 14, 13, 12, 18, 12, 18, 6, 13, 20, 28, 26, 17, 10, 10, 26, 24, 25, 26, 24, 24, 5, 5, 7, 8, 6, 4, 9, 4, 4, 10, 10, 10, 11, 11, 14, 29, 31, 24, 30, 31, 39, 32, 33, 32, 18, 18, 19, 10, 8, 9, 5, 8, 10, 12, 13, 15, 14, 17, 20, 25, 22, 20, 15, 9, 9, 8, 7, 7, 8, 17, 18, 20, 23, 16, 16, 15, 15, 14, 15, 12, 14, 19, 19, 28, 23, 60, 17, 31, 18, 18, 18, 18, 16, 15, 16, 18, 18, 19, 17, 16, 19, 22, 22, 23, 22, 19, 18, 13, 10, 6, 7, 4, 4, 13, 19, 20, 27, 35, 27, 23, 25, 20, 14, 9, 21, 26, 24, 24, 19, 17, 9, 21, 15, 21, 17, 15, 21, 24, 7, 20, 23, 22, 21, 17, 17, 17, 21, 12, 17, 14, 60, 60, 28, 29, 6, 6, 6, 25, 21, 25, 13, 11, 18, 19, 19, 15, 24, 20, 12, 10, 11, 9, 8, 7, 8, 8, 7, 8, 7, 5, 5, 8, 12, 12, 11, 13, 9, 9, 11, 10, 13, 5, 4, 14, 22, 31, 29, 26, 17, 15, 16, 17, 19, 19, 16, 10, 9, 10, 8, 8, 21, 23, 16, 17, 18, 30, 28, 21, 21, 15, 12, 28, 5, 20, 21, 6, 8, 5, 20, 17, 17, 21, 23, 23, 27, 60, 16] + +def test_create_random_sequence(): + seed=1 + random_seq = process_fastq.create_random_sequence(50,seed) + assert random_seq == "TAGACCCCTACACCACGTAGAAAACTCATCCTGTTCGACATGAGCTGGCC" + +def test_mutation(): + sequence = "ATCG" + mutated_sequence = process_fastq.damage_sequence(sequence, mutation_rate=1.0) + assert mutated_sequence != sequence + +def test_deletion(): + sequence = "ATCG" + deleted_sequence = process_fastq.damage_sequence(sequence, deletion_rate=1.0) + assert deleted_sequence != sequence + +def test_insertion(): + sequence = "ATCG" + inserted_sequence = process_fastq.damage_sequence(sequence, insertion_rate=0.98) + assert inserted_sequence != sequence \ No newline at end of file