diff --git a/massql/msql_cmd.py b/massql/msql_cmd.py
index df7f59e..b409c4d 100644
--- a/massql/msql_cmd.py
+++ b/massql/msql_cmd.py
@@ -16,14 +16,14 @@ def main():
     parser = argparse.ArgumentParser(description="MSQL CMD")
-    parser.add_argument('filename', help='Input filename')
-    parser.add_argument('query', help='Input Query')
+    parser.add_argument('filename', nargs='+', help='Input filename')
+    parser.add_argument('--query', default="QUERY scaninfo(MS2DATA)", help='Input Query')
     parser.add_argument('--output_file', default=None, help='output results filename')
     parser.add_argument('--parallel_query', default="NO", help='YES to make it parallel with ray locally, NO is default')
     parser.add_argument('--cache', default="YES", help='YES to cache with feather, YES is the default')
     parser.add_argument('--original_path', default=None, help='Original absolute path for the filename, useful in proteosafe')
-    parser.add_argument('--extract_mzML', default=None, help='Extracting spectra found as mzML file')
     parser.add_argument('--extract_json', default=None, help='Extracting spectra found as json file')
+    parser.add_argument('--extract_mzML', default=None, help='Extracting spectra found as mzML file, not implemented yet')
 
     args = parser.parse_args()
 
@@ -45,14 +45,16 @@ def main():
 
     # Executing
     all_results_list = []
-    for i, query in enumerate(all_queries):
-        results_df = msql_engine.process_query(query,
-                                               args.filename,
-                                               cache=(args.cache == "YES"),
-                                               parallel=PARALLEL)
+    for filename in args.filename:
+        for i, query in enumerate(all_queries):
+            results_df = msql_engine.process_query(query,
+                                                   filename,
+                                                   cache=(args.cache == "YES"),
+                                                   parallel=PARALLEL)
+            results_df["filename"] = os.path.basename(filename)
 
-        results_df["query_index"] = i
-        all_results_list.append(results_df)
+            results_df["query_index"] = i
+            all_results_list.append(results_df)
 
     # Merging
     results_df = pd.concat(all_results_list)
@@ -70,7 +72,6 @@ def main():
             pass
 
     if args.output_file and len(results_df) > 0:
-        results_df["filename"] = os.path.basename(args.filename)
 
         if args.original_path is not None:
             useful_filename = args.original_path
@@ -113,13 +114,13 @@ def main():
         else:
             results_df.to_csv(args.output_file, index=False, columns=columns)
 
-        # Extracting
-        if args.extract_json is not None and len(results_df) > 0:
-            print("Extracting {} spectra".format(len(results_df)))
-            try:
-                msql_extract._extract_spectra(results_df, os.path.dirname(args.filename), output_json_filename=args.extract_json)
-            except:
-                print("Extraction Failed")
+    # Extracting
+    if args.extract_json is not None and len(results_df) > 0:
+        print("Extracting {} spectra".format(len(results_df)))
+        try:
+            msql_extract._extract_spectra(results_df, os.path.dirname(args.filename[0]), output_json_filename=args.extract_json)
+        except Exception as e:
+            print("Extraction Failed", e)
 
 
 if __name__ == "__main__":
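For quick verification of the reworked CLI surface — multiple positional filenames plus an optional --query with a scaninfo default — a minimal argparse sketch; the .mzML names here are hypothetical:

    # Sanity check mirroring the parser changes above.
    import argparse

    parser = argparse.ArgumentParser(description="MSQL CMD")
    parser.add_argument('filename', nargs='+', help='Input filename')
    parser.add_argument('--query', default="QUERY scaninfo(MS2DATA)", help='Input Query')

    args = parser.parse_args(["a.mzML", "b.mzML"])
    assert args.filename == ["a.mzML", "b.mzML"]    # every positional is collected
    assert args.query == "QUERY scaninfo(MS2DATA)"  # default applies when --query is omitted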
diff --git a/massql/msql_extract.py b/massql/msql_extract.py
index 0675e9b..23ed282 100644
--- a/massql/msql_extract.py
+++ b/massql/msql_extract.py
@@ -160,6 +160,34 @@ def _extract_mgf_scan(input_filename, spectrum_identifier_list):
 
     return output_list
 
+def _extract_parquet_scan(input_filename, spectrum_identifier_list):
+    merged_df = pd.read_parquet(input_filename)
+
+    all_spectrum_identifier_set = set(spectrum_identifier_list)
+
+    filtered_df = merged_df[merged_df["scan"].isin(all_spectrum_identifier_set)]
+    grouped_df = filtered_df.groupby("scan")
+
+    output_list = []
+    for group, scan_df in grouped_df:
+        mz = scan_df["mz"].values
+        intensity = scan_df["i"].values
+
+        peaks_list = list(zip(mz, intensity))
+        peaks_list = [[float(peak[0]), float(peak[1])] for peak in peaks_list]
+
+        spectrum_obj = {}
+        spectrum_obj["peaks"] = peaks_list
+        spectrum_obj["mslevel"] = int(scan_df["mslevel"].iloc[0])
+        spectrum_obj["scan"] = str(scan_df["scan"].iloc[0])
+
+        if spectrum_obj["mslevel"] > 1:
+            spectrum_obj["precursor_mz"] = float(scan_df["precmz"].iloc[0])
+
+        output_list.append(spectrum_obj)
+
+    return output_list
+
 def _extract_spectra(results_df, input_spectra_folder,
                         output_mgf_filename=None,
@@ -184,12 +212,14 @@ def _extract_spectra(results_df, input_spectra_folder,
         input_spectra_filename = os.path.join(input_spectra_folder, results_by_file_df["filename"].iloc[0])
 
         spectrum_obj_list = []
-        if ".mzML" in input_spectra_filename:
+        if input_spectra_filename.endswith(".mzML"):
             spectrum_obj_list = _extract_mzML_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
-        if ".mzXML" in input_spectra_filename:
+        if input_spectra_filename.endswith(".mzXML"):
             spectrum_obj_list = _extract_mzXML_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
-        if ".mgf" in input_spectra_filename:
+        if input_spectra_filename.endswith(".mgf"):
             spectrum_obj_list = _extract_mgf_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
+        if input_spectra_filename.endswith(".parquet"):
+            spectrum_obj_list = _extract_parquet_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
 
         for spectrum_obj in spectrum_obj_list:
diff --git a/massql/msql_fileloading.py b/massql/msql_fileloading.py
index fd27f71..343e8a7 100644
--- a/massql/msql_fileloading.py
+++ b/massql/msql_fileloading.py
@@ -22,19 +22,13 @@ def load_data(input_filename, cache=False):
     Returns:
         [type]: [description]
     """
+    cache_filename = input_filename + ".msql.parquet"
 
     if cache:
-        ms1_filename = input_filename + "_ms1.msql.feather"
-        ms2_filename = input_filename + "_ms2.msql.feather"
+        if os.path.exists(cache_filename):
+            cache_df = pd.read_parquet(cache_filename)
 
-        if os.path.exists(ms1_filename) or os.path.exists(ms2_filename):
-            try:
-                ms1_df = pd.read_feather(ms1_filename)
-            except:
-                ms1_df = pd.DataFrame()
-            try:
-                ms2_df = pd.read_feather(ms2_filename)
-            except:
-                ms2_df = pd.DataFrame()
+            ms1_df = cache_df[cache_df["mslevel"] == 1]
+            ms2_df = cache_df[cache_df["mslevel"] == 2]
 
             return ms1_df, ms2_df
@@ -55,27 +49,22 @@ def load_data(input_filename, cache=False):
     elif input_filename[-4:].lower() == ".txt":
         ms1_df, ms2_df = _load_data_txt(input_filename)
-
+    elif input_filename.lower().endswith("parquet"):
+        merged_df = pd.read_parquet(input_filename)
+        ms1_df = merged_df[merged_df["mslevel"] == 1]
+        ms2_df = merged_df[merged_df["mslevel"] == 2]
+        cache = False
     else:
         print("Cannot Load File Extension")
         raise Exception("File Format Not Supported")
 
-    # Saving Cache
     if cache:
-        ms1_filename = input_filename + "_ms1.msql.feather"
-        ms2_filename = input_filename + "_ms2.msql.feather"
-
-        if not (os.path.exists(ms1_filename) or os.path.exists(ms2_filename)):
-            try:
-                ms1_df.to_feather(ms1_filename)
-            except:
-                pass
-
-            try:
-                ms2_df.to_feather(ms2_filename)
-            except:
-                pass
+        ms1_df["mslevel"] = 1
+        ms2_df["mslevel"] = 2
+
+        cache_df = pd.concat([ms1_df, ms2_df], axis=0)
+        cache_df.to_parquet(cache_filename)
 
     return ms1_df, ms2_df
@@ -643,6 +632,4 @@ def _load_data_txt(input_filename):
     ms1_df['rt'] = 0
     ms1_df['polarity'] = "Positive"
 
-    print(ms1_df)
-
     return ms1_df, pd.DataFrame()
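The loader now keeps one merged .msql.parquet cache per input file instead of the separate _ms1/_ms2 feather files, splitting on an mslevel column when reading it back. A minimal round-trip sketch, assuming pandas with a parquet engine such as pyarrow installed; the toy frames and path are hypothetical, but the column names are the ones the loader keys on:

    # Round trip of the merged parquet cache, as load_data() now does it.
    import pandas as pd

    ms1_df = pd.DataFrame({"scan": [1], "mz": [100.0], "i": [10.0]})
    ms2_df = pd.DataFrame({"scan": [2], "mz": [50.0], "i": [3.0], "precmz": [100.0]})

    # Saving: tag each frame with its MS level and write a single file
    ms1_df["mslevel"] = 1
    ms2_df["mslevel"] = 2
    cache_df = pd.concat([ms1_df, ms2_df], axis=0)
    cache_df.to_parquet("toy.mzML.msql.parquet")

    # Loading: split the merged frame back out on mslevel
    cache_df = pd.read_parquet("toy.mzML.msql.parquet")
    ms1_df = cache_df[cache_df["mslevel"] == 1]
    ms2_df = cache_df[cache_df["mslevel"] == 2]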
diff --git a/workflow/Makefile b/workflow/Makefile
index 90164ee..166eca7 100644
--- a/workflow/Makefile
+++ b/workflow/Makefile
@@ -1,3 +1,5 @@
+# Workflow Tests
+
 run_test:
 	nextflow run workflow.nf -c test.config
 
@@ -12,6 +14,10 @@ run_test_iron:
 	nextflow run workflow.nf -c test.config --input_spectra="./test/isa_9_fe.mzML" --query="QUERY scaninfo(MS1DATA) WHERE MS1MZ=X-2:INTENSITYMATCH=Y*0.063:INTENSITYMATCHPERCENT=25 AND MS1MZ=X:INTENSITYMATCH=Y:INTENSITYMATCHREFERENCE:INTENSITYPERCENT=5 AND MS2PREC=X" --resume
 
+# Caching Tests
+run_caching_test:
+	nextflow run workflow_caching.nf -c test.config
+
 cluster_test:
 	nextflow run workflow.nf -c cluster.config \
 		--input_spectra="/data/massive/MSV000082622/ccms_peak/**.mzML" \
diff --git a/workflow/workflow.nf b/workflow/workflow.nf
index 980280f..196b42e 100644
--- a/workflow/workflow.nf
+++ b/workflow/workflow.nf
@@ -37,7 +37,7 @@ if(params.parallel_files == "YES"){
     """
    $params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \
    "$input_spectrum" \
-    "${params.query}" \
+    --query "${params.query}" \
     --output_file "${mangled_output_filename}_output.tsv" \
     --parallel_query $params.parallel_query \
     --cache NO \
@@ -67,7 +67,7 @@ else{
     """
     $params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \
     "$input_spectrum" \
-    "${params.query}" \
+    --query "${params.query}" \
     --output_file "${mangled_output_filename}_output.tsv" \
     --parallel_query $params.parallel_query \
     --cache NO \
diff --git a/workflow/workflow_caching.nf b/workflow/workflow_caching.nf
new file mode 100644
index 0000000..f25435a
--- /dev/null
+++ b/workflow/workflow_caching.nf
@@ -0,0 +1,34 @@
+#!/usr/bin/env nextflow
+
+params.input_spectra = 'data/GNPS00002_A3_p.mzML'
+
+_spectra_ch = Channel.fromPath( params.input_spectra )
+_spectra_ch.into{_spectra_ch1;_spectra_ch2}
+
+_spectra_ch3 = _spectra_ch1.map { file -> tuple(file, file.toString().replaceAll("/", "_").replaceAll(" ", "_"), file) }
+
+TOOL_FOLDER = "$baseDir/bin"
+params.publishdir = "nf_output"
+params.PYTHONRUNTIME = "python" // this is a hack because CCMS cluster does not have python installed
+
+process cacheData {
+    errorStrategy 'ignore'
+    time '4h'
+    //maxRetries 3
+    //memory { 6.GB * task.attempt }
+    memory { 12.GB }
+
+    publishDir "$params.publishdir/msql_cache", mode: 'copy'
+
+    input:
+    set val(filepath), val(mangled_output_filename), file(input_spectrum) from _spectra_ch3
+
+    output:
+    file "*parquet"
+
+    """
+    $params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \
+    "$input_spectrum"
+    mv ${input_spectrum}.msql.parquet ${mangled_output_filename}.msql.parquet
+    """
+}
\ No newline at end of file
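End to end, the .msql.parquet files published by workflow_caching.nf carry the scan, mz, i, mslevel, and precmz columns that _extract_parquet_scan expects, so cached runs can be queried and extracted without re-parsing the raw mzML. A hedged usage sketch — the cache path is hypothetical (the published name is the mangled input path), and the scan identifiers passed in must match the dtype stored in the cache:

    # Pull two scans back out of a cached parquet file.
    from massql import msql_extract

    spectra = msql_extract._extract_parquet_scan("nf_output/msql_cache/data_GNPS00002_A3_p.mzML.msql.parquet", [1, 2])
    for spectrum in spectra:
        print(spectrum["scan"], spectrum["mslevel"], len(spectrum["peaks"]))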