diff --git a/.codacy.yml b/.codacy.yml index 5c642ace..b27cfb71 100644 --- a/.codacy.yml +++ b/.codacy.yml @@ -9,7 +9,7 @@ engines: - "qpx/core/http.py" pylint: options: - disable: "arguments-differ,unexpected-keyword-arg,no-value-for-parameter" + disable: "arguments-differ,unexpected-keyword-arg,no-value-for-parameter,line-too-long" markdownlint: options: MD013: false @@ -20,3 +20,4 @@ engines: exclude_paths: - "docs/spec/**" - ".github/workflows/**" + - "tests/**" diff --git a/.pylintrc b/.pylintrc index 28d751aa..77a4b662 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,4 +1,5 @@ [MESSAGES CONTROL] disable=arguments-differ, unexpected-keyword-arg, - no-value-for-parameter + no-value-for-parameter, + line-too-long diff --git a/qpx/converters/mztab.py b/qpx/converters/mztab.py index 400a0456..4f7ae0b7 100644 --- a/qpx/converters/mztab.py +++ b/qpx/converters/mztab.py @@ -8,11 +8,13 @@ from __future__ import annotations +import contextlib import gzip import logging import os import re import tempfile +import typing from pathlib import Path import duckdb @@ -28,6 +30,8 @@ _PROTEIN_LINE_PREFIX = "PRT" _PSM_HEADER_PREFIX = "PSH" _PSM_LINE_PREFIX = "PSM" +_PEPTIDE_HEADER_PREFIX = "PEH" +_PEPTIDE_LINE_PREFIX = "PEP" # Files larger than this threshold use the fast DuckDB-native path _FAST_LOAD_THRESHOLD_BYTES = 500 * 1024 * 1024 # 500 MB @@ -49,7 +53,7 @@ def load_mztab_sections( conn: duckdb.DuckDBPyConnection, mztab_path: str, ) -> None: - """Parse an mzTab file and load metadata, proteins, and PSMs into DuckDB. + """Parse an mzTab file and load metadata, proteins, peptides, and PSMs into DuckDB. For large files (>500 MB), uses a fast path that splits the mzTab into temporary section files and loads them via DuckDB's native ``read_csv``, @@ -58,11 +62,17 @@ def load_mztab_sections( After calling this function the connection will contain: * ``metadata`` -- two-column table (key TEXT, value TEXT) * ``proteins`` -- protein section with dynamic columns + * ``peptides`` -- peptide (PEP) section with dynamic columns; + created as a fallback empty table when the PEP section is absent * ``psms`` -- PSM section with dynamic columns - Args: - conn: An open DuckDB connection (in-memory or persistent). - mztab_path: Path to the mzTab file (plain-text or ``.gz``). + Parameters + ---------- + conn : duckdb.DuckDBPyConnection + An open DuckDB connection (in-memory or persistent). + mztab_path : str + Path to the mzTab file (plain-text or ``.gz``). + """ file_size = os.path.getsize(mztab_path) is_gz = str(mztab_path).endswith(".gz") @@ -74,137 +84,160 @@ def load_mztab_sections( _load_mztab_classic(conn, mztab_path) +# Section definitions: (header_prefix, data_prefix, table_name, dedup_col) +_MZTAB_SECTIONS = [ + (_PROTEIN_HEADER_PREFIX, _PROTEIN_LINE_PREFIX, "proteins", "accession"), + (_PSM_HEADER_PREFIX, _PSM_LINE_PREFIX, "psms", "sequence"), + (_PEPTIDE_HEADER_PREFIX, _PEPTIDE_LINE_PREFIX, "peptides", "sequence"), +] + + +def _build_prefix_dispatch() -> tuple[dict[str, str], dict[str, str]]: + """Build prefix→section_name lookup dicts from ``_MZTAB_SECTIONS``.""" + header_map = {h: name for h, _, name, _ in _MZTAB_SECTIONS} + data_map = {d: name for _, d, name, _ in _MZTAB_SECTIONS} + return header_map, data_map + + def _load_mztab_classic( conn: duckdb.DuckDBPyConnection, mztab_path: str, ) -> None: """Original in-memory mzTab loader (good for files < 500 MB).""" metadata_rows: list[tuple[str, str]] = [] - protein_header: list[str] | None = None - protein_rows: list[list[str]] = [] - psm_header: list[str] | None = None - psm_rows: list[list[str]] = [] + sections: dict[str, tuple[list[str] | None, list[list[str]]]] = {name: (None, []) for _, _, name, _ in _MZTAB_SECTIONS} + header_map, data_map = _build_prefix_dispatch() + + def on_metadata(parts: list[str]) -> None: + if len(parts) >= 3: + metadata_rows.append((parts[1], parts[2])) + + def on_header(name: str, parts: list[str]) -> None: + sections[name] = ([_clean_col(c) for c in parts[1:]], sections[name][1]) + + def on_data(name: str, parts: list[str]) -> None: + if sections[name][0] is not None: + sections[name][1].append(parts[1:]) with _open_mztab(mztab_path) as fh: for line in fh: line = line.rstrip("\n\r") if not line: continue - parts = line.split("\t") prefix = parts[0] if parts else "" + if prefix == _METADATA_PREFIX: + on_metadata(parts) + elif prefix in header_map: + on_header(header_map[prefix], parts) + elif prefix in data_map: + on_data(data_map[prefix], parts) - if prefix == _METADATA_PREFIX and len(parts) >= 3: - metadata_rows.append((parts[1], parts[2])) + _register_metadata(conn, metadata_rows) + for _, _, table_name, dedup_col in _MZTAB_SECTIONS: + header, rows = sections[table_name] + _register_section_df(conn, table_name, header, rows, dedup_col) + logger.info( + "mzTab loaded: %d metadata, %d proteins, %d peptides, %d PSMs", + len(metadata_rows), + len(sections["proteins"][1]), + len(sections["peptides"][1]), + len(sections["psms"][1]), + ) - elif prefix == _PROTEIN_HEADER_PREFIX: - protein_header = [_clean_col(c) for c in parts[1:]] - elif prefix == _PROTEIN_LINE_PREFIX and protein_header is not None: - protein_rows.append(parts[1:]) +def _cleanup_tmpdir(tmpdir: str) -> None: + """Remove temp section files and directory used by the fast loader.""" + for _, _, name, _ in _MZTAB_SECTIONS: + try: + os.unlink(os.path.join(tmpdir, f"{name}.tsv")) + except OSError: + pass + try: + os.rmdir(tmpdir) + except OSError: + pass - elif prefix == _PSM_HEADER_PREFIX: - psm_header = [_clean_col(c) for c in parts[1:]] - elif prefix == _PSM_LINE_PREFIX and psm_header is not None: - psm_rows.append(parts[1:]) +def _stream_mztab_to_files( + mztab_path: str, + files: dict[str, typing.IO[str]], + info: dict[str, list], +) -> list[tuple[str, str]]: + """Stream mzTab lines into per-section temp files, return metadata rows.""" + metadata_rows: list[tuple[str, str]] = [] + header_map, data_map = _build_prefix_dispatch() - _register_metadata(conn, metadata_rows) - _register_section_df(conn, "proteins", protein_header, protein_rows, "accession") - _register_section_df(conn, "psms", psm_header, psm_rows, "sequence") + def on_metadata(parts: list[str]) -> None: + if len(parts) >= 3: + metadata_rows.append((parts[1], parts[2])) - logger.info( - "mzTab loaded: %d metadata rows, %d proteins, %d PSMs", - len(metadata_rows), - len(protein_rows), - len(psm_rows), - ) + def on_header(name: str, parts: list[str]) -> None: + cleaned = "\t".join(_clean_col(c) for c in parts[1:]) + "\n" + files[name].write(cleaned) + info[name][1] = True + + with _open_mztab(mztab_path) as fh: + for line in fh: + line = line.rstrip("\n\r") + if not line: + continue + prefix = line[:3] + # Fast path: data lines are written directly without a full split. + # mzTab data prefixes (PRT, PSM, PEP) are always exactly 3 chars + # followed by a tab, so line[4:] is the payload — no need to + # split("\t") and re-join, which is the hot path for >500 MB files. + if prefix in data_map: + name = data_map[prefix] + if info[name][1]: + files[name].write(line[4:]) + files[name].write("\n") + info[name][2] += 1 + else: + # Headers and metadata need a full split for column cleaning + parts = line.split("\t") + pfx = parts[0][:3] if parts else "" + if pfx == _METADATA_PREFIX: + on_metadata(parts) + elif pfx in header_map: + on_header(header_map[pfx], parts) + + return metadata_rows def _load_mztab_fast( conn: duckdb.DuckDBPyConnection, mztab_path: str, ) -> None: - """Fast mzTab loader that splits sections to temp files and uses DuckDB read_csv. - - Single-pass over the mzTab: - - Metadata rows are small, accumulated in memory. - - PSM and protein rows are streamed to temporary TSV files. - - DuckDB reads the temp files natively (no Python/pandas overhead). - """ - metadata_rows: list[tuple[str, str]] = [] - psm_count = 0 - protein_count = 0 - + """Fast mzTab loader that splits sections to temp files and uses DuckDB read_csv.""" tmpdir = tempfile.mkdtemp(prefix="mztab_split_") - psm_tmp = os.path.join(tmpdir, "psms.tsv") - prot_tmp = os.path.join(tmpdir, "proteins.tsv") - - psm_header_line: str | None = None - prot_header_line: str | None = None + info: dict[str, list] = {} + for _, _, name, _ in _MZTAB_SECTIONS: + info[name] = [os.path.join(tmpdir, f"{name}.tsv"), False, 0] try: - with ( - _open_mztab(mztab_path) as fh, - open(psm_tmp, "w", encoding="utf-8") as psm_out, - open(prot_tmp, "w", encoding="utf-8") as prot_out, - ): - for line in fh: - line = line.rstrip("\n\r") - if not line: - continue - - # Check only the first 3 chars for speed - prefix = line[:3] - - if prefix == _METADATA_PREFIX: - parts = line.split("\t", 3) - if len(parts) >= 3: - metadata_rows.append((parts[1], parts[2])) - - elif prefix == _PSM_HEADER_PREFIX: - # Write cleaned header to PSM temp file - parts = line.split("\t") - cleaned = [_clean_col(c) for c in parts[1:]] - psm_header_line = "\t".join(cleaned) + "\n" - psm_out.write(psm_header_line) - - elif prefix == _PSM_LINE_PREFIX and psm_header_line is not None: - # Strip the "PSM\t" prefix and write the rest directly - psm_out.write(line[4:] + "\n") - psm_count += 1 - - elif prefix == _PROTEIN_HEADER_PREFIX: - parts = line.split("\t") - cleaned = [_clean_col(c) for c in parts[1:]] - prot_header_line = "\t".join(cleaned) + "\n" - prot_out.write(prot_header_line) - - elif prefix == _PROTEIN_LINE_PREFIX and prot_header_line is not None: - prot_out.write(line[4:] + "\n") - protein_count += 1 - + with contextlib.ExitStack() as stack: + files = { + name: stack.enter_context( + open(vals[0], "w", encoding="utf-8"), + ) + for name, vals in info.items() + } + metadata_rows = _stream_mztab_to_files(mztab_path, files, info) + # ExitStack closes all file handles here before DuckDB reads _register_metadata(conn, metadata_rows) - _register_section_csv(conn, "proteins", prot_tmp, prot_header_line is not None, protein_count, "accession") - _register_section_csv(conn, "psms", psm_tmp, psm_header_line is not None, psm_count, "sequence") - + for _, _, table_name, dedup_col in _MZTAB_SECTIONS: + tmp_path, has_header, count = info[table_name] + _register_section_csv(conn, table_name, tmp_path, has_header, count, dedup_col) logger.info( - "mzTab loaded (fast): %d metadata rows, %d proteins, %d PSMs", + "mzTab loaded (fast): %d metadata, %d proteins, %d peptides, %d PSMs", len(metadata_rows), - protein_count, - psm_count, + info["proteins"][2], + info["peptides"][2], + info["psms"][2], ) finally: - # Clean up temp files - for f in (psm_tmp, prot_tmp): - try: - os.unlink(f) - except OSError: - pass - try: - os.rmdir(tmpdir) - except OSError: - pass + _cleanup_tmpdir(tmpdir) def load_msstats( diff --git a/qpx/converters/quantms/feature_adapter.py b/qpx/converters/quantms/feature_adapter.py index 3bdabcd9..4e17a7c9 100644 --- a/qpx/converters/quantms/feature_adapter.py +++ b/qpx/converters/quantms/feature_adapter.py @@ -59,6 +59,11 @@ class QuantmsFeatureAdapter(BaseConverter): ) """ + def __init__(self, **kwargs): + """Initialize adapter with an empty peptide-protein map.""" + super().__init__(**kwargs) + self._pep_protein_map: dict[str, str] = {} + def convert( self, mztab_path: str, @@ -83,6 +88,7 @@ def convert( enzyme_name: Enzyme name for computing missed cleavages. """ self._enzyme_name = enzyme_name + self._pep_protein_map: dict[str, str] = {} # Step 1: Load data into DuckDB (skip if already loaded) if not self._table_exists("psms"): load_mztab_sections(self._conn, mztab_path) @@ -173,6 +179,9 @@ def _convert_lfq_fast( # Step 3: Build ProForma lookup (distinct peptidoforms only) self._load_proforma_lookup(pf_col) + # Step 3b: Build peptide → protein lookup from mzTab PEP section + self._pep_protein_map = self._build_peptide_protein_map() + # Step 4: Build the big JOIN query # NOTE: Column names (pf_col, ref_col, etc.) come from the internal # _FEATURE_MAP dict resolved against actual DuckDB column names — they @@ -516,6 +525,56 @@ def _load_proforma_lookup(self, pf_col: str) -> None: """) self.logger.info("ProForma lookup table: %d entries", len(records)) + def _build_peptide_protein_map(self) -> dict[str, str]: + """Build peptide sequence → single protein accession map from mzTab PEP section. + + The mzTab PEP section contains the protein inference result: each + peptide is assigned to exactly one protein accession (including razor + peptide resolution). This map is used in ``_rows_to_feature_records`` + to resolve protein groups (``A;B``) to the correct single accession. + + Only unambiguous peptides (single protein) are included. + + Returns + ------- + dict[str, str] + Dict mapping plain sequence (uppercase, letters only) to single + protein accession. + + """ + if not self._table_exists("peptides"): + self.logger.info("No mzTab peptides table — skipping peptide protein map") + return {} + pep_cols = { + c[0] + for c in self._conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name='peptides'" + ).fetchall() + } + if "sequence" not in pep_cols or "accession" not in pep_cols: + self.logger.info("Peptides table lacks sequence/accession — skipping") + return {} + rows = self._conn.execute(""" + WITH pep_resolved AS ( + SELECT + regexp_replace(upper(CAST(sequence AS VARCHAR)), '[^A-Z]', '', 'g') + AS pep_sequence, + CAST(accession AS VARCHAR) AS pep_accession + FROM peptides + WHERE accession IS NOT NULL + AND CAST(accession AS VARCHAR) != 'null' + AND sequence IS NOT NULL + ) + SELECT pep_sequence, MIN(pep_accession) AS pep_accession + FROM pep_resolved + GROUP BY pep_sequence + HAVING COUNT(DISTINCT pep_accession) = 1 + """).fetchall() + + pep_map = dict(rows) + self.logger.info("Peptide-protein map: %d entries (unambiguous)", len(pep_map)) + return pep_map + def _rows_to_feature_records(self, rows: list[tuple]) -> list[dict]: """Convert pre-joined SQL rows to feature record dicts. @@ -530,6 +589,7 @@ def _rows_to_feature_records(self, rows: list[tuple]) -> list[dict]: _enzyme = self._enzyme_name _count_mc = count_missed_cleavages _json_loads = json.loads + _pep_map = getattr(self, "_pep_protein_map", {}) for row in rows: try: @@ -557,8 +617,11 @@ def _rows_to_feature_records(self, rows: list[tuple]) -> list[dict]: # Mass error mass_error_ppm = 1e6 * (obs_mz - calc_mz) / calc_mz if calc_mz and obs_mz else None - # Protein accessions + # Protein accessions — resolve protein groups via PEP section acc_list = protein_name.split(";") if protein_name else [] + if len(acc_list) > 1 and sequence in _pep_map: + resolved = _pep_map[sequence] + acc_list = [resolved] anchor_protein = acc_list[0] if acc_list else "" # Protein q-value (from SQL JOIN) diff --git a/tests/converters/test_pep_protein_resolution.py b/tests/converters/test_pep_protein_resolution.py new file mode 100644 index 00000000..fb22f68a --- /dev/null +++ b/tests/converters/test_pep_protein_resolution.py @@ -0,0 +1,136 @@ +"""Unit tests for PEP-section peptide→protein resolution logic. + +Tests cover: +- ``_build_peptide_protein_map()`` with a real peptides table +- ``_build_peptide_protein_map()`` with a fallback (empty / missing columns) +- protein-group resolution in ``_rows_to_feature_records()`` +""" + +import duckdb +import pytest + +from qpx.converters.quantms.feature_adapter import QuantmsFeatureAdapter + + +@pytest.fixture() +def adapter(): + """Create an adapter with an in-memory DuckDB connection.""" + conn = duckdb.connect(":memory:") + a = QuantmsFeatureAdapter(conn=conn) + return a + + +class TestBuildPeptideProteinMap: + """Tests for _build_peptide_protein_map().""" + + def test_unambiguous_mapping(self, adapter): + """Peptides mapped to a single protein return a correct dict.""" + adapter._conn.execute(""" + CREATE TABLE peptides (sequence VARCHAR, accession VARCHAR) + """) + adapter._conn.execute(""" + INSERT INTO peptides VALUES + ('PEPTIDEK', 'P12345'), + ('ANOTHERK', 'P67890'), + ('PEPTIDEK', 'P12345') + """) + result = adapter._build_peptide_protein_map() + assert result == {"PEPTIDEK": "P12345", "ANOTHERK": "P67890"} + + def test_shared_peptide_excluded(self, adapter): + """Peptides mapping to multiple proteins are excluded.""" + adapter._conn.execute(""" + CREATE TABLE peptides (sequence VARCHAR, accession VARCHAR) + """) + adapter._conn.execute(""" + INSERT INTO peptides VALUES + ('SHAREDK', 'P11111'), + ('SHAREDK', 'P22222'), + ('UNIQUEK', 'P33333') + """) + result = adapter._build_peptide_protein_map() + assert "SHAREDK" not in result + assert result == {"UNIQUEK": "P33333"} + + def test_no_peptides_table(self, adapter): + """Returns empty dict when peptides table does not exist.""" + result = adapter._build_peptide_protein_map() + assert result == {} + + def test_fallback_table_missing_columns(self, adapter): + """Returns empty dict when peptides table lacks sequence/accession.""" + adapter._conn.execute("CREATE TABLE peptides (id INTEGER)") + result = adapter._build_peptide_protein_map() + assert result == {} + + +class TestProteinGroupResolution: + """Tests for protein-group resolution in _rows_to_feature_records().""" + + def _make_row(self, protein_name="P11;P22", sequence="TESTPEPTIDE"): + """Build a minimal row tuple matching the SQL query column order. + + Column order: + 0: peptidoform, 1: sequence, 2: run_file_name, 3: charge, + 4: intensity, 5: msstats_rt, 6: protein_name, + 7-13: PSM fields, 14: pg_global_qvalue, 15: gene_name, + 16: modifications_json + """ + return ( + sequence, # 0: peptidoform + sequence, # 1: sequence + "run1", # 2: run_file_name + 2, # 3: charge + 1000.0, # 4: intensity + 12.5, # 5: msstats_rt + protein_name, # 6: protein_name + None, # 7: psm_pep + 500.0, # 8: psm_calc_mz + 500.1, # 9: psm_obs_mz + False, # 10: psm_is_decoy + None, # 11: psm_scan + None, # 12: psm_id_run + None, # 13: psm_rt + 0.01, # 14: pg_global_qvalue + "GN1", # 15: gene_name + None, # 16: modifications_json + ) + + def test_resolves_ambiguous_group(self, adapter): + """Multi-accession protein_name 'A;B' is resolved to 'A' via PEP map.""" + adapter._pep_protein_map = {"TESTPEPTIDE": "P11"} + adapter._enzyme_name = None + + rows = [self._make_row(protein_name="P11;P22", sequence="TESTPEPTIDE")] + records = adapter._rows_to_feature_records(rows) + + assert len(records) == 1 + accessions = [a["accession"] for a in records[0]["pg_accessions"]] + assert accessions == ["P11"] + assert records[0]["anchor_protein"] == "P11" + assert records[0]["unique"] is True + + def test_keeps_single_accession(self, adapter): + """Single-accession protein_name is unchanged.""" + adapter._pep_protein_map = {"TESTPEPTIDE": "P11"} + adapter._enzyme_name = None + + rows = [self._make_row(protein_name="P11", sequence="TESTPEPTIDE")] + records = adapter._rows_to_feature_records(rows) + + assert len(records) == 1 + accessions = [a["accession"] for a in records[0]["pg_accessions"]] + assert accessions == ["P11"] + + def test_no_map_keeps_group(self, adapter): + """Without PEP map, multi-accession group is preserved.""" + adapter._pep_protein_map = {} + adapter._enzyme_name = None + + rows = [self._make_row(protein_name="P11;P22", sequence="TESTPEPTIDE")] + records = adapter._rows_to_feature_records(rows) + + assert len(records) == 1 + accessions = [a["accession"] for a in records[0]["pg_accessions"]] + assert accessions == ["P11", "P22"] + assert records[0]["unique"] is False