From 04a13eb07ac9bddc5c547201f612c243620fb6f8 Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Mon, 18 Aug 2025 20:08:26 -0400 Subject: [PATCH 01/11] feature: more efficient sqMass to parquet export --- pyprophet/io/_base.py | 10 +++ pyprophet/io/export/osw.py | 8 --- pyprophet/io/export/sqmass.py | 129 +++++++++++++++++++++++++++++++--- 3 files changed, 131 insertions(+), 16 deletions(-) diff --git a/pyprophet/io/_base.py b/pyprophet/io/_base.py index 89e284d9..eac212ea 100644 --- a/pyprophet/io/_base.py +++ b/pyprophet/io/_base.py @@ -865,6 +865,16 @@ def _quantile_normalize(self, matrix: pd.DataFrame) -> pd.DataFrame: raise ImportError( "scikit-learn is required for quantile normalization" ) from exc + def _execute_copy_query(self, conn, query: str, path: str) -> None: + """Execute COPY query with configured compression settings""" + print(f"COPY ({query}) TO '{path}' ") + print(f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', ") + print(f"COMPRESSION_LEVEL {self.config.compression_level})") + conn.execute( + f"COPY ({query}) TO '{path}' " + f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', " + f"COMPRESSION_LEVEL {self.config.compression_level})" + ) @dataclass diff --git a/pyprophet/io/export/osw.py b/pyprophet/io/export/osw.py index 02d1669d..e35336c1 100644 --- a/pyprophet/io/export/osw.py +++ b/pyprophet/io/export/osw.py @@ -1389,11 +1389,3 @@ def _build_score_column_selection_and_joins( ", ".join(score_columns_to_select), " ".join(score_tables_to_join), ) - - def _execute_copy_query(self, conn, query: str, path: str) -> None: - """Execute COPY query with configured compression settings""" - conn.execute( - f"COPY ({query}) TO '{path}' " - f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', " - f"COMPRESSION_LEVEL {self.config.compression_level})" - ) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index b95caa7d..c58efcf1 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -280,7 +280,6 @@ class SqMassWriter(BaseWriter): def __init__(self, config: ExportIOConfig): super().__init__(config) - self.reader = SqMassReader(config) def save_results(self, result, pi0): raise NotImplementedError( @@ -301,13 +300,127 @@ def _write_parquet(self) -> None: """Handle parquet export based on configuration""" if self.config.file_type != "sqmass": raise ValueError("Parquet export only supported from sqMass files") + + conn = duckdb.connect(":memory:") + load_sqlite_scanner(conn) - # Read data using the reader - df = self.reader.read() + #self._initialize_connection() + #self._create_indexes() + query = self._build_export_query() - # Write to parquet using configured compression - df.to_parquet( - self.config.outfile, - compression=self.config.compression_method, - compression_level=self.config.compression_level, + if self.config.export_format == "parquet": + return self._execute_copy_query(conn, query, self.config.outfile) + raise ValueError(f"Unsupported export format: {self.config.export_format}") + + + def _build_export_query(self) -> str: + return f""" + WITH pqp_data AS ( + SELECT + PRECURSOR.ID AS PRECURSOR_ID, + TRANSITION.ID AS TRANSITION_ID, + PEPTIDE.MODIFIED_SEQUENCE, + PRECURSOR.CHARGE AS PRECURSOR_CHARGE, + TRANSITION.CHARGE AS PRODUCT_CHARGE, + TRANSITION.DETECTING AS DETECTING_TRANSITION, + PRECURSOR.DECOY AS PRECURSOR_DECOY, + TRANSITION.DECOY AS PRODUCT_DECOY + FROM sqlite_scan('{self.config.pqp_file}', 'PRECURSOR') as PRECURSOR + INNER JOIN 
sqlite_scan('{self.config.pqp_file}', 'PRECURSOR_PEPTIDE_MAPPING') as PRECURSOR_PEPTIDE_MAPPING + ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID + INNER JOIN sqlite_scan('{self.config.pqp_file}', 'PEPTIDE') as PEPTIDE + ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID + INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION_PRECURSOR_MAPPING') as TRANSITION_PRECURSOR_MAPPING + ON PRECURSOR.ID = TRANSITION_PRECURSOR_MAPPING.PRECURSOR_ID + INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION') as TRANSITION + ON TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID = TRANSITION.ID + ), + chrom_data AS ( + SELECT + CHROMATOGRAM.NATIVE_ID, + DATA.COMPRESSION, + DATA.DATA_TYPE, + DATA.DATA + FROM sqlite_scan('{self.config.infile}', 'CHROMATOGRAM') as CHROMATOGRAM + INNER JOIN sqlite_scan('{self.config.infile}', 'DATA') as DATA + ON DATA.CHROMATOGRAM_ID = CHROMATOGRAM.ID + ), + prec_meta AS ( + SELECT DISTINCT + PRECURSOR_ID, + MODIFIED_SEQUENCE, + PRECURSOR_CHARGE, + PRECURSOR_DECOY + FROM pqp_data + ), + chrom_prec_merged AS ( + SELECT + p.PRECURSOR_ID, + NULL AS TRANSITION_ID, + p.MODIFIED_SEQUENCE, + p.PRECURSOR_CHARGE, + NULL AS PRODUCT_CHARGE, + 1 AS DETECTING_TRANSITION, + p.PRECURSOR_DECOY, + NULL AS PRODUCT_DECOY, + c.NATIVE_ID, + c.COMPRESSION, + c.DATA_TYPE, + c.DATA + FROM prec_meta p + INNER JOIN chrom_data c + ON p.PRECURSOR_ID = CAST(REGEXP_EXTRACT(c.NATIVE_ID, '(\\d+)_Precursor_i\\d+', 1) AS STRING) + WHERE REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') + ), + chrom_trans_merged AS ( + SELECT + p.PRECURSOR_ID, + p.TRANSITION_ID, + p.MODIFIED_SEQUENCE, + p.PRECURSOR_CHARGE, + p.PRODUCT_CHARGE, + p.DETECTING_TRANSITION, + p.PRECURSOR_DECOY, + p.PRODUCT_DECOY, + c.NATIVE_ID, + c.COMPRESSION, + c.DATA_TYPE, + c.DATA + FROM pqp_data p + INNER JOIN chrom_data c ON p.TRANSITION_ID = CAST(c.NATIVE_ID AS STRING) + WHERE NOT REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') + ), + chrom_combined AS ( + SELECT * FROM chrom_prec_merged + UNION ALL + SELECT * FROM chrom_trans_merged ) + SELECT + PRECURSOR_ID, + TRANSITION_ID, + MODIFIED_SEQUENCE, + PRECURSOR_CHARGE, + PRODUCT_CHARGE, + DETECTING_TRANSITION, + PRECURSOR_DECOY, + PRODUCT_DECOY, + NATIVE_ID, + MAX(CASE WHEN DATA_TYPE = 2 THEN DATA END) AS RT_DATA, + MAX(CASE WHEN DATA_TYPE = 1 THEN DATA END) AS INTENSITY_DATA, + MAX(CASE WHEN DATA_TYPE = 2 THEN COMPRESSION END) AS RT_COMPRESSION, + MAX(CASE WHEN DATA_TYPE = 1 THEN COMPRESSION END) AS INTENSITY_COMPRESSION + FROM chrom_combined + GROUP BY + PRECURSOR_ID, + TRANSITION_ID, + MODIFIED_SEQUENCE, + PRECURSOR_CHARGE, + PRODUCT_CHARGE, + DETECTING_TRANSITION, + PRECURSOR_DECOY, + PRODUCT_DECOY, + NATIVE_ID + ORDER BY PRECURSOR_ID, + CASE WHEN TRANSITION_ID IS NULL THEN 0 ELSE 1 END, + TRANSITION_ID + """ \ No newline at end of file From bdd2a9c8612e2e879a2585fcf8c6f119a0cbb7d0 Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Tue, 19 Aug 2025 08:51:11 -0400 Subject: [PATCH 02/11] feature: add transition ordinal and type to export --- pyprophet/io/export/sqmass.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index c58efcf1..a9e9ac6d 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -323,6 +323,8 @@ def _build_export_query(self) -> str: PRECURSOR.CHARGE AS PRECURSOR_CHARGE, TRANSITION.CHARGE AS PRODUCT_CHARGE, TRANSITION.DETECTING AS DETECTING_TRANSITION, + TRANSITION.ORDINAL AS TRANSITION_ORDINAL, + TRANSITION.TYPE AS TRANSITION_TYPE, 
PRECURSOR.DECOY AS PRECURSOR_DECOY, TRANSITION.DECOY AS PRODUCT_DECOY FROM sqlite_scan('{self.config.pqp_file}', 'PRECURSOR') as PRECURSOR @@ -360,6 +362,8 @@ def _build_export_query(self) -> str: p.MODIFIED_SEQUENCE, p.PRECURSOR_CHARGE, NULL AS PRODUCT_CHARGE, + NULL AS TRANSITION_ORDINAL, + NULL AS TRANSITION_TYPE, 1 AS DETECTING_TRANSITION, p.PRECURSOR_DECOY, NULL AS PRODUCT_DECOY, @@ -382,6 +386,8 @@ def _build_export_query(self) -> str: p.DETECTING_TRANSITION, p.PRECURSOR_DECOY, p.PRODUCT_DECOY, + p.TRANSITION_ORDINAL, + p.TRANSITION_TYPE, c.NATIVE_ID, c.COMPRESSION, c.DATA_TYPE, @@ -404,6 +410,8 @@ def _build_export_query(self) -> str: DETECTING_TRANSITION, PRECURSOR_DECOY, PRODUCT_DECOY, + TRANSITION_ORDINAL, + TRANSITION_TYPE, NATIVE_ID, MAX(CASE WHEN DATA_TYPE = 2 THEN DATA END) AS RT_DATA, MAX(CASE WHEN DATA_TYPE = 1 THEN DATA END) AS INTENSITY_DATA, @@ -419,7 +427,9 @@ def _build_export_query(self) -> str: DETECTING_TRANSITION, PRECURSOR_DECOY, PRODUCT_DECOY, - NATIVE_ID + TRANSITION_ORDINAL, + TRANSITION_TYPE, + NATIVE_ID, ORDER BY PRECURSOR_ID, CASE WHEN TRANSITION_ID IS NULL THEN 0 ELSE 1 END, TRANSITION_ID From c2f6f4196cb54ecf49dc3464d861fd915908fe27 Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Tue, 19 Aug 2025 08:52:48 -0400 Subject: [PATCH 03/11] remove old comments and print statements --- pyprophet/io/_base.py | 3 --- pyprophet/io/export/sqmass.py | 2 -- 2 files changed, 5 deletions(-) diff --git a/pyprophet/io/_base.py b/pyprophet/io/_base.py index eac212ea..a1334011 100644 --- a/pyprophet/io/_base.py +++ b/pyprophet/io/_base.py @@ -867,9 +867,6 @@ def _quantile_normalize(self, matrix: pd.DataFrame) -> pd.DataFrame: ) from exc def _execute_copy_query(self, conn, query: str, path: str) -> None: """Execute COPY query with configured compression settings""" - print(f"COPY ({query}) TO '{path}' ") - print(f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', ") - print(f"COMPRESSION_LEVEL {self.config.compression_level})") conn.execute( f"COPY ({query}) TO '{path}' " f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', " diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index a9e9ac6d..0936d869 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -304,8 +304,6 @@ def _write_parquet(self) -> None: conn = duckdb.connect(":memory:") load_sqlite_scanner(conn) - #self._initialize_connection() - #self._create_indexes() query = self._build_export_query() if self.config.export_format == "parquet": From c528f16a132fee11e5511ad971dadc2b28bd4cd5 Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Tue, 19 Aug 2025 09:00:25 -0400 Subject: [PATCH 04/11] fix bugs with adding transition columns --- pyprophet/io/export/sqmass.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index 0936d869..6b730327 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -360,11 +360,11 @@ def _build_export_query(self) -> str: p.MODIFIED_SEQUENCE, p.PRECURSOR_CHARGE, NULL AS PRODUCT_CHARGE, - NULL AS TRANSITION_ORDINAL, - NULL AS TRANSITION_TYPE, 1 AS DETECTING_TRANSITION, p.PRECURSOR_DECOY, NULL AS PRODUCT_DECOY, + NULL AS TRANSITION_ORDINAL, + NULL AS TRANSITION_TYPE, c.NATIVE_ID, c.COMPRESSION, c.DATA_TYPE, From 03e6b36e08b1287afa198172a76a7f1701a38206 Mon Sep 17 00:00:00 2001 From: Joshua Charkow <47336288+jcharkow@users.noreply.github.com> Date: Tue, 19 Aug 2025 16:30:45 -0400 Subject: [PATCH 05/11] 
Apply suggestions from code review Co-authored-by: Justin Sing <32938975+singjc@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pyprophet/io/_base.py | 1 + pyprophet/io/export/sqmass.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyprophet/io/_base.py b/pyprophet/io/_base.py index a1334011..3f586b4b 100644 --- a/pyprophet/io/_base.py +++ b/pyprophet/io/_base.py @@ -865,6 +865,7 @@ def _quantile_normalize(self, matrix: pd.DataFrame) -> pd.DataFrame: raise ImportError( "scikit-learn is required for quantile normalization" ) from exc + def _execute_copy_query(self, conn, query: str, path: str) -> None: """Execute COPY query with configured compression settings""" conn.execute( diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index 6b730327..4e232c9e 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -307,7 +307,7 @@ def _write_parquet(self) -> None: query = self._build_export_query() if self.config.export_format == "parquet": - return self._execute_copy_query(conn, query, self.config.outfile) + self._execute_copy_query(conn, query, self.config.outfile) raise ValueError(f"Unsupported export format: {self.config.export_format}") @@ -427,7 +427,7 @@ def _build_export_query(self) -> str: PRODUCT_DECOY, TRANSITION_ORDINAL, TRANSITION_TYPE, - NATIVE_ID, + NATIVE_ID ORDER BY PRECURSOR_ID, CASE WHEN TRANSITION_ID IS NULL THEN 0 ELSE 1 END, TRANSITION_ID From 5dacab51fbee3afca7ea4dbb99196b6d7550f79d Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Tue, 19 Aug 2025 16:39:46 -0400 Subject: [PATCH 06/11] create reader object so can add indices to sqMass --- pyprophet/io/export/sqmass.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index 4e232c9e..ab162560 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -279,6 +279,7 @@ class SqMassWriter(BaseWriter): """ def __init__(self, config: ExportIOConfig): + self.reader = SqMassReader(config) super().__init__(config) def save_results(self, result, pi0): @@ -302,6 +303,7 @@ def _write_parquet(self) -> None: raise ValueError("Parquet export only supported from sqMass files") conn = duckdb.connect(":memory:") + self.reader._create_indexes() load_sqlite_scanner(conn) query = self._build_export_query() From c94b00e2bf5cf29f58cd625d9a005d0800a8c110 Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Tue, 19 Aug 2025 16:42:40 -0400 Subject: [PATCH 07/11] fix: bug resulting from code suggestions --- pyprophet/io/export/sqmass.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index ab162560..b5ac10fc 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -310,7 +310,8 @@ def _write_parquet(self) -> None: if self.config.export_format == "parquet": self._execute_copy_query(conn, query, self.config.outfile) - raise ValueError(f"Unsupported export format: {self.config.export_format}") + else: + raise ValueError(f"Unsupported export format: {self.config.export_format}") def _build_export_query(self) -> str: From 9cdef0490007bd82e57b2b8db7e74b731c7c4256 Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Tue, 19 Aug 2025 16:49:00 -0400 Subject: [PATCH 08/11] split sqlite query into smaller functions for better readability --- pyprophet/io/export/sqmass.py | 71 ++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 17 
deletions(-) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index b5ac10fc..25e4d5db 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -315,8 +315,28 @@ def _write_parquet(self) -> None: def _build_export_query(self) -> str: + """Build the complete export query by combining all subqueries""" + pqp_cte = self._build_pqp_data_cte() + chrom_cte = self._build_chrom_data_cte() + prec_meta_cte = self._build_prec_meta_cte() + prec_merged_cte = self._build_chrom_prec_merged_cte() + trans_merged_cte = self._build_chrom_trans_merged_cte() + combined_cte = self._build_chrom_combined_cte() + final_select = self._build_final_select() + return f""" - WITH pqp_data AS ( + WITH {pqp_cte}, + {chrom_cte}, + {prec_meta_cte}, + {prec_merged_cte}, + {trans_merged_cte}, + {combined_cte} + {final_select} + """ + + def _build_pqp_data_cte(self) -> str: + """Build the PQP data CTE that extracts precursor and transition information""" + return f"""pqp_data AS ( SELECT PRECURSOR.ID AS PRECURSOR_ID, TRANSITION.ID AS TRANSITION_ID, @@ -337,8 +357,11 @@ def _build_export_query(self) -> str: ON PRECURSOR.ID = TRANSITION_PRECURSOR_MAPPING.PRECURSOR_ID INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION') as TRANSITION ON TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID = TRANSITION.ID - ), - chrom_data AS ( + )""" + + def _build_chrom_data_cte(self) -> str: + """Build the chromatogram data CTE that extracts raw chromatogram information""" + return f"""chrom_data AS ( SELECT CHROMATOGRAM.NATIVE_ID, DATA.COMPRESSION, @@ -347,16 +370,22 @@ def _build_export_query(self) -> str: FROM sqlite_scan('{self.config.infile}', 'CHROMATOGRAM') as CHROMATOGRAM INNER JOIN sqlite_scan('{self.config.infile}', 'DATA') as DATA ON DATA.CHROMATOGRAM_ID = CHROMATOGRAM.ID - ), - prec_meta AS ( + )""" + + def _build_prec_meta_cte(self) -> str: + """Build the precursor metadata CTE""" + return """prec_meta AS ( SELECT DISTINCT PRECURSOR_ID, MODIFIED_SEQUENCE, PRECURSOR_CHARGE, PRECURSOR_DECOY FROM pqp_data - ), - chrom_prec_merged AS ( + )""" + + def _build_chrom_prec_merged_cte(self) -> str: + """Build the CTE that merges precursor chromatogram data""" + return """chrom_prec_merged AS ( SELECT p.PRECURSOR_ID, NULL AS TRANSITION_ID, @@ -374,10 +403,13 @@ def _build_export_query(self) -> str: c.DATA FROM prec_meta p INNER JOIN chrom_data c - ON p.PRECURSOR_ID = CAST(REGEXP_EXTRACT(c.NATIVE_ID, '(\\d+)_Precursor_i\\d+', 1) AS STRING) - WHERE REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') - ), - chrom_trans_merged AS ( + ON p.PRECURSOR_ID = CAST(REGEXP_EXTRACT(c.NATIVE_ID, '(\\\\d+)_Precursor_i\\\\d+', 1) AS STRING) + WHERE REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\\\d+') + )""" + + def _build_chrom_trans_merged_cte(self) -> str: + """Build the CTE that merges transition chromatogram data""" + return """chrom_trans_merged AS ( SELECT p.PRECURSOR_ID, p.TRANSITION_ID, @@ -396,13 +428,19 @@ def _build_export_query(self) -> str: FROM pqp_data p INNER JOIN chrom_data c ON p.TRANSITION_ID = CAST(c.NATIVE_ID AS STRING) WHERE NOT REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') - ), - chrom_combined AS ( + )""" + + def _build_chrom_combined_cte(self) -> str: + """Build the CTE that combines precursor and transition chromatogram data""" + return """chrom_combined AS ( SELECT * FROM chrom_prec_merged UNION ALL SELECT * FROM chrom_trans_merged - ) - SELECT + )""" + + def _build_final_select(self) -> str: + """Build the final SELECT statement with aggregation and ordering""" + return """SELECT 
PRECURSOR_ID, TRANSITION_ID, MODIFIED_SEQUENCE, @@ -433,5 +471,4 @@ def _build_export_query(self) -> str: NATIVE_ID ORDER BY PRECURSOR_ID, CASE WHEN TRANSITION_ID IS NULL THEN 0 ELSE 1 END, - TRANSITION_ID - """ \ No newline at end of file + TRANSITION_ID""" \ No newline at end of file From 3388d951d6a55407a07f74e421f188a2b51e8625 Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Tue, 19 Aug 2025 16:59:47 -0400 Subject: [PATCH 09/11] Revert "split sqlite query into smaller functions for better readability" This reverts commit 9cdef0490007bd82e57b2b8db7e74b731c7c4256. --- pyprophet/io/export/sqmass.py | 71 +++++++++-------------------------- 1 file changed, 17 insertions(+), 54 deletions(-) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index 25e4d5db..b5ac10fc 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -315,28 +315,8 @@ def _write_parquet(self) -> None: def _build_export_query(self) -> str: - """Build the complete export query by combining all subqueries""" - pqp_cte = self._build_pqp_data_cte() - chrom_cte = self._build_chrom_data_cte() - prec_meta_cte = self._build_prec_meta_cte() - prec_merged_cte = self._build_chrom_prec_merged_cte() - trans_merged_cte = self._build_chrom_trans_merged_cte() - combined_cte = self._build_chrom_combined_cte() - final_select = self._build_final_select() - return f""" - WITH {pqp_cte}, - {chrom_cte}, - {prec_meta_cte}, - {prec_merged_cte}, - {trans_merged_cte}, - {combined_cte} - {final_select} - """ - - def _build_pqp_data_cte(self) -> str: - """Build the PQP data CTE that extracts precursor and transition information""" - return f"""pqp_data AS ( + WITH pqp_data AS ( SELECT PRECURSOR.ID AS PRECURSOR_ID, TRANSITION.ID AS TRANSITION_ID, @@ -357,11 +337,8 @@ def _build_pqp_data_cte(self) -> str: ON PRECURSOR.ID = TRANSITION_PRECURSOR_MAPPING.PRECURSOR_ID INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION') as TRANSITION ON TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID = TRANSITION.ID - )""" - - def _build_chrom_data_cte(self) -> str: - """Build the chromatogram data CTE that extracts raw chromatogram information""" - return f"""chrom_data AS ( + ), + chrom_data AS ( SELECT CHROMATOGRAM.NATIVE_ID, DATA.COMPRESSION, @@ -370,22 +347,16 @@ def _build_chrom_data_cte(self) -> str: FROM sqlite_scan('{self.config.infile}', 'CHROMATOGRAM') as CHROMATOGRAM INNER JOIN sqlite_scan('{self.config.infile}', 'DATA') as DATA ON DATA.CHROMATOGRAM_ID = CHROMATOGRAM.ID - )""" - - def _build_prec_meta_cte(self) -> str: - """Build the precursor metadata CTE""" - return """prec_meta AS ( + ), + prec_meta AS ( SELECT DISTINCT PRECURSOR_ID, MODIFIED_SEQUENCE, PRECURSOR_CHARGE, PRECURSOR_DECOY FROM pqp_data - )""" - - def _build_chrom_prec_merged_cte(self) -> str: - """Build the CTE that merges precursor chromatogram data""" - return """chrom_prec_merged AS ( + ), + chrom_prec_merged AS ( SELECT p.PRECURSOR_ID, NULL AS TRANSITION_ID, @@ -403,13 +374,10 @@ def _build_chrom_prec_merged_cte(self) -> str: c.DATA FROM prec_meta p INNER JOIN chrom_data c - ON p.PRECURSOR_ID = CAST(REGEXP_EXTRACT(c.NATIVE_ID, '(\\\\d+)_Precursor_i\\\\d+', 1) AS STRING) - WHERE REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\\\d+') - )""" - - def _build_chrom_trans_merged_cte(self) -> str: - """Build the CTE that merges transition chromatogram data""" - return """chrom_trans_merged AS ( + ON p.PRECURSOR_ID = CAST(REGEXP_EXTRACT(c.NATIVE_ID, '(\\d+)_Precursor_i\\d+', 1) AS STRING) + WHERE REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') + 
), + chrom_trans_merged AS ( SELECT p.PRECURSOR_ID, p.TRANSITION_ID, @@ -428,19 +396,13 @@ def _build_chrom_trans_merged_cte(self) -> str: FROM pqp_data p INNER JOIN chrom_data c ON p.TRANSITION_ID = CAST(c.NATIVE_ID AS STRING) WHERE NOT REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') - )""" - - def _build_chrom_combined_cte(self) -> str: - """Build the CTE that combines precursor and transition chromatogram data""" - return """chrom_combined AS ( + ), + chrom_combined AS ( SELECT * FROM chrom_prec_merged UNION ALL SELECT * FROM chrom_trans_merged - )""" - - def _build_final_select(self) -> str: - """Build the final SELECT statement with aggregation and ordering""" - return """SELECT + ) + SELECT PRECURSOR_ID, TRANSITION_ID, MODIFIED_SEQUENCE, @@ -471,4 +433,5 @@ def _build_final_select(self) -> str: NATIVE_ID ORDER BY PRECURSOR_ID, CASE WHEN TRANSITION_ID IS NULL THEN 0 ELSE 1 END, - TRANSITION_ID""" \ No newline at end of file + TRANSITION_ID + """ \ No newline at end of file From 0720b045e4799479fb7ea6381a51da2d4a13f328 Mon Sep 17 00:00:00 2001 From: Joshua Charkow Date: Tue, 19 Aug 2025 17:43:06 -0400 Subject: [PATCH 10/11] refactor - split query into smaller chunks --- pyprophet/io/export/sqmass.py | 68 +++++++++++++++++++++++++++-------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index b5ac10fc..357b851f 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -301,7 +301,7 @@ def _write_parquet(self) -> None: """Handle parquet export based on configuration""" if self.config.file_type != "sqmass": raise ValueError("Parquet export only supported from sqMass files") - + conn = duckdb.connect(":memory:") self.reader._create_indexes() load_sqlite_scanner(conn) @@ -312,11 +312,31 @@ def _write_parquet(self) -> None: self._execute_copy_query(conn, query, self.config.outfile) else: raise ValueError(f"Unsupported export format: {self.config.export_format}") - def _build_export_query(self) -> str: + """Assemble the export SQL query from smaller parts.""" + pqp_cte = self._cte_pqp_data() + chrom_cte = self._cte_chrom_data() + prec_meta_cte = self._cte_prec_meta() + chrom_prec_cte = self._cte_chrom_prec_merged() + chrom_trans_cte = self._cte_chrom_trans_merged() + chrom_combined_cte = self._cte_chrom_combined() + final_select = self._final_select_from_combined() + + return f""" + WITH pqp_data AS ( {pqp_cte}), + chrom_data AS ( {chrom_cte}), + prec_meta AS ( {prec_meta_cte}), + chrom_prec_merged AS ( {chrom_prec_cte}), + chrom_trans_merged AS ( {chrom_trans_cte}), + chrom_combined AS ( {chrom_combined_cte}) + {final_select} + """ + + # --- Helper builders for the SQL query parts --- + def _cte_pqp_data(self) -> str: + """CTE body for pqp_data: library metadata joined across PQP tables.""" return f""" - WITH pqp_data AS ( SELECT PRECURSOR.ID AS PRECURSOR_ID, TRANSITION.ID AS TRANSITION_ID, @@ -337,8 +357,10 @@ def _build_export_query(self) -> str: ON PRECURSOR.ID = TRANSITION_PRECURSOR_MAPPING.PRECURSOR_ID INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION') as TRANSITION ON TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID = TRANSITION.ID - ), - chrom_data AS ( + """ + def _cte_chrom_data(self) -> str: + """CTE body for chrom_data: raw chromatogram rows from sqMass (xic).""" + return f""" SELECT CHROMATOGRAM.NATIVE_ID, DATA.COMPRESSION, @@ -347,16 +369,22 @@ def _build_export_query(self) -> str: FROM sqlite_scan('{self.config.infile}', 'CHROMATOGRAM') as CHROMATOGRAM INNER JOIN 
sqlite_scan('{self.config.infile}', 'DATA') as DATA ON DATA.CHROMATOGRAM_ID = CHROMATOGRAM.ID - ), - prec_meta AS ( + """ + + def _cte_prec_meta(self) -> str: + """CTE body for precursor metadata: distinct precursor-level metadata.""" + return """ SELECT DISTINCT PRECURSOR_ID, MODIFIED_SEQUENCE, PRECURSOR_CHARGE, PRECURSOR_DECOY FROM pqp_data - ), - chrom_prec_merged AS ( + """ + + def _cte_chrom_prec_merged(self) -> str: + """CTE body for chrom_prec_merged: precursor chromatograms with null transition fields.""" + return """ SELECT p.PRECURSOR_ID, NULL AS TRANSITION_ID, @@ -376,8 +404,11 @@ def _build_export_query(self) -> str: INNER JOIN chrom_data c ON p.PRECURSOR_ID = CAST(REGEXP_EXTRACT(c.NATIVE_ID, '(\\d+)_Precursor_i\\d+', 1) AS STRING) WHERE REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') - ), - chrom_trans_merged AS ( + """ + + def _cte_chrom_trans_merged(self) -> str: + """CTE body for chrom_trans_merged: transition chromatograms by matching NATIVE_ID.""" + return """ SELECT p.PRECURSOR_ID, p.TRANSITION_ID, @@ -396,12 +427,19 @@ def _build_export_query(self) -> str: FROM pqp_data p INNER JOIN chrom_data c ON p.TRANSITION_ID = CAST(c.NATIVE_ID AS STRING) WHERE NOT REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') - ), - chrom_combined AS ( + """ + + def _cte_chrom_combined(self) -> str: + """CTE body for chrom_combined: union of precursor and transition chromatograms with metadata""" + return """ SELECT * FROM chrom_prec_merged UNION ALL SELECT * FROM chrom_trans_merged - ) + """ + + def _final_select_from_combined(self) -> str: + """Final SELECT aggregating and ordering the chromatogram arrays.""" + return """ SELECT PRECURSOR_ID, TRANSITION_ID, @@ -434,4 +472,4 @@ def _build_export_query(self) -> str: ORDER BY PRECURSOR_ID, CASE WHEN TRANSITION_ID IS NULL THEN 0 ELSE 1 END, TRANSITION_ID - """ \ No newline at end of file + """ \ No newline at end of file From 4434704b485b8eb9792ccc81ed18f90e889b24a0 Mon Sep 17 00:00:00 2001 From: singjc Date: Tue, 30 Sep 2025 13:12:00 -0400 Subject: [PATCH 11/11] update: chrom parquet schema documentation --- docs/file_formats.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/file_formats.rst b/docs/file_formats.rst index 76484703..eb245651 100644 --- a/docs/file_formats.rst +++ b/docs/file_formats.rst @@ -311,6 +311,8 @@ The Parquet schema used for the XICs is as follows: ('DETECTING_TRANSITION', Int64), ('PRECURSOR_DECOY', Int64), ('PRODUCT_DECOY', Int64), + ('TRANSITION_ORDINAL', Int64), + ('TRANSITION_TYPE', String), ('NATIVE_ID', String), ('RT_DATA', Binary), ('INTENSITY_DATA', Binary),
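
A minimal sketch of consuming the exported chromatogram parquet downstream, assuming the column names produced by the export query and documented in docs/file_formats.rst above; `out.parquet` is a placeholder path, and the blob decoding assumes plain zlib-compressed little-endian doubles (numpress-encoded sqMass data, as indicated by the RT_COMPRESSION/INTENSITY_COMPRESSION codes, would additionally need a numpress decoder):

    import struct
    import zlib

    import duckdb

    # Placeholder path for a file produced by the new sqMass -> parquet export.
    PARQUET_PATH = "out.parquet"

    con = duckdb.connect()
    row = con.execute(
        f"""
        SELECT MODIFIED_SEQUENCE, PRECURSOR_CHARGE, NATIVE_ID, RT_DATA, INTENSITY_DATA
        FROM read_parquet('{PARQUET_PATH}')
        WHERE TRANSITION_ID IS NOT NULL
        LIMIT 1
        """
    ).fetchone()
    seq, charge, native_id, rt_blob, intensity_blob = row


    def decode_doubles(blob: bytes) -> list:
        # Assumption: the blob is a zlib-compressed array of little-endian doubles;
        # numpress-encoded sqMass payloads need an extra decoding step instead.
        raw = zlib.decompress(blob)
        return list(struct.unpack(f"<{len(raw) // 8}d", raw))


    print(seq, charge, native_id)
    print("RT points:", len(decode_doubles(rt_blob)))
    print("intensity points:", len(decode_doubles(intensity_blob)))

DuckDB is used here only to mirror the export path itself, which relies on DuckDB's sqlite_scanner and COPY ... TO ... (FORMAT 'parquet'); any parquet reader would work equally well for the read side.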