diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 00000000..c7d9aebd
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,36 @@
+# Exclude non-essential content from the Docker build context.
+# Keep only what ``pip install .`` needs: pyproject.toml, README.md, LICENSE, qpx/.
+
+.git/
+.github/
+.pytest_cache/
+.hypothesis/
+.ruff_cache/
+.tmp/
+.vscode/
+.windsurf/
+.idea/
+__pycache__/
+*.pyc
+*.pyo
+*.egg-info/
+build/
+dist/
+.coverage
+coverage.xml
+.DS_Store
+.venv/
+venv/
+env/
+
+# Project-specific large/non-essential dirs
+tests/
+docs/
+site/
+benchmarks/
+scripts/
+data/
+
+# Docker-related files are not needed inside the image
+Dockerfile
+.dockerignore
diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml
new file mode 100644
index 00000000..cef5681e
--- /dev/null
+++ b/.github/workflows/docker-publish.yml
@@ -0,0 +1,92 @@
+# Build and publish the QPX container image to GitHub Container Registry (GHCR).
+#
+# Triggers:
+#   - push to main          -> publishes the `dev` tag (floating)
+#   - push tag v*.*.*       -> publishes semantic version tags + `latest`
+#   - pull_request to main  -> build only (smoke-test the Dockerfile)
+#   - workflow_dispatch     -> manual run
+#
+# Image: ghcr.io/${owner}/qpx (expands to ghcr.io/bigbio/qpx)
+# Platform: linux/amd64 (multi-arch can be added later)
+#
+# Repo settings required:
+#   Settings -> Actions -> General -> Workflow permissions -> "Read and write permissions"
+#   (the default GITHUB_TOKEN is used to authenticate against GHCR)
+
+name: Publish Docker image
+
+on:
+  push:
+    branches: [main]
+    tags: ["v*.*.*"]
+  pull_request:
+    branches: [main]
+  workflow_dispatch:
+
+permissions:
+  contents: read
+  packages: write
+  id-token: write
+
+jobs:
+  docker:
+    name: Build and push to GHCR
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          fetch-tags: true
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+
+      - name: Log in to GHCR
+        if: github.event_name != 'pull_request'
+        uses: docker/login-action@v3
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Resolve QPX version for build-arg
+        id: ver
+        run: |
+          if [[ "${GITHUB_REF}" == refs/tags/v* ]]; then
+            echo "qpx_version=${GITHUB_REF#refs/tags/v}" >> "$GITHUB_OUTPUT"
+          else
+            echo "qpx_version=0.0.0.dev0+$(git rev-parse --short HEAD)" >> "$GITHUB_OUTPUT"
+          fi
+
+      - name: Extract image metadata
+        id: meta
+        uses: docker/metadata-action@v5
+        with:
+          images: ghcr.io/${{ github.repository_owner }}/qpx
+          tags: |
+            type=semver,pattern={{version}}
+            type=semver,pattern={{major}}.{{minor}}
+            type=ref,event=pr
+            type=raw,value=dev,enable=${{ github.ref == 'refs/heads/main' && github.event_name == 'push' }}
+            type=raw,value=latest,enable=${{ startsWith(github.ref, 'refs/tags/v') }}
+            type=sha,format=short
+
+      - name: Build and push
+        uses: docker/build-push-action@v6
+        with:
+          context: .
+          platforms: linux/amd64
+          push: ${{ github.event_name != 'pull_request' }}
+          tags: ${{ steps.meta.outputs.tags }}
+          labels: ${{ steps.meta.outputs.labels }}
+          build-args: |
+            QPX_VERSION=${{ steps.ver.outputs.qpx_version }}
+          provenance: true
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
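+
+# Consuming the published image (tag names as configured in the metadata step):
+#   docker pull ghcr.io/bigbio/qpx:dev      # floating build from main
+#   docker pull ghcr.io/bigbio/qpx:latest   # most recent release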
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..b70e6de1
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,43 @@
+# syntax=docker/dockerfile:1.6
+#
+# QPX container image
+# -------------------
+# Provides the ``qpxc`` CLI built from this repository's source, with the
+# optional ``[mudata]`` extras pre-installed for MuData export.
+#
+# Build:
+#   docker build -t ghcr.io/bigbio/qpx:dev .
+#
+# Run:
+#   docker run --rm -v $(pwd):/data ghcr.io/bigbio/qpx:dev convert diann --help
+
+FROM python:3.11-slim-bookworm
+
+# Runtime system deps required by pyOpenMS (see environment.yml).
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends libglib2.0-0=2.74.6-2+deb12u8 procps=2:4.0.2-3 \
+    && rm -rf /var/lib/apt/lists/*
+
+# hatch-vcs derives the version from git history; when the build context lacks
+# a .git directory (typical Docker builds), fall back to this placeholder.
+ARG QPX_VERSION=0.0.0.dev0
+ENV SETUPTOOLS_SCM_PRETEND_VERSION=${QPX_VERSION}
+
+# Minimal build inputs required by ``pip install .`` under hatchling.
+WORKDIR /src
+COPY pyproject.toml README.md LICENSE ./
+COPY qpx ./qpx
+
+RUN pip install --no-cache-dir --upgrade pip==24.0 \
+    && pip install --no-cache-dir ".[mudata]" \
+    && rm -rf /src
+
+LABEL org.opencontainers.image.source="https://github.com/bigbio/qpx"
+LABEL org.opencontainers.image.description="QPX: Quantitative Proteomics Parquet toolkit (qpxc CLI) with MuData export"
+LABEL org.opencontainers.image.licenses="Apache-2.0"
+
+WORKDIR /data
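+
+# Release builds pin the version explicitly; CI derives it from the git tag.
+# A local equivalent (version number illustrative):
+#   docker build --build-arg QPX_VERSION=1.2.3 -t qpx:1.2.3 .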
diff --git a/pyproject.toml b/pyproject.toml
index 955f23c8..210e7533 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,13 +28,15 @@ mzidentml = ["lxml>=4.9.0"]
 transforms = ["biopython", "mygene>=1.0.0", "anndata>=0.9.0"]
 plotting = ["plotly>=5.0.0", "scikit-learn>=1.5.0"]
 quantify = ["mokume>=0.1.0", "directlfq"]
-all = ["lxml>=4.9.0", "biopython", "mygene>=1.0.0", "anndata>=0.9.0", "plotly>=5.0.0", "scikit-learn>=1.5.0", "mokume>=0.1.0", "directlfq"]
+mudata = ["mudata>=0.2.4", "anndata>=0.9.0"]
+all = ["lxml>=4.9.0", "biopython", "mygene>=1.0.0", "anndata>=0.9.0", "plotly>=5.0.0", "scikit-learn>=1.5.0", "mokume>=0.1.0", "directlfq", "mudata>=0.2.4"]
 dev = [
     "pytest",
     "pytest-timeout",
     "hypothesis",
     "lxml>=4.9.0",
     "anndata",
+    "mudata>=0.2.4",
     "scikit-learn",
     "plotly>=5.0.0",
     "pre-commit",
diff --git a/qpx/cli/convert.py b/qpx/cli/convert.py
index b3801426..eb544022 100644
--- a/qpx/cli/convert.py
+++ b/qpx/cli/convert.py
@@ -9,6 +9,7 @@
     qpxc convert maxquant   — MaxQuant output to QPX
     qpxc convert fragpipe   — FragPipe output to QPX
     qpxc convert mzidentml  — mzIdentML (incl. XL-MS 1.3) to QPX
+    qpxc convert openms     — OpenMS -out_qpx enrichment to full QPX
     qpxc convert sdrf       — SDRF to sample.parquet + run.parquet
 """
 
@@ -20,6 +21,8 @@
 
 import click
 
+from qpx.converters.openms import OpenMSConverter
+
 logger = logging.getLogger("qpx.cli.convert")
 
 
@@ -263,6 +266,13 @@ def convert_quantms_cmd(
     show_default=True,
     help="Parquet compression codec.",
 )
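+# The DIA-NN summary log (e.g. the ``*.log.txt`` written next to the report)
+# carries the tool version; when supplied, it is recorded in provenance.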
+@click.option(
+    "--diann-log",
+    help="DIA-NN summary log file (version auto-detected from first line)",
+    type=click.Path(exists=True, dir_okay=False, path_type=Path),
+)
 @click.option("--verbose", help="Enable verbose logging", is_flag=True)
 def convert_diann_cmd(
     report_path: Path,
@@ -281,6 +289,7 @@ def convert_diann_cmd(
     project_accession: Optional[str],
     enrich_pride: bool,
     compression: str,
+    diann_log: Optional[Path],
     verbose: bool,
 ):
     """Convert DIA-NN report to QPX format.
@@ -321,6 +330,7 @@ def convert_diann_cmd(
         duckdb_max_memory=duckdb_max_memory,
         duckdb_threads=duckdb_threads,
         compression=compression,
+        diann_log=str(diann_log) if diann_log else None,
     )
     converter.convert_features(
         mzml_info_folder=mzml_info_folder,
@@ -779,6 +789,103 @@ def convert_mzidentml_cmd(
     click.echo(f"mzIdentML conversion complete. Output: {output_folder}")
 
 
+# ---------------------------------------------------------------------------
+# OpenMS
+# ---------------------------------------------------------------------------
+
+
+@convert.command("openms")
+@click.option(
+    "--qpx-dir",
+    help="Directory containing OpenMS -out_qpx parquet files (*.psm.parquet, *.feature.parquet, *.pg.parquet)",
+    required=True,
+    type=click.Path(exists=True, file_okay=False, path_type=Path),
+)
+@click.option(
+    "--sdrf-file",
+    help="SDRF metadata file path (for sample/run generation)",
+    required=True,
+    type=click.Path(exists=True, dir_okay=False, path_type=Path),
+)
+@click.option(
+    "--output-folder",
+    help="Output directory for the full QPX dataset",
+    required=True,
+    type=click.Path(file_okay=False, path_type=Path),
+)
+@click.option(
+    "--output-prefix",
+    help="Prefix for output file names",
+    default="openms",
+)
+@click.option(
+    "--project-accession",
+    help="PRIDE / ProteomeXchange accession (e.g. PXD001819)",
+)
+@click.option(
+    "--enrich-pride",
+    help="Fetch project metadata from PRIDE API after conversion",
+    is_flag=True,
+    default=False,
+)
+@click.option(
+    "--compression",
+    type=click.Choice(["zstd", "snappy", "gzip", "none"], case_sensitive=False),
+    default="zstd",
+    show_default=True,
+    help="Parquet compression codec.",
+)
+@click.option("--verbose", help="Enable verbose logging", is_flag=True)
+def convert_openms_cmd(**kwargs):
+    """Enrich OpenMS ProteomicsLFQ -out_qpx output into a full QPX dataset.
+
+    Validates the existing psm/feature/pg parquet files, copies them to the
+    output folder, and generates the missing metadata tables (run, sample,
+    ontology, provenance, dataset) from the SDRF file.
+
+    \b
+    Examples:
+        # Enrich OpenMS QPX output
+        qpxc convert openms \\
+            --qpx-dir ./openms_qpx_output \\
+            --sdrf-file metadata.sdrf.tsv \\
+            --output-folder ./qpx_full
+
+        # With project accession
+        qpxc convert openms \\
+            --qpx-dir ./openms_qpx_output \\
+            --sdrf-file metadata.sdrf.tsv \\
+            --output-folder ./qpx_full \\
+            --project-accession PXD001819
+    """
+    qpx_dir = kwargs["qpx_dir"]
+    sdrf_file = kwargs["sdrf_file"]
+    output_folder = kwargs["output_folder"]
+    output_prefix = kwargs["output_prefix"]
+    project_accession = kwargs["project_accession"]
+    enrich_pride = kwargs["enrich_pride"]
+    compression = kwargs["compression"]
+    verbose = kwargs["verbose"]
+
+    if verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+
+    converter = OpenMSConverter(
+        qpx_dir=qpx_dir,
+        sdrf_path=sdrf_file,
+        compression=compression,
+    )
+    converter.convert(
+        output_folder=output_folder,
+        output_prefix=output_prefix,
+        project_accession=project_accession,
+    )
+
+    _maybe_enrich_pride(output_folder, project_accession, enrich_pride)
+
+    click.echo(f"OpenMS QPX enrichment complete. Output: {output_folder}")
+
+
 # ---------------------------------------------------------------------------
 # SDRF
 # ---------------------------------------------------------------------------
diff --git a/qpx/converters/__init__.py b/qpx/converters/__init__.py
index ebcc8489..4cbc8abc 100644
--- a/qpx/converters/__init__.py
+++ b/qpx/converters/__init__.py
@@ -20,6 +20,7 @@
 from qpx.converters.maxquant.pg_adapter import MaxQuantPgAdapter
 from qpx.converters.maxquant.psm_adapter import MaxQuantPsmAdapter
 from qpx.converters.mzidentml.psm_adapter import MzIdentMLPsmAdapter
+from qpx.converters.openms.converter import OpenMSConverter
 from qpx.converters.orchestrator import BaseOrchestrator, build_dataset_record
 from qpx.converters.quantms.converter import QuantMSConverter
 from qpx.converters.quantms.feature_adapter import QuantmsFeatureAdapter
@@ -55,4 +56,6 @@
     "FragPipeConverter",
     # mzIdentML
     "MzIdentMLPsmAdapter",
+    # OpenMS
+    "OpenMSConverter",
 ]
diff --git a/qpx/converters/diann/converter.py b/qpx/converters/diann/converter.py
index 14e27d38..ff72ba86 100644
--- a/qpx/converters/diann/converter.py
+++ b/qpx/converters/diann/converter.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import logging
+import re
 from pathlib import Path
 
 from qpx._version import __version__
@@ -20,6 +21,8 @@
 class DiaNNConverter(BaseOrchestrator):
     """Orchestrate full DIA-NN conversion to QPX format."""
 
+    _DIANN_VERSION_RE = re.compile(r"DIA-NN\s+([\d.]+)")
+
     def __init__(
         self,
         report_path,
@@ -27,12 +30,14 @@ def __init__(
         sdrf_path=None,
         duckdb_max_memory=None,
         duckdb_threads=None,
         compression: str = "zstd",
+        diann_log: str | None = None,
     ):
         self.report_path = str(report_path)
         self.sdrf_path = str(sdrf_path) if sdrf_path else None
         self._memory = duckdb_max_memory or "16GB"
         self._threads = duckdb_threads or 4
         self._compression = compression
+        self._diann_version = self._parse_diann_version(diann_log)
         self._ontology_entries: list[dict] = []
         self._resolved_mappings_by_view: dict[str, dict] = {}
@@ -141,7 +146,7 @@ def write_provenance(self, output_folder: str | Path, prefix: str = "diann") ->
             "step_category": "quantification",
             "step_name": "precursor_quantification",
             "tool_name": "DIA-NN",
-            "tool_version": None,
+            "tool_version": self._diann_version,
             "tool_uri": None,
             "parameters": None,
             "config": None,
@@ -175,5 +180,20 @@ def write_dataset(
             prefix,
             project_accession,
             software_name="DIA-NN",
-            software_version=None,
+            software_version=self._diann_version,
         )
+
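+    # Illustrative first line that _DIANN_VERSION_RE is expected to match:
+    #   "DIA-NN 1.8.1 (Data-Independent Acquisition by Neural Networks)"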
+    @classmethod
+    def _parse_diann_version(cls, log_path: str | None) -> str | None:
+        """Extract DIA-NN version from the first line of a summary log."""
+        if not log_path:
+            return None
+        try:
+            with open(log_path, encoding="utf-8", errors="replace") as fh:
+                match = cls._DIANN_VERSION_RE.search(fh.readline())
+            return match.group(1) if match else None
+        except OSError:
+            logger.debug("Could not read DIA-NN log: %s", log_path)
+            return None
diff --git a/qpx/converters/diann/pg_adapter.py b/qpx/converters/diann/pg_adapter.py
index 6192aa14..1c93826f 100644
--- a/qpx/converters/diann/pg_adapter.py
+++ b/qpx/converters/diann/pg_adapter.py
@@ -14,6 +14,7 @@
 from __future__ import annotations
 
 import logging
+import re
 from typing import Optional
 
 import pandas as pd
@@ -27,6 +28,8 @@
 logger = logging.getLogger(__name__)
 
+_EXT_RE = re.compile(r"\.(mzML|raw|d|wiff|htrms)$", re.IGNORECASE)
+
 # Extra columns needed for PG aggregation but not in the field mappings
 _PG_EXTRA_COLS = [
     ('"Proteotypic"', "proteotypic"),
@@ -117,8 +120,12 @@ def _load_pg_matrix(self, path: str) -> pd.DataFrame:
         names_col = pg_matrix_resolved["pg_names"]
         genes_col = pg_matrix_resolved["gg_accessions"]
 
-        mzml_cols = [c for c in header if c.endswith(".mzML")]
-        usecols = [pg_col, names_col, genes_col] + mzml_cols
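+        # A DIA-NN pg_matrix header typically reads: Protein.Group, Protein.Ids,
+        # Protein.Names, Genes, First.Protein.Description, then one column per
+        # run file (path-like, with any supported raw-file extension).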
+""" + +from __future__ import annotations + +import logging +import shutil +from pathlib import Path + +import pyarrow as pa +import pyarrow.parquet as pq + +from qpx._version import __version__ +from qpx.converters.orchestrator import BaseOrchestrator +from qpx.converters.sdrf import SdrfConverter +from qpx.core.constants import FEATURE, ONTOLOGY, PG, PSM, RUN, SAMPLE +from qpx.core.data.loader import load_schema +from qpx.core.scores import score_ontology_entries + +logger = logging.getLogger(__name__) + +# Mapping from QPX view name to the schema loader name +_VIEW_SCHEMAS = { + PSM: "psm", + FEATURE: "feature", + PG: "pg", +} + + +def _discover_parquet(qpx_dir: Path) -> dict[str, Path]: + """Discover QPX parquet files in a directory. + + Looks for files matching ``*.psm.parquet``, ``*.feature.parquet``, + and ``*.pg.parquet``. Returns a dict mapping view name to path. + """ + found: dict[str, Path] = {} + for view in (PSM, FEATURE, PG): + matches = sorted(qpx_dir.glob(f"*.{view}.parquet")) + if matches: + found[view] = matches[0] + if len(matches) > 1: + logger.warning( + "Multiple %s parquet files found in %s; using %s", + view, + qpx_dir, + matches[0].name, + ) + return found + + +def _collect_score_names(table_path: Path) -> set[str]: + """Read ``additional_scores`` from a parquet file and collect score names.""" + table = pq.read_table(str(table_path), columns=["additional_scores"]) + names: set[str] = set() + col = table.column("additional_scores") + for row in col.to_pylist(): + if row: + for score in row: + if score and "name" in score and score["name"]: + names.add(score["name"]) + return names + + +def _validate_core(discovered: dict[str, Path]) -> None: + """Validate each discovered parquet file against its QPX schema.""" + for view, path in discovered.items(): + schema = load_schema(_VIEW_SCHEMAS[view]) + table = pq.read_table(str(path)) + result = schema.validate_full(table) + if not result.is_valid: + errors = "; ".join(i.message for i in result.errors) + raise ValueError(f"Validation failed for {path.name}: {errors}") + logger.info( + "%s: %s (%d rows, %d errors, %d warnings)", + view, + path.name, + table.num_rows, + len(result.errors), + len(result.warnings), + ) + + +def _copy_core( + discovered: dict[str, Path], + output_folder: Path, + output_prefix: str, +) -> dict[str, Path]: + """Copy core parquet files to output directory, skipping same-location.""" + output_paths: dict[str, Path] = {} + for view, src_path in discovered.items(): + dst = output_folder / f"{output_prefix}.{view}.parquet" + if src_path.resolve() != dst.resolve(): + shutil.copy2(str(src_path), str(dst)) + logger.info("Copied %s -> %s", src_path.name, dst.name) + else: + logger.info("Skipping copy for %s (same location)", view) + output_paths[view] = dst + return output_paths + + +class OpenMSConverter(BaseOrchestrator): + """Orchestrate OpenMS QPX enrichment to a full QPX dataset. + + Usage:: + + converter = OpenMSConverter( + qpx_dir="path/to/openms_qpx_output", + sdrf_path="metadata.sdrf.tsv", + ) + converter.convert( + output_folder="./qpx_full", + output_prefix="openms", + ) + """ + + def __init__( + self, + qpx_dir: str | Path, + sdrf_path: str | Path | None = None, + compression: str = "zstd", + ): + """Initialize the OpenMS QPX converter. + + Parameters + ---------- + qpx_dir : str or Path + Directory containing OpenMS ``-out_qpx`` parquet files. + sdrf_path : str, Path or None + Optional SDRF metadata file for sample/run generation. 
+"""
+
+from __future__ import annotations
+
+import logging
+import shutil
+from pathlib import Path
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+from qpx._version import __version__
+from qpx.converters.orchestrator import BaseOrchestrator
+from qpx.converters.sdrf import SdrfConverter
+from qpx.core.constants import FEATURE, ONTOLOGY, PG, PSM, RUN, SAMPLE
+from qpx.core.data.loader import load_schema
+from qpx.core.scores import score_ontology_entries
+
+logger = logging.getLogger(__name__)
+
+# Mapping from QPX view name to the schema loader name
+_VIEW_SCHEMAS = {
+    PSM: "psm",
+    FEATURE: "feature",
+    PG: "pg",
+}
+
+
+def _discover_parquet(qpx_dir: Path) -> dict[str, Path]:
+    """Discover QPX parquet files in a directory.
+
+    Looks for files matching ``*.psm.parquet``, ``*.feature.parquet``,
+    and ``*.pg.parquet``. Returns a dict mapping view name to path.
+    """
+    found: dict[str, Path] = {}
+    for view in (PSM, FEATURE, PG):
+        matches = sorted(qpx_dir.glob(f"*.{view}.parquet"))
+        if matches:
+            found[view] = matches[0]
+            if len(matches) > 1:
+                logger.warning(
+                    "Multiple %s parquet files found in %s; using %s",
+                    view,
+                    qpx_dir,
+                    matches[0].name,
+                )
+    return found
+
+
+def _collect_score_names(table_path: Path) -> set[str]:
+    """Read ``additional_scores`` from a parquet file and collect score names."""
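+    # Each cell is assumed to hold a list of structs with at least a ``name``
+    # field, e.g. [{"name": "openms:psm_score", "value": 0.87}, ...]; only the
+    # names are collected here.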
"tool_uri": None, + "parameters": None, + "config": None, + "output_views": structures, + }, + { + "step_order": 2, + "step_category": "format_conversion", + "step_name": "openms_qpx_enrichment", + "tool_name": "qpx", + "tool_version": __version__, + "tool_uri": None, + "parameters": None, + "config": None, + "output_views": [SAMPLE, RUN, ONTOLOGY], + }, + ] diff --git a/qpx/mudata.py b/qpx/mudata.py index a9a417b1..c3d41415 100644 --- a/qpx/mudata.py +++ b/qpx/mudata.py @@ -9,7 +9,7 @@ from __future__ import annotations import logging -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import TYPE_CHECKING import numpy as np @@ -69,11 +69,14 @@ def _pivot_to_sparse( return mat.tocsr() -_LABEL_FIELD_QUERIES: dict[str, str] = { - "channel": "SELECT i.channel FROM feature, UNNEST(intensities) AS _t(i) LIMIT 1", - "label": "SELECT i.label FROM feature, UNNEST(intensities) AS _t(i) LIMIT 1", +_LABEL_FIELD_QUERIES: dict[tuple[str, str], str] = { + ("channel", "feature"): "SELECT i.channel FROM feature, UNNEST(intensities) AS _t(i) LIMIT 1", + ("channel", "pg"): "SELECT i.channel FROM pg, UNNEST(intensities) AS _t(i) LIMIT 1", + ("label", "feature"): "SELECT i.label FROM feature, UNNEST(intensities) AS _t(i) LIMIT 1", + ("label", "pg"): "SELECT i.label FROM pg, UNNEST(intensities) AS _t(i) LIMIT 1", } + _PRECURSOR_QUERIES: dict[str, str] = { "channel": """ SELECT f.run_file_name, @@ -129,16 +132,19 @@ def _detect_label_field(engine: DuckDBEngine, table: str = "feature") -> str: return "label" -def _detect_intensity_label(engine: DuckDBEngine) -> str: - """Auto-detect the first intensity label from feature.parquet.""" - label_field = _detect_label_field(engine, "feature") +def _detect_intensity_label(engine: DuckDBEngine, table: str = "feature") -> str: + """Auto-detect the first intensity label from the given table.""" + label_field = _detect_label_field(engine, table) + query = _LABEL_FIELD_QUERIES.get((label_field, table)) + if query is None: + raise ValueError(f"Invalid label_field={label_field!r} or table={table!r}") try: - row = engine.execute(_LABEL_FIELD_QUERIES[label_field]).fetchone() + row = engine.execute(query).fetchone() except Exception as exc: - raise ValueError(f"Failed to read intensity labels from feature.parquet: {exc}") from exc + raise ValueError(f"Failed to read intensity labels from {table}: {exc}") from exc if row is None: - raise ValueError("No intensity labels found in feature.parquet") + raise ValueError(f"No intensity labels found in {table}") return row[0] @@ -172,8 +178,40 @@ def _load_run_obs(engine: DuckDBEngine, run_index: pd.Index) -> pd.DataFrame: # Keep first row per run_file_name (one sample per run for obs) df = df.drop_duplicates(subset="run_file_name", keep="first") df = df.set_index("run_file_name") - # Reindex to match run_index, filling missing with NaN + + # Reindex to match run_index; if direct match fails (e.g. extension + # mismatch between run.parquet and feature.parquet), try stem-based + # matching so that "BSA1_F1" matches "BSA1_F1.mzML". 
+    pg_label_field = _detect_label_field(engine, "pg")
+    try:
+        pg_intensity = intensity_label or _detect_intensity_label(engine, "pg")
+    except ValueError:
+        pg_intensity = feat_intensity
 
     mod: dict = {}
     builders = {
-        "precursors": lambda: _build_precursor_adata(engine, intensity_label, label_field),
-        "proteins": lambda: _build_protein_adata(engine, intensity_label, label_field),
+        "precursors": lambda: _build_precursor_adata(engine, feat_intensity, feat_label_field),
+        "proteins": lambda: _build_protein_adata(engine, pg_intensity, pg_label_field),
         "expression": lambda: _build_expression_adata(ds_path),
        "differential": lambda: _build_differential_adata(ds_path),
     }
@@ -467,4 +514,12 @@ def build_mudata(
     # Attach dataset-level metadata
     _attach_uns_metadata(engine, mdata)
 
+    # Sanitize NaN in object columns across all modalities to prevent
+    # h5py serialization errors (can't write NaN in vlen-string arrays).
+    for adata in mdata.mod.values():
+        for frame in (adata.obs, adata.var):
+            str_cols = frame.select_dtypes(include=["object"]).columns
+            if len(str_cols):
+                frame[str_cols] = frame[str_cols].fillna("")
+
     return mdata
diff --git a/tests/converters/test_openms_converter.py b/tests/converters/test_openms_converter.py
new file mode 100644
index 00000000..cd13513e
--- /dev/null
+++ b/tests/converters/test_openms_converter.py
@@ -0,0 +1,240 @@
+"""Tests for the OpenMS QPX converter (enrichment of -out_qpx output)."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import pyarrow.parquet as pq
+import pytest
+
+from qpx.converters.openms.converter import OpenMSConverter, _discover_parquet
+from qpx.core.data.loader import load_schema
+from qpx.writers.feature import FeatureWriter
+from qpx.writers.pg import PgWriter
+from qpx.writers.psm import PsmWriter
+from tests.conftest import (
+    make_feature_record,
+    make_pg_record,
+    make_psm_record,
+)
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _write_minimal_qpx(qpx_dir: Path, prefix: str = "quantms") -> None:
+    """Write minimal QPX-compliant psm, feature, pg parquet files."""
+    # PSM
+    psm_records = [
+        make_psm_record(sequence="PEPTIDEK", run_file_name="run_01", scan=[1001]),
+        make_psm_record(sequence="ANOTHERK", run_file_name="run_01", scan=[1002], charge=3),
+    ]
+    with PsmWriter(qpx_dir / f"{prefix}.psm.parquet") as w:
+        w.write_batch(psm_records)
+
+    # Feature
+    feat_records = [
+        make_feature_record(sequence="PEPTIDEK", run_file_name="run_01", anchor_protein="P12345"),
+        make_feature_record(sequence="ANOTHERK", run_file_name="run_01", anchor_protein="P12345", charge=3),
+    ]
+    with FeatureWriter(qpx_dir / f"{prefix}.feature.parquet") as w:
+        w.write_batch(feat_records)
+
+    # PG
+    pg_records = [
+        make_pg_record(anchor_protein="P12345", run_file_name="run_01"),
+    ]
+    with PgWriter(qpx_dir / f"{prefix}.pg.parquet") as w:
+        w.write_batch(pg_records)
+
+
+def _write_minimal_sdrf(sdrf_path: Path) -> None:
+    """Write a minimal SDRF TSV for testing."""
+    lines = [
+        "source name\tcharacteristics[organism]\tcharacteristics[organism part]\t"
+        "comment[data file]\tcomment[label]\tcomment[instrument]\t"
+        "comment[cleavage agent details]\tcomment[fraction identifier]",
+        "sample_1\tHomo sapiens\tliver\trun_01.raw\tAC=MS:1002038;NT=label free sample\t"
+        "AC=MS:1000449;NT=LTQ Orbitrap\tAC=MS:1001251;NT=Trypsin\t1",
+    ]
+    sdrf_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
+
+
+# ---------------------------------------------------------------------------
+# Tests: _discover_parquet
+# ---------------------------------------------------------------------------
+
+
+class TestDiscoverParquet:
+    def test_discovers_all_three(self, tmp_path):
+        _write_minimal_qpx(tmp_path)
+        found = _discover_parquet(tmp_path)
+        assert set(found.keys()) == {"psm", "feature", "pg"}
+
+    def test_empty_dir(self, tmp_path):
+        found = _discover_parquet(tmp_path)
+        assert not found
+
+    def test_partial(self, tmp_path):
+        """Only psm present."""
+        psm_records = [make_psm_record()]
+        with PsmWriter(tmp_path / "test.psm.parquet") as w:
+            w.write_batch(psm_records)
+        found = _discover_parquet(tmp_path)
+        assert set(found.keys()) == {"psm"}
+
+
+# ---------------------------------------------------------------------------
+# Tests: OpenMSConverter
+# ---------------------------------------------------------------------------
+
+
+class TestOpenMSConverter:
+    def test_convert_full_bundle(self, tmp_path):
+        """Full enrichment: 3 core + SDRF -> 8 files."""
+        qpx_dir = tmp_path / "openms_qpx"
+        qpx_dir.mkdir()
+        _write_minimal_qpx(qpx_dir)
+
+        sdrf_path = tmp_path / "test.sdrf.tsv"
+        _write_minimal_sdrf(sdrf_path)
+
+        output = tmp_path / "output"
+        converter = OpenMSConverter(qpx_dir=qpx_dir, sdrf_path=sdrf_path)
+        converter.convert(output_folder=output, output_prefix="openms")
+
+        # Check all 8 files exist
+        expected_files = [
+            "openms.psm.parquet",
+            "openms.feature.parquet",
+            "openms.pg.parquet",
+            "openms.sample.parquet",
+            "openms.run.parquet",
+            "openms.ontology.parquet",
+            "openms.provenance.parquet",
+            "openms.dataset.parquet",
+        ]
+        for fname in expected_files:
+            assert (output / fname).exists(), f"Missing: {fname}"
+
+    def test_core_tables_validated(self, tmp_path):
+        """Core tables should pass QPX schema validation after copy."""
+        qpx_dir = tmp_path / "openms_qpx"
+        qpx_dir.mkdir()
+        _write_minimal_qpx(qpx_dir)
+
+        sdrf_path = tmp_path / "test.sdrf.tsv"
+        _write_minimal_sdrf(sdrf_path)
+
+        output = tmp_path / "output"
+        converter = OpenMSConverter(qpx_dir=qpx_dir, sdrf_path=sdrf_path)
+        converter.convert(output_folder=output, output_prefix="openms")
+
+        for view in ("psm", "feature", "pg"):
+            schema = load_schema(view)
+            table = pq.read_table(str(output / f"openms.{view}.parquet"))
+            result = schema.validate_full(table)
+            assert result.is_valid, f"{view} validation failed: {result.summary}"
+
+    def test_provenance_structure(self, tmp_path):
+        """Provenance should have OpenMS + QPX steps."""
+        qpx_dir = tmp_path / "openms_qpx"
+        qpx_dir.mkdir()
+        _write_minimal_qpx(qpx_dir)
+
+        sdrf_path = tmp_path / "test.sdrf.tsv"
+        _write_minimal_sdrf(sdrf_path)
+
+        output = tmp_path / "output"
+        converter = OpenMSConverter(qpx_dir=qpx_dir, sdrf_path=sdrf_path)
+        converter.convert(output_folder=output, output_prefix="openms")
+
+        prov_table = pq.read_table(str(output / "openms.provenance.parquet"))
+        assert prov_table.num_rows == 2
+        tools = prov_table.column("tool_name").to_pylist()
+        assert "OpenMS/ProteomicsLFQ" in tools
+        assert "qpx" in tools
+
+    def test_dataset_metadata(self, tmp_path):
+        """Dataset should contain project accession."""
+        qpx_dir = tmp_path / "openms_qpx"
+        qpx_dir.mkdir()
+        _write_minimal_qpx(qpx_dir)
+
+        sdrf_path = tmp_path / "test.sdrf.tsv"
+        _write_minimal_sdrf(sdrf_path)
+
+        output = tmp_path / "output"
+        converter = OpenMSConverter(qpx_dir=qpx_dir, sdrf_path=sdrf_path)
+        converter.convert(
+            output_folder=output,
+            output_prefix="openms",
+            project_accession="PXD001819",
+        )
+
+        ds_table = pq.read_table(str(output / "openms.dataset.parquet"))
+        assert ds_table.num_rows == 1
+        assert ds_table.column("project_accession").to_pylist()[0] == "PXD001819"
+        assert ds_table.column("software_name").to_pylist()[0] == "OpenMS/ProteomicsLFQ"
+
+    def test_no_qpx_files_raises(self, tmp_path):
+        """Empty directory should raise FileNotFoundError."""
+        empty_dir = tmp_path / "empty"
+        empty_dir.mkdir()
+        converter = OpenMSConverter(qpx_dir=empty_dir)
+        with pytest.raises(FileNotFoundError, match="No QPX parquet files"):
+            converter.convert(output_folder=tmp_path / "out")
+
+    def test_same_dir_skips_copy(self, tmp_path):
+        """When output_folder == qpx_dir, core files should not be copied."""
+        _write_minimal_qpx(tmp_path, prefix="openms")
+
+        sdrf_path = tmp_path / "test.sdrf.tsv"
+        _write_minimal_sdrf(sdrf_path)
+
+        converter = OpenMSConverter(qpx_dir=tmp_path, sdrf_path=sdrf_path)
+        converter.convert(output_folder=tmp_path, output_prefix="openms")
+
+        # Core files should still exist and metadata files should be added
+        assert (tmp_path / "openms.psm.parquet").exists()
+        assert (tmp_path / "openms.provenance.parquet").exists()
+
+    def test_no_sdrf_skips_sample_run(self, tmp_path):
+        """Without SDRF, sample/run should not be generated."""
+        qpx_dir = tmp_path / "openms_qpx"
+        qpx_dir.mkdir()
+        _write_minimal_qpx(qpx_dir)
+
+        output = tmp_path / "output"
+        converter = OpenMSConverter(qpx_dir=qpx_dir)
+        converter.convert(output_folder=output, output_prefix="openms")
+
+        # Core + provenance + dataset + ontology should exist
+        assert (output / "openms.psm.parquet").exists()
+        assert (output / "openms.provenance.parquet").exists()
+        assert (output / "openms.dataset.parquet").exists()
+        # sample/run should NOT exist
+        assert not (output / "openms.sample.parquet").exists()
+        assert not (output / "openms.run.parquet").exists()
+
+    def test_metadata_tables_validate(self, tmp_path):
+        """All metadata tables should pass QPX schema validation."""
+        qpx_dir = tmp_path / "openms_qpx"
+        qpx_dir.mkdir()
+        _write_minimal_qpx(qpx_dir)
+
+        sdrf_path = tmp_path / "test.sdrf.tsv"
+        _write_minimal_sdrf(sdrf_path)
+
+        output = tmp_path / "output"
+        converter = OpenMSConverter(qpx_dir=qpx_dir, sdrf_path=sdrf_path)
+        converter.convert(output_folder=output, output_prefix="openms")
+
+        for view in ("sample", "run", "ontology", "provenance", "dataset"):
+            path = output / f"openms.{view}.parquet"
+            if path.exists():
+                schema = load_schema(view)
+                table = pq.read_table(str(path))
+                result = schema.validate_full(table)
+                assert result.is_valid, f"{view} validation failed: {result.summary}"
diff --git a/tests/unit/test_mudata.py b/tests/unit/test_mudata.py
new file mode 100644
index 00000000..fbd476cb
--- /dev/null
+++ b/tests/unit/test_mudata.py
@@ -0,0 +1,164 @@
+"""Regression tests for qpx.mudata._attach_uns_metadata.
+
+Previously _attach_uns_metadata only filtered None and float NaN; it missed
+pd.NA (from pandas nullable dtypes when reading parquet). That caused
+mdata.write() to fail on real data with:
+    IORegistryError: No method registered for writing
+
+These tests ensure all scalar NA flavors are skipped while normal values
+(including non-scalar dicts/lists) are preserved.
+"""
+
+from __future__ import annotations
+
+import numpy as np
+import pandas as pd
+import pytest
+
+mudata = pytest.importorskip("mudata")
+anndata = pytest.importorskip("anndata")
+
+from qpx.mudata import (  # noqa: E402
+    _LABEL_FIELD_QUERIES,
+    _attach_uns_metadata,
+    _detect_intensity_label,
+)
+
+
+class _FakeResult:
+    def __init__(self, df: pd.DataFrame) -> None:
+        self._df = df
+
+    def fetchdf(self) -> pd.DataFrame:
+        return self._df
+
+
+class _FakeEngine:
+    """Minimal stand-in for qpx.core.engine.DuckDBEngine used by _attach_uns_metadata.
+
+    _attach_uns_metadata only calls engine.execute(sql).fetchdf(), so this shim
+    is sufficient and avoids spinning up a real DuckDB + parquet fixture.
+ """ + + def __init__(self, df: pd.DataFrame) -> None: + self._df = df + + def execute(self, sql: str) -> _FakeResult: # noqa: ARG002 + return _FakeResult(self._df) + + +class _FakeRowResult: + """Returns a single-row fetchone() result.""" + + def __init__(self, value) -> None: + self._value = value + + def fetchone(self): + return (self._value,) if self._value is not None else None + + def fetchdf(self) -> pd.DataFrame: + return pd.DataFrame() + + +class _TableAwareEngine: + """Fake engine that dispatches based on SQL table references.""" + + def __init__(self, table_labels: dict[str, str]) -> None: + self._table_labels = table_labels + + def execute(self, sql: str, params=None) -> _FakeRowResult: # noqa: ARG002 + for table, label in self._table_labels.items(): + if table in sql: + if "typeof" in sql.lower(): + return _FakeRowResult('STRUCT("label" VARCHAR, intensity FLOAT)[]') + return _FakeRowResult(label) + return _FakeRowResult(None) + + +def _make_empty_mdata() -> "mudata.MuData": + return mudata.MuData({"precursors": anndata.AnnData(X=np.zeros((1, 1)))}) + + +def test_attach_uns_metadata_skips_all_na_flavors(): + """All scalar NA types must be filtered out of mdata.uns. + + Regression for IORegistryError on pd.NA when writing .h5mu. + """ + df = pd.DataFrame( + { + "software_name": ["qpx"], + "software_version": ["0.1.0"], + "file_checksums": pd.array([pd.NA], dtype="object"), + "file_row_counts": [None], + "file_sizes_bytes": [np.nan], + "creation_date": [pd.NaT], + "total_structures": [5], + } + ) + mdata = _make_empty_mdata() + + _attach_uns_metadata(_FakeEngine(df), mdata) + + assert "file_checksums" not in mdata.uns + assert "file_row_counts" not in mdata.uns + assert "file_sizes_bytes" not in mdata.uns + assert "creation_date" not in mdata.uns + + assert mdata.uns["software_name"] == "qpx" + assert mdata.uns["software_version"] == "0.1.0" + assert mdata.uns["total_structures"] == 5 + + +def test_attach_uns_metadata_preserves_non_scalar_values(): + """Dicts and lists must not be misclassified as NA by pd.isna().""" + df = pd.DataFrame( + { + "tags": [["a", "b"]], + "config": [{"key": "value"}], + } + ) + mdata = _make_empty_mdata() + + _attach_uns_metadata(_FakeEngine(df), mdata) + + assert mdata.uns["tags"] == ["a", "b"] + assert mdata.uns["config"] == {"key": "value"} + + +def test_attach_uns_metadata_allows_hdf5_write(tmp_path): + """End-to-end: mdata.write() must succeed when dataset row contains pd.NA. + + This is the symptom we originally observed on real DIA-NN conversion output. + """ + df = pd.DataFrame( + { + "software_name": ["qpx"], + "file_checksums": pd.array([pd.NA], dtype="object"), + } + ) + mdata = _make_empty_mdata() + _attach_uns_metadata(_FakeEngine(df), mdata) + + out = tmp_path / "roundtrip.h5mu" + mdata.write(out) + assert out.exists() + assert out.stat().st_size > 0 + + +def test_label_field_query_generates_table_specific_sql(): + """_LABEL_FIELD_QUERIES must embed the table name, not hardcode 'feature'.""" + sql_feat = _LABEL_FIELD_QUERIES[("label", "feature")] + sql_pg = _LABEL_FIELD_QUERIES[("label", "pg")] + assert "feature" in sql_feat and "pg" not in sql_feat + assert "pg" in sql_pg and "feature" not in sql_pg + + +def test_detect_intensity_label_per_table(): + """DIA-NN feature uses 'raw', pg uses 'LFQ' — each must be detected independently. + + Regression: previously _detect_intensity_label only inspected the feature table, + causing _build_protein_adata to query pg WHERE label='raw' and get zero rows. 
+ """ + engine = _TableAwareEngine({"feature": "raw", "pg": "LFQ"}) + assert _detect_intensity_label(engine, "feature") == "raw" + assert _detect_intensity_label(engine, "pg") == "LFQ"