Skip to content
3 changes: 2 additions & 1 deletion .codacy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ engines:
- "qpx/core/http.py"
pylint:
options:
disable: "arguments-differ,unexpected-keyword-arg,no-value-for-parameter"
disable: "arguments-differ,unexpected-keyword-arg,no-value-for-parameter,line-too-long"
markdownlint:
options:
MD013: false
Expand All @@ -20,3 +20,4 @@ engines:
exclude_paths:
- "docs/spec/**"
- ".github/workflows/**"
- "tests/**"
3 changes: 2 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[MESSAGES CONTROL]
disable=arguments-differ,
unexpected-keyword-arg,
no-value-for-parameter
no-value-for-parameter,
line-too-long
233 changes: 133 additions & 100 deletions qpx/converters/mztab.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

from __future__ import annotations

import contextlib
import gzip
import logging
import os
import re
import tempfile
import typing
from pathlib import Path

import duckdb
Expand All @@ -28,6 +30,8 @@
_PROTEIN_LINE_PREFIX = "PRT"
_PSM_HEADER_PREFIX = "PSH"
_PSM_LINE_PREFIX = "PSM"
_PEPTIDE_HEADER_PREFIX = "PEH"
_PEPTIDE_LINE_PREFIX = "PEP"

# Files larger than this threshold use the fast DuckDB-native path
_FAST_LOAD_THRESHOLD_BYTES = 500 * 1024 * 1024 # 500 MB
Expand All @@ -49,7 +53,7 @@
conn: duckdb.DuckDBPyConnection,
mztab_path: str,
) -> None:
"""Parse an mzTab file and load metadata, proteins, and PSMs into DuckDB.
"""Parse an mzTab file and load metadata, proteins, peptides, and PSMs into DuckDB.

Check notice on line 56 in qpx/converters/mztab.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

qpx/converters/mztab.py#L56

Multi-line docstring summary should start at the second line (D213)

For large files (>500 MB), uses a fast path that splits the mzTab into
temporary section files and loads them via DuckDB's native ``read_csv``,
Expand All @@ -58,11 +62,17 @@
After calling this function the connection will contain:
* ``metadata`` -- two-column table (key TEXT, value TEXT)
* ``proteins`` -- protein section with dynamic columns
* ``peptides`` -- peptide (PEP) section with dynamic columns;
created as a fallback empty table when the PEP section is absent
* ``psms`` -- PSM section with dynamic columns

Args:
conn: An open DuckDB connection (in-memory or persistent).
mztab_path: Path to the mzTab file (plain-text or ``.gz``).
Parameters
----------
conn : duckdb.DuckDBPyConnection
An open DuckDB connection (in-memory or persistent).
mztab_path : str
Path to the mzTab file (plain-text or ``.gz``).

"""
file_size = os.path.getsize(mztab_path)
is_gz = str(mztab_path).endswith(".gz")
Expand All @@ -74,137 +84,160 @@
_load_mztab_classic(conn, mztab_path)


# Section definitions: (header_prefix, data_prefix, table_name, dedup_col)
# Each mzTab tabular section is declared once here; the loaders below derive
# their prefix-dispatch tables and output DuckDB table names from this list.
_MZTAB_SECTIONS = [
    (_PROTEIN_HEADER_PREFIX, _PROTEIN_LINE_PREFIX, "proteins", "accession"),
    (_PSM_HEADER_PREFIX, _PSM_LINE_PREFIX, "psms", "sequence"),
    (_PEPTIDE_HEADER_PREFIX, _PEPTIDE_LINE_PREFIX, "peptides", "sequence"),
]


def _build_prefix_dispatch() -> tuple[dict[str, str], dict[str, str]]:
    """Build prefix-to-section-name lookup dicts from ``_MZTAB_SECTIONS``.

    Returns
    -------
    tuple[dict[str, str], dict[str, str]]
        ``(header_map, data_map)``: the first maps section *header* line
        prefixes to their target table names, the second maps section *data*
        line prefixes to the same table names.

    """
    header_map = {h: name for h, _, name, _ in _MZTAB_SECTIONS}
    data_map = {d: name for _, d, name, _ in _MZTAB_SECTIONS}
    return header_map, data_map


def _load_mztab_classic(
    conn: duckdb.DuckDBPyConnection,
    mztab_path: str,
) -> None:
    """Original in-memory mzTab loader (good for files < 500 MB).

    Accumulates every section's header and rows in Python lists during a
    single pass over the file, then registers each section as a DuckDB table.

    Parameters
    ----------
    conn : duckdb.DuckDBPyConnection
        An open DuckDB connection (in-memory or persistent).
    mztab_path : str
        Path to the mzTab file (plain-text or ``.gz``).

    """
    metadata_rows: list[tuple[str, str]] = []
    # Per-section accumulator: table_name -> (header columns or None, data rows)
    sections: dict[str, tuple[list[str] | None, list[list[str]]]] = {
        name: (None, []) for _, _, name, _ in _MZTAB_SECTIONS
    }
    header_map, data_map = _build_prefix_dispatch()

    def on_metadata(parts: list[str]) -> None:
        if len(parts) >= 3:
            metadata_rows.append((parts[1], parts[2]))

    def on_header(name: str, parts: list[str]) -> None:
        sections[name] = ([_clean_col(c) for c in parts[1:]], sections[name][1])

    def on_data(name: str, parts: list[str]) -> None:
        # Ignore data lines that appear before their section header line.
        if sections[name][0] is not None:
            sections[name][1].append(parts[1:])

    with _open_mztab(mztab_path) as fh:
        for line in fh:
            line = line.rstrip("\n\r")
            if not line:
                continue

            parts = line.split("\t")
            prefix = parts[0] if parts else ""
            if prefix == _METADATA_PREFIX:
                on_metadata(parts)
            elif prefix in header_map:
                on_header(header_map[prefix], parts)
            elif prefix in data_map:
                on_data(data_map[prefix], parts)

    _register_metadata(conn, metadata_rows)
    for _, _, table_name, dedup_col in _MZTAB_SECTIONS:
        header, rows = sections[table_name]
        _register_section_df(conn, table_name, header, rows, dedup_col)
    logger.info(
        "mzTab loaded: %d metadata, %d proteins, %d peptides, %d PSMs",
        len(metadata_rows),
        len(sections["proteins"][1]),
        len(sections["peptides"][1]),
        len(sections["psms"][1]),
    )
def _cleanup_tmpdir(tmpdir: str) -> None:
    """Remove temp section files and directory used by the fast loader.

    Best-effort cleanup: a missing file or a non-empty directory is ignored
    (``OSError`` suppressed) so cleanup never masks an earlier error.

    Parameters
    ----------
    tmpdir : str
        Temporary directory created by ``_load_mztab_fast``.

    """
    for _, _, name, _ in _MZTAB_SECTIONS:
        with contextlib.suppress(OSError):
            os.unlink(os.path.join(tmpdir, f"{name}.tsv"))
    with contextlib.suppress(OSError):
        os.rmdir(tmpdir)
def _stream_mztab_to_files(
    mztab_path: str,
    files: dict[str, typing.IO[str]],
    info: dict[str, list],
) -> list[tuple[str, str]]:
    """Stream mzTab lines into per-section temp files, return metadata rows.

    Parameters
    ----------
    mztab_path : str
        Path to the mzTab file (plain-text or ``.gz``).
    files : dict[str, typing.IO[str]]
        Open writable text handles keyed by section table name.
    info : dict[str, list]
        Per-section mutable state ``[tmp_path, header_seen, row_count]``;
        the ``header_seen`` flag and ``row_count`` are updated in place.

    Returns
    -------
    list[tuple[str, str]]
        The (key, value) metadata rows, accumulated in memory.

    """
    metadata_rows: list[tuple[str, str]] = []
    header_map, data_map = _build_prefix_dispatch()

    def on_metadata(parts: list[str]) -> None:
        if len(parts) >= 3:
            metadata_rows.append((parts[1], parts[2]))

    def on_header(name: str, parts: list[str]) -> None:
        cleaned = "\t".join(_clean_col(c) for c in parts[1:]) + "\n"
        files[name].write(cleaned)
        info[name][1] = True

    with _open_mztab(mztab_path) as fh:
        for line in fh:
            line = line.rstrip("\n\r")
            if not line:
                continue
            prefix = line[:3]
            # Fast path: data lines are written directly without a full split.
            # mzTab data prefixes (PRT, PSM, PEP) are always exactly 3 chars
            # followed by a tab, so line[4:] is the payload — no need to
            # split("\t") and re-join, which is the hot path for >500 MB files.
            if prefix in data_map:
                name = data_map[prefix]
                if info[name][1]:  # only count rows after the header was seen
                    files[name].write(line[4:])
                    files[name].write("\n")
                    info[name][2] += 1
            else:
                # Headers and metadata need a full split for column cleaning.
                parts = line.split("\t")
                pfx = parts[0][:3] if parts else ""
                if pfx == _METADATA_PREFIX:
                    on_metadata(parts)
                elif pfx in header_map:
                    on_header(header_map[pfx], parts)

    return metadata_rows


def _load_mztab_fast(
    conn: duckdb.DuckDBPyConnection,
    mztab_path: str,
) -> None:
    """Fast mzTab loader that splits sections to temp files and uses DuckDB read_csv.

    Single pass over the mzTab:
    * Metadata rows are small and accumulated in memory.
    * Section rows (proteins, peptides, PSMs) are streamed to per-section
      temporary TSV files.
    * DuckDB then reads the temp files natively (no Python/pandas overhead).

    Parameters
    ----------
    conn : duckdb.DuckDBPyConnection
        An open DuckDB connection (in-memory or persistent).
    mztab_path : str
        Path to the mzTab file (plain-text or ``.gz``).

    """
    tmpdir = tempfile.mkdtemp(prefix="mztab_split_")

    # Per-section mutable state: table_name -> [tmp_path, header_seen, row_count]
    info: dict[str, list] = {}
    for _, _, name, _ in _MZTAB_SECTIONS:
        info[name] = [os.path.join(tmpdir, f"{name}.tsv"), False, 0]

    try:
        with contextlib.ExitStack() as stack:
            files = {
                name: stack.enter_context(
                    open(vals[0], "w", encoding="utf-8"),
                )
                for name, vals in info.items()
            }
            metadata_rows = _stream_mztab_to_files(mztab_path, files, info)
        # ExitStack closes all file handles here before DuckDB reads them.
        _register_metadata(conn, metadata_rows)
        for _, _, table_name, dedup_col in _MZTAB_SECTIONS:
            tmp_path, has_header, count = info[table_name]
            _register_section_csv(conn, table_name, tmp_path, has_header, count, dedup_col)
        logger.info(
            "mzTab loaded (fast): %d metadata, %d proteins, %d peptides, %d PSMs",
            len(metadata_rows),
            info["proteins"][2],
            info["peptides"][2],
            info["psms"][2],
        )
    finally:
        _cleanup_tmpdir(tmpdir)


def load_msstats(
Expand Down
Loading
Loading