diff --git a/docs/file_formats.rst b/docs/file_formats.rst index 76484703..eb245651 100644 --- a/docs/file_formats.rst +++ b/docs/file_formats.rst @@ -311,6 +311,8 @@ The Parquet schema used for the XICs is as follows: ('DETECTING_TRANSITION', Int64), ('PRECURSOR_DECOY', Int64), ('PRODUCT_DECOY', Int64), + ('TRANSITION_ORDINAL', Int64), + ('TRANSITION_TYPE', String), ('NATIVE_ID', String), ('RT_DATA', Binary), ('INTENSITY_DATA', Binary), diff --git a/pyprophet/io/_base.py b/pyprophet/io/_base.py index 89e284d9..3f586b4b 100644 --- a/pyprophet/io/_base.py +++ b/pyprophet/io/_base.py @@ -865,6 +865,14 @@ def _quantile_normalize(self, matrix: pd.DataFrame) -> pd.DataFrame: raise ImportError( "scikit-learn is required for quantile normalization" ) from exc + + def _execute_copy_query(self, conn, query: str, path: str) -> None: + """Execute COPY query with configured compression settings""" + conn.execute( + f"COPY ({query}) TO '{path}' " + f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', " + f"COMPRESSION_LEVEL {self.config.compression_level})" + ) @dataclass diff --git a/pyprophet/io/export/osw.py b/pyprophet/io/export/osw.py index 02d1669d..e35336c1 100644 --- a/pyprophet/io/export/osw.py +++ b/pyprophet/io/export/osw.py @@ -1389,11 +1389,3 @@ def _build_score_column_selection_and_joins( ", ".join(score_columns_to_select), " ".join(score_tables_to_join), ) - - def _execute_copy_query(self, conn, query: str, path: str) -> None: - """Execute COPY query with configured compression settings""" - conn.execute( - f"COPY ({query}) TO '{path}' " - f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', " - f"COMPRESSION_LEVEL {self.config.compression_level})" - ) diff --git a/pyprophet/io/export/sqmass.py b/pyprophet/io/export/sqmass.py index b95caa7d..357b851f 100644 --- a/pyprophet/io/export/sqmass.py +++ b/pyprophet/io/export/sqmass.py @@ -279,8 +279,8 @@ class SqMassWriter(BaseWriter): """ def __init__(self, config: ExportIOConfig): - super().__init__(config) self.reader = SqMassReader(config) + super().__init__(config) def save_results(self, result, pi0): raise NotImplementedError( @@ -302,12 +302,174 @@ def _write_parquet(self) -> None: if self.config.file_type != "sqmass": raise ValueError("Parquet export only supported from sqMass files") - # Read data using the reader - df = self.reader.read() + conn = duckdb.connect(":memory:") + self.reader._create_indexes() + load_sqlite_scanner(conn) - # Write to parquet using configured compression - df.to_parquet( - self.config.outfile, - compression=self.config.compression_method, - compression_level=self.config.compression_level, - ) + query = self._build_export_query() + + if self.config.export_format == "parquet": + self._execute_copy_query(conn, query, self.config.outfile) + else: + raise ValueError(f"Unsupported export format: {self.config.export_format}") + + def _build_export_query(self) -> str: + """Assemble the export SQL query from smaller parts.""" + pqp_cte = self._cte_pqp_data() + chrom_cte = self._cte_chrom_data() + prec_meta_cte = self._cte_prec_meta() + chrom_prec_cte = self._cte_chrom_prec_merged() + chrom_trans_cte = self._cte_chrom_trans_merged() + chrom_combined_cte = self._cte_chrom_combined() + final_select = self._final_select_from_combined() + + return f""" + WITH pqp_data AS ( {pqp_cte}), + chrom_data AS ( {chrom_cte}), + prec_meta AS ( {prec_meta_cte}), + chrom_prec_merged AS ( {chrom_prec_cte}), + chrom_trans_merged AS ( {chrom_trans_cte}), + chrom_combined AS ( {chrom_combined_cte}) + {final_select} + """ + + # --- Helper builders for the SQL query parts --- + def _cte_pqp_data(self) -> str: + """CTE body for pqp_data: library metadata joined across PQP tables.""" + return f""" + SELECT + PRECURSOR.ID AS PRECURSOR_ID, + TRANSITION.ID AS TRANSITION_ID, + PEPTIDE.MODIFIED_SEQUENCE, + PRECURSOR.CHARGE AS PRECURSOR_CHARGE, + TRANSITION.CHARGE AS PRODUCT_CHARGE, + TRANSITION.DETECTING AS DETECTING_TRANSITION, + TRANSITION.ORDINAL AS TRANSITION_ORDINAL, + TRANSITION.TYPE AS TRANSITION_TYPE, + PRECURSOR.DECOY AS PRECURSOR_DECOY, + TRANSITION.DECOY AS PRODUCT_DECOY + FROM sqlite_scan('{self.config.pqp_file}', 'PRECURSOR') as PRECURSOR + INNER JOIN sqlite_scan('{self.config.pqp_file}', 'PRECURSOR_PEPTIDE_MAPPING') as PRECURSOR_PEPTIDE_MAPPING + ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID + INNER JOIN sqlite_scan('{self.config.pqp_file}', 'PEPTIDE') as PEPTIDE + ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID + INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION_PRECURSOR_MAPPING') as TRANSITION_PRECURSOR_MAPPING + ON PRECURSOR.ID = TRANSITION_PRECURSOR_MAPPING.PRECURSOR_ID + INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION') as TRANSITION + ON TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID = TRANSITION.ID + """ + def _cte_chrom_data(self) -> str: + """CTE body for chrom_data: raw chromatogram rows from sqMass (xic).""" + return f""" + SELECT + CHROMATOGRAM.NATIVE_ID, + DATA.COMPRESSION, + DATA.DATA_TYPE, + DATA.DATA + FROM sqlite_scan('{self.config.infile}', 'CHROMATOGRAM') as CHROMATOGRAM + INNER JOIN sqlite_scan('{self.config.infile}', 'DATA') as DATA + ON DATA.CHROMATOGRAM_ID = CHROMATOGRAM.ID + """ + + def _cte_prec_meta(self) -> str: + """CTE body for precursor metadata: distinct precursor-level metadata.""" + return """ + SELECT DISTINCT + PRECURSOR_ID, + MODIFIED_SEQUENCE, + PRECURSOR_CHARGE, + PRECURSOR_DECOY + FROM pqp_data + """ + + def _cte_chrom_prec_merged(self) -> str: + """CTE body for chrom_prec_merged: precursor chromatograms with null transition fields.""" + return """ + SELECT + p.PRECURSOR_ID, + NULL AS TRANSITION_ID, + p.MODIFIED_SEQUENCE, + p.PRECURSOR_CHARGE, + NULL AS PRODUCT_CHARGE, + 1 AS DETECTING_TRANSITION, + p.PRECURSOR_DECOY, + NULL AS PRODUCT_DECOY, + NULL AS TRANSITION_ORDINAL, + NULL AS TRANSITION_TYPE, + c.NATIVE_ID, + c.COMPRESSION, + c.DATA_TYPE, + c.DATA + FROM prec_meta p + INNER JOIN chrom_data c + ON p.PRECURSOR_ID = CAST(REGEXP_EXTRACT(c.NATIVE_ID, '(\\d+)_Precursor_i\\d+', 1) AS STRING) + WHERE REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') + """ + + def _cte_chrom_trans_merged(self) -> str: + """CTE body for chrom_trans_merged: transition chromatograms by matching NATIVE_ID.""" + return """ + SELECT + p.PRECURSOR_ID, + p.TRANSITION_ID, + p.MODIFIED_SEQUENCE, + p.PRECURSOR_CHARGE, + p.PRODUCT_CHARGE, + p.DETECTING_TRANSITION, + p.PRECURSOR_DECOY, + p.PRODUCT_DECOY, + p.TRANSITION_ORDINAL, + p.TRANSITION_TYPE, + c.NATIVE_ID, + c.COMPRESSION, + c.DATA_TYPE, + c.DATA + FROM pqp_data p + INNER JOIN chrom_data c ON p.TRANSITION_ID = CAST(c.NATIVE_ID AS STRING) + WHERE NOT REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+') + """ + + def _cte_chrom_combined(self) -> str: + """CTE body for chrom_combined: union of precursor and transition chromatograms with metadata""" + return """ + SELECT * FROM chrom_prec_merged + UNION ALL + SELECT * FROM chrom_trans_merged + """ + + def _final_select_from_combined(self) -> str: + """Final SELECT aggregating and ordering the chromatogram arrays.""" + return """ + SELECT + PRECURSOR_ID, + TRANSITION_ID, + MODIFIED_SEQUENCE, + PRECURSOR_CHARGE, + PRODUCT_CHARGE, + DETECTING_TRANSITION, + PRECURSOR_DECOY, + PRODUCT_DECOY, + TRANSITION_ORDINAL, + TRANSITION_TYPE, + NATIVE_ID, + MAX(CASE WHEN DATA_TYPE = 2 THEN DATA END) AS RT_DATA, + MAX(CASE WHEN DATA_TYPE = 1 THEN DATA END) AS INTENSITY_DATA, + MAX(CASE WHEN DATA_TYPE = 2 THEN COMPRESSION END) AS RT_COMPRESSION, + MAX(CASE WHEN DATA_TYPE = 1 THEN COMPRESSION END) AS INTENSITY_COMPRESSION + FROM chrom_combined + GROUP BY + PRECURSOR_ID, + TRANSITION_ID, + MODIFIED_SEQUENCE, + PRECURSOR_CHARGE, + PRODUCT_CHARGE, + DETECTING_TRANSITION, + PRECURSOR_DECOY, + PRODUCT_DECOY, + TRANSITION_ORDINAL, + TRANSITION_TYPE, + NATIVE_ID + ORDER BY PRECURSOR_ID, + CASE WHEN TRANSITION_ID IS NULL THEN 0 ELSE 1 END, + TRANSITION_ID + """ \ No newline at end of file