From 170995e5402377c96e5432deb22fc4fe5c269bf5 Mon Sep 17 00:00:00 2001
From: Mingxun Wang
Date: Tue, 7 Sep 2021 13:49:48 -0700
Subject: [PATCH 1/6] updating export

---
 massql/msql_cmd.py         | 21 +++++++++++----------
 massql/msql_fileloading.py | 35 ++++++++++-------------------------
 2 files changed, 21 insertions(+), 35 deletions(-)

diff --git a/massql/msql_cmd.py b/massql/msql_cmd.py
index df7f59e..99420a8 100644
--- a/massql/msql_cmd.py
+++ b/massql/msql_cmd.py
@@ -16,8 +16,8 @@ def main():
     parser = argparse.ArgumentParser(description="MSQL CMD")
 
-    parser.add_argument('filename', help='Input filename')
-    parser.add_argument('query', help='Input Query')
+    parser.add_argument('filename', nargs='+', help='Input filename')
+    parser.add_argument('--query', default="QUERY scaninfo(MS2DATA)", help='Input Query')
     parser.add_argument('--output_file', default=None, help='output results filename')
     parser.add_argument('--parallel_query', default="NO", help='YES to make it parallel with ray locally, NO is default')
     parser.add_argument('--cache', default="YES", help='YES to cache with feather, YES is the default')
@@ -45,14 +45,16 @@ def main():
     # Executing
     all_results_list = []
-    for i, query in enumerate(all_queries):
-        results_df = msql_engine.process_query(query,
-                                               args.filename,
-                                               cache=(args.cache == "YES"),
-                                               parallel=PARALLEL)
+    for filename in args.filename:
+        for i, query in enumerate(all_queries):
+            results_df = msql_engine.process_query(query,
+                                                   filename,
+                                                   cache=(args.cache == "YES"),
+                                                   parallel=PARALLEL)
+            results_df["filename"] = os.path.basename(filename)
 
-        results_df["query_index"] = i
-        all_results_list.append(results_df)
+            results_df["query_index"] = i
+            all_results_list.append(results_df)
 
     # Merging
     results_df = pd.concat(all_results_list)
@@ -70,7 +72,6 @@ def main():
         pass
 
     if args.output_file and len(results_df) > 0:
-        results_df["filename"] = os.path.basename(args.filename)
 
         if args.original_path is not None:
             useful_filename = args.original_path
diff --git a/massql/msql_fileloading.py b/massql/msql_fileloading.py
index fd27f71..5700439 100644
--- a/massql/msql_fileloading.py
+++ b/massql/msql_fileloading.py
@@ -22,19 +22,13 @@ def load_data(input_filename, cache=False):
     Returns:
         [type]: [description]
     """
+    cache_filename = input_filename + ".msql.parquet"
 
     if cache:
-        ms1_filename = input_filename + "_ms1.msql.feather"
-        ms2_filename = input_filename + "_ms2.msql.feather"
+        if os.path.exists(cache_filename):
+            cache_df = pd.read_parquet(cache_filename)
 
-        if os.path.exists(ms1_filename) or os.path.exists(ms2_filename):
-            try:
-                ms1_df = pd.read_feather(ms1_filename)
-            except:
-                ms1_df = pd.DataFrame()
-            try:
-                ms2_df = pd.read_feather(ms2_filename)
-            except:
-                ms2_df = pd.DataFrame()
+            ms1_df = cache_df[cache_df["mslevel"] == 1]
+            ms2_df = cache_df[cache_df["mslevel"] == 2]
 
             return ms1_df, ms2_df
 
@@ -60,22 +54,13 @@ def load_data(input_filename, cache=False):
         print("Cannot Load File Extension")
         raise Exception("File Format Not Supported")
 
    # Saving Cache
     if cache:
-        ms1_filename = input_filename + "_ms1.msql.feather"
-        ms2_filename = input_filename + "_ms2.msql.feather"
-
-        if not (os.path.exists(ms1_filename) or os.path.exists(ms2_filename)):
-            try:
-                ms1_df.to_feather(ms1_filename)
-            except:
-                pass
-
-            try:
-                ms2_df.to_feather(ms2_filename)
-            except:
-                pass
+        ms1_df["mslevel"] = 1
+        ms2_df["mslevel"] = 2
+
+        cache_df = pd.concat([ms1_df, ms2_df], axis=0)
+        cache_df.to_parquet(cache_filename)
 
     return ms1_df, ms2_df
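
Note on PATCH 1/6: load_data() now round-trips a single <input>.msql.parquet cache holding both MS levels, tagged by an "mslevel" column, in place of the two per-level feather files (the --cache help string still says "feather", which no longer matches). A minimal standalone sketch of that round trip, using only the column conventions visible in the patch (filenames hypothetical):

    import pandas as pd

    def write_cache(input_filename, ms1_df, ms2_df):
        # Tag each peak table with its MS level so both fit in one parquet file.
        ms1_df["mslevel"] = 1
        ms2_df["mslevel"] = 2
        pd.concat([ms1_df, ms2_df], axis=0).to_parquet(input_filename + ".msql.parquet")

    def read_cache(input_filename):
        # Split the merged table back into MS1 and MS2 peaks on the mslevel column.
        cache_df = pd.read_parquet(input_filename + ".msql.parquet")
        return cache_df[cache_df["mslevel"] == 1], cache_df[cache_df["mslevel"] == 2]

The CLI also changes shape here: filename accepts one or more files and the query moves behind --query, defaulting to "QUERY scaninfo(MS2DATA)", so running msql_cmd.py with only a spectrum file now simply exports scan info.
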
From 790837d8c895d9570912054cede5af025a74990a Mon Sep 17 00:00:00 2001
From: Mingxun Wang
Date: Tue, 7 Sep 2021 13:51:33 -0700
Subject: [PATCH 2/6] updating workflow

---
 workflow/workflow.nf | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/workflow/workflow.nf b/workflow/workflow.nf
index 980280f..196b42e 100644
--- a/workflow/workflow.nf
+++ b/workflow/workflow.nf
@@ -37,7 +37,7 @@ if(params.parallel_files == "YES"){
     """
     $params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \
     "$input_spectrum" \
-    "${params.query}" \
+    --query "${params.query}" \
     --output_file "${mangled_output_filename}_output.tsv" \
     --parallel_query $params.parallel_query \
     --cache NO \
@@ -67,7 +67,7 @@ else{
     """
     $params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \
     "$input_spectrum" \
-    "${params.query}" \
+    --query "${params.query}" \
     --output_file "${mangled_output_filename}_output.tsv" \
     --parallel_query $params.parallel_query \
     --cache NO \

From d58369bfc38fe530311a84babcfe922eaf676e47 Mon Sep 17 00:00:00 2001
From: Mingxun Wang
Date: Tue, 7 Sep 2021 13:55:03 -0700
Subject: [PATCH 3/6] loading quickly

---
 massql/msql_fileloading.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/massql/msql_fileloading.py b/massql/msql_fileloading.py
index 5700439..0726742 100644
--- a/massql/msql_fileloading.py
+++ b/massql/msql_fileloading.py
@@ -49,7 +49,11 @@ def load_data(input_filename, cache=False):
     elif input_filename[-4:].lower() == ".txt":
         ms1_df, ms2_df = _load_data_txt(input_filename)
-
+    elif input_filename.lower().endswith("parquet"):
+        merged_df = pd.read_parquet(input_filename)
+        ms1_df = merged_df[merged_df["mslevel"] == 1]
+        ms2_df = merged_df[merged_df["mslevel"] == 2]
+        cache = False
     else:
         print("Cannot Load File Extension")
         raise Exception("File Format Not Supported")
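
Note on PATCH 2/6 and 3/6: besides the workflow passing the query behind --query, load_data() now accepts a merged parquet file directly and skips re-caching, so a previously written .msql.parquet cache can stand in for the raw spectrum file. A hedged sketch of that fast path through the engine (process_query's positional and keyword shape is taken from msql_cmd.py in PATCH 1/6; paths are hypothetical and the remaining keyword defaults are assumed):

    from massql import msql_engine

    # First run parses the raw file and, with cache=True, writes test.mzML.msql.parquet.
    results_df = msql_engine.process_query("QUERY scaninfo(MS2DATA)", "test.mzML", cache=True)

    # Later runs can target the parquet cache itself; load_data() splits it on the
    # mslevel column and forces cache = False, so nothing is re-parsed or re-written.
    results_df = msql_engine.process_query("QUERY scaninfo(MS2DATA)", "test.mzML.msql.parquet")
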
From 8e135b0917f70d2cbdecc3ffc95e40c6332e795c Mon Sep 17 00:00:00 2001
From: Mingxun Wang
Date: Tue, 7 Sep 2021 14:13:51 -0700
Subject: [PATCH 4/6] parquet version of extraction

---
 massql/msql_cmd.py     | 16 ++++++++--------
 massql/msql_extract.py | 36 +++++++++++++++++++++++++++++++++---
 2 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/massql/msql_cmd.py b/massql/msql_cmd.py
index 99420a8..b409c4d 100644
--- a/massql/msql_cmd.py
+++ b/massql/msql_cmd.py
@@ -22,8 +22,8 @@ def main():
     parser.add_argument('--parallel_query', default="NO", help='YES to make it parallel with ray locally, NO is default')
     parser.add_argument('--cache', default="YES", help='YES to cache with feather, YES is the default')
     parser.add_argument('--original_path', default=None, help='Original absolute path for the filename, useful in proteosafe')
-    parser.add_argument('--extract_mzML', default=None, help='Extracting spectra found as mzML file')
     parser.add_argument('--extract_json', default=None, help='Extracting spectra found as json file')
+    parser.add_argument('--extract_mzML', default=None, help='Extracting spectra found as mzML file, not implemented yet')
 
     args = parser.parse_args()
 
@@ -114,13 +114,13 @@ def main():
         else:
             results_df.to_csv(args.output_file, index=False, columns=columns)
 
-    # Extracting
-    if args.extract_json is not None and len(results_df) > 0:
-        print("Extracting {} spectra".format(len(results_df)))
-        try:
-            msql_extract._extract_spectra(results_df, os.path.dirname(args.filename), output_json_filename=args.extract_json)
-        except:
-            print("Extraction Failed")
+    # Extracting
+    if args.extract_json is not None and len(results_df) > 0:
+        print("Extracting {} spectra".format(len(results_df)))
+        try:
+            msql_extract._extract_spectra(results_df, os.path.dirname(args.filename[0]), output_json_filename=args.extract_json)
+        except:
+            print("Extraction Failed")
 
 
 if __name__ == "__main__":
diff --git a/massql/msql_extract.py b/massql/msql_extract.py
index 0675e9b..23ed282 100644
--- a/massql/msql_extract.py
+++ b/massql/msql_extract.py
@@ -160,6 +160,34 @@ def _extract_mgf_scan(input_filename, spectrum_identifier_list):
 
     return output_list
 
+def _extract_parquet_scan(input_filename, spectrum_identifier_list):
+    merged_df = pd.read_parquet(input_filename)
+
+    all_spectrum_identifier_set = set(spectrum_identifier_list)
+
+    filtered_df = merged_df[merged_df["scan"].isin(all_spectrum_identifier_set)]
+    grouped_df = filtered_df.groupby("scan")
+
+    output_list = []
+    for group, scan_df in grouped_df:
+        mz = scan_df["mz"].values
+        intensity = scan_df["i"].values
+
+        peaks_list = list(zip(mz, intensity))
+        peaks_list = [[float(peak[0]), float(peak[1])] for peak in peaks_list]
+
+        spectrum_obj = {}
+        spectrum_obj["peaks"] = peaks_list
+        spectrum_obj["mslevel"] = int(scan_df["mslevel"].iloc[0])
+        spectrum_obj["scan"] = str(scan_df["scan"].iloc[0])
+
+        if spectrum_obj["mslevel"] > 1:
+            spectrum_obj["precursor_mz"] = float(scan_df["precmz"].iloc[0])
+
+        output_list.append(spectrum_obj)
+
+    return output_list
+
 def _extract_spectra(results_df, input_spectra_folder,
                         output_mgf_filename=None,
@@ -184,12 +212,14 @@ def _extract_spectra(results_df, input_spectra_folder,
         input_spectra_filename = os.path.join(input_spectra_folder, results_by_file_df["filename"].iloc[0])
 
         spectrum_obj_list = []
-        if ".mzML" in input_spectra_filename:
+        if input_spectra_filename.endswith(".mzML"):
             spectrum_obj_list = _extract_mzML_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
-        if ".mzXML" in input_spectra_filename:
+        if input_spectra_filename.endswith(".mzXML"):
            spectrum_obj_list = _extract_mzXML_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
-        if ".mgf" in input_spectra_filename:
+        if input_spectra_filename.endswith(".mgf"):
            spectrum_obj_list = _extract_mgf_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
+        if input_spectra_filename.endswith(".parquet"):
+            spectrum_obj_list = _extract_parquet_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
 
         for spectrum_obj in spectrum_obj_list:
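
Note on PATCH 4/6: _extract_parquet_scan assumes the long one-row-per-peak layout (scan, mz, i, mslevel, and precmz for MS2) that the cache writer produces. A toy round trip with synthetic data (filename hypothetical) showing the spectrum objects it returns:

    import pandas as pd
    from massql import msql_extract

    # Two MS2 peaks in scan "1" and one in scan "2", in the one-row-per-peak layout.
    toy_df = pd.DataFrame({
        "scan":    ["1", "1", "2"],
        "mz":      [100.0, 200.0, 150.5],
        "i":       [1000.0, 500.0, 250.0],
        "mslevel": [2, 2, 2],
        "precmz":  [300.1, 300.1, 400.2],
    })
    toy_df.to_parquet("toy.msql.parquet")

    # Expected: [{'peaks': [[100.0, 1000.0], [200.0, 500.0]], 'mslevel': 2,
    #             'scan': '1', 'precursor_mz': 300.1}]
    print(msql_extract._extract_parquet_scan("toy.msql.parquet", ["1"]))
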
"python" // this is a hack because CCMS cluster does not have python installed + +process queryData { + errorStrategy 'ignore' + time '4h' + //maxRetries 3 + //memory { 6.GB * task.attempt } + memory { 12.GB } + + //publishDir "$params.publishdir/msql_temp", mode: 'copy' + + input: + set val(filepath), val(mangled_output_filename), file(input_spectrum) from _spectra_ch3 + + output: + file "*parquet" + + """ + $params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \ + "$input_spectrum" \ + --original_path "$filepath" + """ +} \ No newline at end of file From 01c6b5aeeace8df2e8f5fc33a77c0dfdf8e41946 Mon Sep 17 00:00:00 2001 From: Mingxun Wang Date: Tue, 7 Sep 2021 14:38:21 -0700 Subject: [PATCH 6/6] initial caching --- workflow/Makefile | 6 ++++++ workflow/workflow_caching.nf | 8 ++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/workflow/Makefile b/workflow/Makefile index 90164ee..166eca7 100644 --- a/workflow/Makefile +++ b/workflow/Makefile @@ -1,3 +1,5 @@ +# Workflow Tests + run_test: nextflow run workflow.nf -c test.config @@ -12,6 +14,10 @@ run_test_iron: nextflow run workflow.nf -c test.config --input_spectra="./test/isa_9_fe.mzML" --query="QUERY scaninfo(MS1DATA) WHERE MS1MZ=X-2:INTENSITYMATCH=Y*0.063:INTENSITYMATCHPERCENT=25 AND MS1MZ=X:INTENSITYMATCH=Y:INTENSITYMATCHREFERENCE:INTENSITYPERCENT=5 AND MS2PREC=X" --resume +# Caching Tests +run_caching_test: + nextflow run workflow_caching.nf -c test.config + cluster_test: nextflow run workflow.nf -c cluster.config \ --input_spectra="/data/massive/MSV000082622/ccms_peak/**.mzML" \ diff --git a/workflow/workflow_caching.nf b/workflow/workflow_caching.nf index caaff84..f25435a 100644 --- a/workflow/workflow_caching.nf +++ b/workflow/workflow_caching.nf @@ -11,14 +11,14 @@ TOOL_FOLDER = "$baseDir/bin" params.publishdir = "nf_output" params.PYTHONRUNTIME = "python" // this is a hack because CCMS cluster does not have python installed -process queryData { +process cacheData { errorStrategy 'ignore' time '4h' //maxRetries 3 //memory { 6.GB * task.attempt } memory { 12.GB } - //publishDir "$params.publishdir/msql_temp", mode: 'copy' + publishDir "$params.publishdir/msql_cache", mode: 'copy' input: set val(filepath), val(mangled_output_filename), file(input_spectrum) from _spectra_ch3 @@ -28,7 +28,7 @@ process queryData { """ $params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \ - "$input_spectrum" \ - --original_path "$filepath" + "$input_spectrum" + mv ${input_spectrum}.msql.parquet ${mangled_output_filename}.msql.parquet """ } \ No newline at end of file