Skip to content
Merged
2 changes: 2 additions & 0 deletions docs/file_formats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ The Parquet schema used for the XICs is as follows:
('DETECTING_TRANSITION', Int64),
('PRECURSOR_DECOY', Int64),
('PRODUCT_DECOY', Int64),
('TRANSITION_ORDINAL', Int64),
('TRANSITION_TYPE', String),
('NATIVE_ID', String),
('RT_DATA', Binary),
('INTENSITY_DATA', Binary),
Expand Down
8 changes: 8 additions & 0 deletions pyprophet/io/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,14 @@ def _quantile_normalize(self, matrix: pd.DataFrame) -> pd.DataFrame:
raise ImportError(
"scikit-learn is required for quantile normalization"
) from exc

def _execute_copy_query(self, conn, query: str, path: str) -> None:
"""Execute COPY query with configured compression settings"""
conn.execute(
f"COPY ({query}) TO '{path}' "
f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', "
f"COMPRESSION_LEVEL {self.config.compression_level})"
)


@dataclass
Expand Down
8 changes: 0 additions & 8 deletions pyprophet/io/export/osw.py
Original file line number Diff line number Diff line change
Expand Up @@ -1389,11 +1389,3 @@ def _build_score_column_selection_and_joins(
", ".join(score_columns_to_select),
" ".join(score_tables_to_join),
)

def _execute_copy_query(self, conn, query: str, path: str) -> None:
"""Execute COPY query with configured compression settings"""
conn.execute(
f"COPY ({query}) TO '{path}' "
f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', "
f"COMPRESSION_LEVEL {self.config.compression_level})"
)
180 changes: 171 additions & 9 deletions pyprophet/io/export/sqmass.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ class SqMassWriter(BaseWriter):
"""

def __init__(self, config: ExportIOConfig):
super().__init__(config)
self.reader = SqMassReader(config)
super().__init__(config)

def save_results(self, result, pi0):
raise NotImplementedError(
Expand All @@ -302,12 +302,174 @@ def _write_parquet(self) -> None:
if self.config.file_type != "sqmass":
raise ValueError("Parquet export only supported from sqMass files")

# Read data using the reader
df = self.reader.read()
conn = duckdb.connect(":memory:")
self.reader._create_indexes()
load_sqlite_scanner(conn)

# Write to parquet using configured compression
df.to_parquet(
self.config.outfile,
compression=self.config.compression_method,
compression_level=self.config.compression_level,
)
query = self._build_export_query()

if self.config.export_format == "parquet":
self._execute_copy_query(conn, query, self.config.outfile)
else:
raise ValueError(f"Unsupported export format: {self.config.export_format}")

def _build_export_query(self) -> str:
"""Assemble the export SQL query from smaller parts."""
pqp_cte = self._cte_pqp_data()
chrom_cte = self._cte_chrom_data()
prec_meta_cte = self._cte_prec_meta()
chrom_prec_cte = self._cte_chrom_prec_merged()
chrom_trans_cte = self._cte_chrom_trans_merged()
chrom_combined_cte = self._cte_chrom_combined()
final_select = self._final_select_from_combined()

return f"""
WITH pqp_data AS ( {pqp_cte}),
chrom_data AS ( {chrom_cte}),
prec_meta AS ( {prec_meta_cte}),
chrom_prec_merged AS ( {chrom_prec_cte}),
chrom_trans_merged AS ( {chrom_trans_cte}),
chrom_combined AS ( {chrom_combined_cte})
{final_select}
"""

# --- Helper builders for the SQL query parts ---
def _cte_pqp_data(self) -> str:
"""CTE body for pqp_data: library metadata joined across PQP tables."""
return f"""
SELECT
PRECURSOR.ID AS PRECURSOR_ID,
TRANSITION.ID AS TRANSITION_ID,
PEPTIDE.MODIFIED_SEQUENCE,
PRECURSOR.CHARGE AS PRECURSOR_CHARGE,
TRANSITION.CHARGE AS PRODUCT_CHARGE,
TRANSITION.DETECTING AS DETECTING_TRANSITION,
TRANSITION.ORDINAL AS TRANSITION_ORDINAL,
TRANSITION.TYPE AS TRANSITION_TYPE,
PRECURSOR.DECOY AS PRECURSOR_DECOY,
TRANSITION.DECOY AS PRODUCT_DECOY
FROM sqlite_scan('{self.config.pqp_file}', 'PRECURSOR') as PRECURSOR
INNER JOIN sqlite_scan('{self.config.pqp_file}', 'PRECURSOR_PEPTIDE_MAPPING') as PRECURSOR_PEPTIDE_MAPPING
ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN sqlite_scan('{self.config.pqp_file}', 'PEPTIDE') as PEPTIDE
ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION_PRECURSOR_MAPPING') as TRANSITION_PRECURSOR_MAPPING
ON PRECURSOR.ID = TRANSITION_PRECURSOR_MAPPING.PRECURSOR_ID
INNER JOIN sqlite_scan('{self.config.pqp_file}', 'TRANSITION') as TRANSITION
ON TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID = TRANSITION.ID
"""
def _cte_chrom_data(self) -> str:
"""CTE body for chrom_data: raw chromatogram rows from sqMass (xic)."""
return f"""
SELECT
CHROMATOGRAM.NATIVE_ID,
DATA.COMPRESSION,
DATA.DATA_TYPE,
DATA.DATA
FROM sqlite_scan('{self.config.infile}', 'CHROMATOGRAM') as CHROMATOGRAM
INNER JOIN sqlite_scan('{self.config.infile}', 'DATA') as DATA
ON DATA.CHROMATOGRAM_ID = CHROMATOGRAM.ID
"""

def _cte_prec_meta(self) -> str:
"""CTE body for precursor metadata: distinct precursor-level metadata."""
return """
SELECT DISTINCT
PRECURSOR_ID,
MODIFIED_SEQUENCE,
PRECURSOR_CHARGE,
PRECURSOR_DECOY
FROM pqp_data
"""

def _cte_chrom_prec_merged(self) -> str:
"""CTE body for chrom_prec_merged: precursor chromatograms with null transition fields."""
return """
SELECT
p.PRECURSOR_ID,
NULL AS TRANSITION_ID,
p.MODIFIED_SEQUENCE,
p.PRECURSOR_CHARGE,
NULL AS PRODUCT_CHARGE,
1 AS DETECTING_TRANSITION,
p.PRECURSOR_DECOY,
NULL AS PRODUCT_DECOY,
NULL AS TRANSITION_ORDINAL,
NULL AS TRANSITION_TYPE,
c.NATIVE_ID,
c.COMPRESSION,
c.DATA_TYPE,
c.DATA
FROM prec_meta p
INNER JOIN chrom_data c
ON p.PRECURSOR_ID = CAST(REGEXP_EXTRACT(c.NATIVE_ID, '(\\d+)_Precursor_i\\d+', 1) AS STRING)
WHERE REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+')
"""

def _cte_chrom_trans_merged(self) -> str:
"""CTE body for chrom_trans_merged: transition chromatograms by matching NATIVE_ID."""
return """
SELECT
p.PRECURSOR_ID,
p.TRANSITION_ID,
p.MODIFIED_SEQUENCE,
p.PRECURSOR_CHARGE,
p.PRODUCT_CHARGE,
p.DETECTING_TRANSITION,
p.PRECURSOR_DECOY,
p.PRODUCT_DECOY,
p.TRANSITION_ORDINAL,
p.TRANSITION_TYPE,
c.NATIVE_ID,
c.COMPRESSION,
c.DATA_TYPE,
c.DATA
FROM pqp_data p
INNER JOIN chrom_data c ON p.TRANSITION_ID = CAST(c.NATIVE_ID AS STRING)
WHERE NOT REGEXP_MATCHES(c.NATIVE_ID, '_Precursor_i\\d+')
"""

def _cte_chrom_combined(self) -> str:
"""CTE body for chrom_combined: union of precursor and transition chromatograms with metadata"""
return """
SELECT * FROM chrom_prec_merged
UNION ALL
SELECT * FROM chrom_trans_merged
"""

def _final_select_from_combined(self) -> str:
"""Final SELECT aggregating and ordering the chromatogram arrays."""
return """
SELECT
PRECURSOR_ID,
TRANSITION_ID,
MODIFIED_SEQUENCE,
PRECURSOR_CHARGE,
PRODUCT_CHARGE,
DETECTING_TRANSITION,
PRECURSOR_DECOY,
PRODUCT_DECOY,
TRANSITION_ORDINAL,
TRANSITION_TYPE,
NATIVE_ID,
MAX(CASE WHEN DATA_TYPE = 2 THEN DATA END) AS RT_DATA,
MAX(CASE WHEN DATA_TYPE = 1 THEN DATA END) AS INTENSITY_DATA,
MAX(CASE WHEN DATA_TYPE = 2 THEN COMPRESSION END) AS RT_COMPRESSION,
MAX(CASE WHEN DATA_TYPE = 1 THEN COMPRESSION END) AS INTENSITY_COMPRESSION
FROM chrom_combined
GROUP BY
PRECURSOR_ID,
TRANSITION_ID,
MODIFIED_SEQUENCE,
PRECURSOR_CHARGE,
PRODUCT_CHARGE,
DETECTING_TRANSITION,
PRECURSOR_DECOY,
PRODUCT_DECOY,
TRANSITION_ORDINAL,
TRANSITION_TYPE,
NATIVE_ID
ORDER BY PRECURSOR_ID,
CASE WHEN TRANSITION_ID IS NULL THEN 0 ELSE 1 END,
TRANSITION_ID
"""