From 475e225779bcc95ce7ed7117868efc7d35c4646f Mon Sep 17 00:00:00 2001 From: gerchowl Date: Wed, 25 Feb 2026 22:18:27 +0100 Subject: [PATCH] feat(ingest): add DICOM series loader and Parquet columnar data loader Add fd5.ingest.dicom (DICOM series -> fd5 recon files via pydicom) and fd5.ingest.parquet (Parquet -> fd5 files via pyarrow). Closes #110, #117 --- pyproject.toml | 8 +- src/fd5/ingest/dicom.py | 334 +++++++++++++++ src/fd5/ingest/parquet.py | 369 +++++++++++++++++ tests/test_ingest_dicom.py | 774 +++++++++++++++++++++++++++++++++++ tests/test_ingest_parquet.py | 483 ++++++++++++++++++++++ 5 files changed, 1967 insertions(+), 1 deletion(-) create mode 100644 src/fd5/ingest/dicom.py create mode 100644 src/fd5/ingest/parquet.py create mode 100644 tests/test_ingest_dicom.py create mode 100644 tests/test_ingest_parquet.py diff --git a/pyproject.toml b/pyproject.toml index 921e089..c86d026 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,11 +31,17 @@ science = [ "pandas>=2.2", "matplotlib>=3.9", ] +dicom = [ + "pydicom>=2.4", +] nifti = [ "nibabel>=5.0", ] +parquet = [ + "pyarrow>=14.0", +] all = [ - "fd5[dev,science,nifti]", + "fd5[dev,science,dicom,nifti,parquet]", ] [build-system] diff --git a/src/fd5/ingest/dicom.py b/src/fd5/ingest/dicom.py new file mode 100644 index 0000000..ec46c77 --- /dev/null +++ b/src/fd5/ingest/dicom.py @@ -0,0 +1,334 @@ +"""fd5.ingest.dicom — DICOM series loader. + +Reads DICOM series directories and produces sealed fd5 files via ``fd5.create()``. +Requires ``pydicom>=2.4`` — install with ``pip install fd5[dicom]``. +""" + +from __future__ import annotations + +import json +from collections import defaultdict +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import numpy as np + +from fd5._types import Fd5Path +from fd5.ingest._base import hash_source_files + +try: + import pydicom +except ImportError as exc: + raise ImportError( + "pydicom is required for DICOM ingest. Install it with: pip install fd5[dicom]" + ) from exc + +import fd5 + +_PATIENT_TAGS = frozenset( + { + "PatientName", + "PatientID", + "PatientBirthDate", + "PatientBirthTime", + "PatientSex", + "PatientAge", + "PatientWeight", + "PatientAddress", + "PatientTelephoneNumbers", + "OtherPatientIDs", + "OtherPatientNames", + "EthnicGroup", + "PatientComments", + "ReferringPhysicianName", + "InstitutionName", + "InstitutionAddress", + "InstitutionalDepartmentName", + } +) + + +# --------------------------------------------------------------------------- +# Series discovery +# --------------------------------------------------------------------------- + + +def _discover_series(dicom_dir: Path) -> dict[str, list[Path]]: + """Group DICOM files by SeriesInstanceUID. + + Non-DICOM files are silently skipped. 
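
    Returns a mapping from SeriesInstanceUID to the files belonging to
    that series, e.g. (placeholder UID)::

        {"<series-uid>": [Path("slice_0001.dcm"), Path("slice_0002.dcm")]}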
+ """ + series: dict[str, list[Path]] = defaultdict(list) + for p in sorted(dicom_dir.iterdir()): + if not p.is_file(): + continue + try: + ds = pydicom.dcmread(str(p), stop_before_pixels=True, force=False) + except Exception: + continue + uid = getattr(ds, "SeriesInstanceUID", None) + if uid is not None: + series[str(uid)].append(p) + return dict(series) + + +# --------------------------------------------------------------------------- +# Volume assembly +# --------------------------------------------------------------------------- + + +def _sort_slices(dcm_files: list[Path]) -> list[pydicom.Dataset]: + """Read DICOM files and sort by ImagePositionPatient z-coordinate.""" + datasets = [pydicom.dcmread(str(p)) for p in dcm_files] + datasets.sort(key=lambda ds: float(ds.ImagePositionPatient[2])) + return datasets + + +def _assemble_volume(datasets: list[pydicom.Dataset]) -> np.ndarray: + """Stack sorted DICOM slices into a 3D float32 volume. + + Applies RescaleSlope/RescaleIntercept if present. + """ + slices = [] + for ds in datasets: + arr = ds.pixel_array.astype(np.float32) + slope = float(getattr(ds, "RescaleSlope", 1.0)) + intercept = float(getattr(ds, "RescaleIntercept", 0.0)) + arr = arr * slope + intercept + slices.append(arr) + return np.stack(slices, axis=0) + + +# --------------------------------------------------------------------------- +# Affine computation +# --------------------------------------------------------------------------- + + +def _compute_affine(datasets: list[pydicom.Dataset]) -> np.ndarray: + """Derive a 4×4 affine matrix from DICOM geometry tags. + + Uses ImagePositionPatient, ImageOrientationPatient, and PixelSpacing + from the first slice plus the z-spacing derived from the first two slices + (or SliceThickness as fallback for single-slice volumes). 
+ """ + ds0 = datasets[0] + ipp = [float(v) for v in ds0.ImagePositionPatient] + iop = [float(v) for v in ds0.ImageOrientationPatient] + ps = [float(v) for v in ds0.PixelSpacing] + + row_cosines = np.array(iop[:3]) + col_cosines = np.array(iop[3:]) + + if len(datasets) > 1: + ipp1 = [float(v) for v in datasets[1].ImagePositionPatient] + slice_vec = np.array(ipp1) - np.array(ipp) + slice_spacing = np.linalg.norm(slice_vec) + if slice_spacing > 0: + slice_dir = slice_vec / slice_spacing + else: + slice_dir = np.cross(row_cosines, col_cosines) + slice_spacing = float(getattr(ds0, "SliceThickness", 1.0)) + else: + slice_dir = np.cross(row_cosines, col_cosines) + slice_spacing = float(getattr(ds0, "SliceThickness", 1.0)) + + affine = np.eye(4, dtype=np.float64) + affine[:3, 0] = row_cosines * ps[1] + affine[:3, 1] = col_cosines * ps[0] + affine[:3, 2] = slice_dir * slice_spacing + affine[:3, 3] = ipp + return affine + + +# --------------------------------------------------------------------------- +# Metadata extraction +# --------------------------------------------------------------------------- + + +def _extract_timestamp(ds: pydicom.Dataset) -> str: + """Extract an ISO 8601 timestamp from DICOM study/acquisition tags.""" + date_str = getattr(ds, "StudyDate", None) or getattr(ds, "AcquisitionDate", "") + time_str = getattr(ds, "StudyTime", None) or getattr(ds, "AcquisitionTime", "") + if not date_str or len(date_str) < 8: + return datetime.now(tz=timezone.utc).isoformat() + + dt_str = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}" + if time_str: + hh = time_str[:2] + mm = time_str[2:4] + ss = time_str[4:6] if len(time_str) >= 6 else "00" + dt_str += f"T{hh}:{mm}:{ss}" + return dt_str + + +def _extract_scanner(ds: pydicom.Dataset) -> str: + """Build a scanner identifier from DICOM header tags.""" + parts = [ + getattr(ds, "Manufacturer", ""), + getattr(ds, "StationName", ""), + ] + return " ".join(p for p in parts if p).strip() or "unknown" + + +# --------------------------------------------------------------------------- +# De-identification +# --------------------------------------------------------------------------- + + +def _deidentify(ds: pydicom.Dataset) -> dict[str, Any]: + """Convert DICOM dataset to a JSON-safe dict with patient tags removed.""" + result: dict[str, Any] = {} + for elem in ds: + if elem.keyword in _PATIENT_TAGS: + continue + if elem.keyword == "PixelData": + continue + try: + val = elem.value + if isinstance(val, pydicom.Sequence): + continue + if isinstance(val, (pydicom.uid.UID, pydicom.valuerep.PersonName)): + val = str(val) + if isinstance(val, bytes): + continue + if isinstance(val, pydicom.multival.MultiValue): + val = [float(v) if isinstance(v, (float, int)) else str(v) for v in val] + json.dumps(val) + result[elem.keyword] = val + except (TypeError, ValueError): + result[elem.keyword] = str(val) + return result + + +def _ds_to_json_dict(ds: pydicom.Dataset) -> dict[str, Any]: + """Convert DICOM dataset to a JSON-safe dict, preserving patient tags.""" + result: dict[str, Any] = {} + for elem in ds: + if elem.keyword == "PixelData": + continue + try: + val = elem.value + if isinstance(val, pydicom.Sequence): + continue + if isinstance(val, (pydicom.uid.UID, pydicom.valuerep.PersonName)): + val = str(val) + if isinstance(val, bytes): + continue + if isinstance(val, pydicom.multival.MultiValue): + val = [float(v) if isinstance(v, (float, int)) else str(v) for v in val] + json.dumps(val) + result[elem.keyword] = val + except (TypeError, ValueError): + 
result[elem.keyword] = str(val) + return result + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def ingest_dicom( + dicom_dir: Path, + output_dir: Path, + *, + product: str = "recon", + name: str, + description: str, + timestamp: str | None = None, + study_metadata: dict | None = None, + deidentify: bool = True, +) -> Path: + """Read a DICOM series directory and produce a sealed fd5 file.""" + dicom_dir = Path(dicom_dir) + output_dir = Path(output_dir) + + all_series = _discover_series(dicom_dir) + if not all_series: + raise ValueError(f"No DICOM series found in {dicom_dir}") + + series_uid = next(iter(all_series)) + dcm_files = all_series[series_uid] + + datasets = _sort_slices(dcm_files) + volume = _assemble_volume(datasets) + affine = _compute_affine(datasets) + + ref_ds = datasets[0] + ts = timestamp or _extract_timestamp(ref_ds) + scanner = _extract_scanner(ref_ds) + + file_records = hash_source_files(dcm_files) + + if deidentify: + header_dict = _deidentify(ref_ds) + else: + header_dict = _ds_to_json_dict(ref_ds) + + with fd5.create( + output_dir, + product=product, + name=name, + description=description, + timestamp=ts, + ) as builder: + builder.file.attrs["scanner"] = scanner + builder.file.attrs["vendor_series_id"] = str(series_uid) + + builder.write_product( + { + "volume": volume, + "affine": affine, + "dimension_order": "ZYX", + "reference_frame": "LPS", + "description": description, + "provenance": { + "dicom_header": json.dumps(header_dict), + }, + } + ) + + builder.write_provenance( + original_files=file_records, + ingest_tool="fd5.ingest.dicom", + ingest_version=fd5.__version__, + ingest_timestamp=ts, + ) + + result_files = sorted(output_dir.glob("*.h5")) + return result_files[-1] + + +class DicomLoader: + """Loader that reads DICOM series and produces fd5 files.""" + + @property + def supported_product_types(self) -> list[str]: + return ["recon"] + + def ingest( + self, + source: Path | str, + output_dir: Path, + *, + product: str, + name: str, + description: str, + timestamp: str | None = None, + **kwargs, + ) -> Fd5Path: + if product not in self.supported_product_types: + raise ValueError( + f"Unsupported product type {product!r}. " + f"Supported: {self.supported_product_types}" + ) + return ingest_dicom( + Path(source), + Path(output_dir), + product=product, + name=name, + description=description, + timestamp=timestamp, + **kwargs, + ) diff --git a/src/fd5/ingest/parquet.py b/src/fd5/ingest/parquet.py new file mode 100644 index 0000000..b1bd109 --- /dev/null +++ b/src/fd5/ingest/parquet.py @@ -0,0 +1,369 @@ +"""fd5.ingest.parquet — Parquet columnar data loader. + +Reads Apache Parquet files via pyarrow and produces sealed fd5 files. +Parquet's columnar layout and embedded schema map naturally to fd5's +typed datasets and attrs. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import numpy as np + +from fd5._types import Fd5Path +from fd5.create import create +from fd5.ingest._base import hash_source_files + +try: + import pyarrow.parquet as pq +except ImportError as _exc: + raise ImportError( + "pyarrow is required for Parquet ingest. 
" + "Install it with: pip install 'fd5[parquet]'" + ) from _exc + +__version__ = "0.1.0" + +_log = logging.getLogger(__name__) + +_SPECTRUM_COUNTS_ALIASES = frozenset({"counts", "count", "intensity", "rate", "y"}) +_SPECTRUM_ENERGY_ALIASES = frozenset( + {"energy", "channel", "bin", "x", "wavelength", "frequency"} +) + + +class ParquetLoader: + """Loader that reads Parquet files and produces sealed fd5 files.""" + + @property + def supported_product_types(self) -> list[str]: + return ["spectrum", "listmode", "device_data"] + + def ingest( + self, + source: Path | str, + output_dir: Path, + *, + product: str, + name: str, + description: str, + timestamp: str | None = None, + column_map: dict[str, str] | None = None, + **kwargs: Any, + ) -> Fd5Path: + """Read a Parquet file and produce a sealed fd5 file.""" + source = Path(source) + if not source.exists(): + raise FileNotFoundError(f"Source file not found: {source}") + + ts = timestamp or datetime.now(tz=timezone.utc).isoformat() + + table = pq.read_table(source) + if table.num_rows == 0: + raise ValueError(f"No data rows found in {source}") + + pq_metadata = _extract_parquet_metadata(source) + columns = _table_to_columns(table) + file_records = hash_source_files([source]) + + writer = _PRODUCT_WRITERS.get(product, _write_spectrum) + return writer( + source=source, + output_dir=output_dir, + product=product, + name=name, + description=description, + timestamp=ts, + columns=columns, + column_names=table.column_names, + column_map=column_map, + pq_metadata=pq_metadata, + file_records=file_records, + **kwargs, + ) + + +# --------------------------------------------------------------------------- +# Parquet reading helpers +# --------------------------------------------------------------------------- + + +def _extract_parquet_metadata(path: Path) -> dict[str, str]: + """Extract key-value metadata from the Parquet file footer.""" + schema = pq.read_schema(path) + raw = schema.metadata or {} + meta: dict[str, str] = {} + for k, v in raw.items(): + key = k.decode() if isinstance(k, bytes) else k + val = v.decode() if isinstance(v, bytes) else v + if not key.startswith("pandas") and not key.startswith("ARROW"): + meta[key] = val + return meta + + +def _table_to_columns(table: Any) -> dict[str, np.ndarray]: + """Convert a PyArrow table to a dict of numpy arrays.""" + columns: dict[str, np.ndarray] = {} + for col_name in table.column_names: + columns[col_name] = table.column(col_name).to_numpy() + return columns + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _find_output_file(output_dir: Path) -> Fd5Path: + """Find the sealed fd5 file in *output_dir* after create() exits.""" + files = sorted(output_dir.glob("*.h5"), key=lambda p: p.stat().st_mtime) + return files[-1] + + +def _resolve_column( + columns: dict[str, Any], + column_map: dict[str, str] | None, + target_key: str, + aliases: frozenset[str], +) -> str | None: + """Find the source column name for *target_key* using mapping or aliases.""" + if column_map and target_key in column_map: + mapped = column_map[target_key] + if mapped in columns: + return mapped + for alias in aliases: + if alias in columns: + return alias + return None + + +# --------------------------------------------------------------------------- +# Product-specific writers +# --------------------------------------------------------------------------- + + +def _write_spectrum( + *, + 
source: Path, + output_dir: Path, + product: str, + name: str, + description: str, + timestamp: str, + columns: dict[str, np.ndarray], + column_names: list[str], + column_map: dict[str, str] | None, + pq_metadata: dict[str, str], + file_records: list[dict[str, Any]], + **kwargs: Any, +) -> Fd5Path: + """Write spectrum product from Parquet columns.""" + counts_col = _resolve_column( + columns, column_map, "counts", _SPECTRUM_COUNTS_ALIASES + ) + energy_col = _resolve_column( + columns, column_map, "energy", _SPECTRUM_ENERGY_ALIASES + ) + + if counts_col is None: + raise ValueError( + f"Cannot find counts column. Available: {list(columns.keys())}" + ) + + counts = np.asarray(columns[counts_col], dtype=np.float32) + + axes = [] + if energy_col is not None: + energy_vals = np.asarray(columns[energy_col], dtype=np.float64) + units = pq_metadata.get("units", "arb") + half_step = 0.0 + if len(energy_vals) > 1: + half_step = (energy_vals[1] - energy_vals[0]) / 2.0 + bin_edges = np.append(energy_vals - half_step, energy_vals[-1] + half_step) + axes.append( + { + "label": energy_col, + "units": units, + "unitSI": 1.0, + "bin_edges": bin_edges, + "description": f"{energy_col} axis", + } + ) + + product_data: dict[str, Any] = {"counts": counts} + if axes: + product_data["axes"] = axes + + with create( + output_dir, + product=product, + name=name, + description=description, + timestamp=timestamp, + ) as builder: + builder.write_product(product_data) + + if pq_metadata: + builder.write_metadata(pq_metadata) + + builder.write_provenance( + original_files=file_records, + ingest_tool="fd5.ingest.parquet", + ingest_version=__version__, + ingest_timestamp=timestamp, + ) + + return _find_output_file(output_dir) + + +def _write_listmode( + *, + source: Path, + output_dir: Path, + product: str, + name: str, + description: str, + timestamp: str, + columns: dict[str, np.ndarray], + column_names: list[str], + column_map: dict[str, str] | None, + pq_metadata: dict[str, str], + file_records: list[dict[str, Any]], + mode: str = "3d", + table_pos: float = 0.0, + z_min: float = 0.0, + z_max: float = 0.0, + **kwargs: Any, +) -> Fd5Path: + """Write listmode product — columns placed in raw_data group.""" + time_col = _resolve_column( + columns, + column_map, + "time", + frozenset({"time", "timestamp", "t", "elapsed"}), + ) + time_arr = ( + np.asarray(columns[time_col], dtype=np.float64) + if time_col + else np.arange(len(next(iter(columns.values()))), dtype=np.float64) + ) + duration = float(time_arr[-1] - time_arr[0]) if len(time_arr) > 1 else 0.0 + + raw_datasets: dict[str, np.ndarray] = {} + for col_name in column_names: + raw_datasets[col_name] = np.asarray(columns[col_name]) + + product_data: dict[str, Any] = { + "mode": mode, + "table_pos": table_pos, + "duration": duration, + "z_min": z_min, + "z_max": z_max, + "raw_data": raw_datasets, + } + + with create( + output_dir, + product=product, + name=name, + description=description, + timestamp=timestamp, + ) as builder: + builder.write_product(product_data) + + if pq_metadata: + builder.write_metadata(pq_metadata) + + builder.write_provenance( + original_files=file_records, + ingest_tool="fd5.ingest.parquet", + ingest_version=__version__, + ingest_timestamp=timestamp, + ) + + return _find_output_file(output_dir) + + +def _write_device_data( + *, + source: Path, + output_dir: Path, + product: str, + name: str, + description: str, + timestamp: str, + columns: dict[str, np.ndarray], + column_names: list[str], + column_map: dict[str, str] | None, + pq_metadata: dict[str, 
str], + file_records: list[dict[str, Any]], + device_type: str = "environmental_sensor", + device_model: str = "unknown", + **kwargs: Any, +) -> Fd5Path: + """Write device_data product from Parquet columns.""" + time_col = _resolve_column( + columns, + column_map, + "timestamp", + frozenset({"timestamp", "time", "t", "elapsed"}), + ) + signal_cols = [h for h in column_names if h != (time_col or "") and h in columns] + + time_arr = ( + np.asarray(columns[time_col], dtype=np.float64) + if time_col + else np.arange(len(next(iter(columns.values()))), dtype=np.float64) + ) + + duration = float(time_arr[-1] - time_arr[0]) if len(time_arr) > 1 else 0.0 + + channels: dict[str, dict[str, Any]] = {} + for col_name in signal_cols: + signal = np.asarray(columns[col_name], dtype=np.float64) + sampling_rate = len(signal) / max(duration, 1.0) + channels[col_name] = { + "signal": signal, + "time": time_arr, + "sampling_rate": sampling_rate, + "units": pq_metadata.get("units", "arb"), + "unitSI": 1.0, + "description": f"{col_name} channel", + } + + product_data: dict[str, Any] = { + "device_type": device_type, + "device_model": device_model, + "recording_start": timestamp, + "recording_duration": duration, + "channels": channels, + } + + with create( + output_dir, + product=product, + name=name, + description=description, + timestamp=timestamp, + ) as builder: + builder.write_product(product_data) + + builder.write_provenance( + original_files=file_records, + ingest_tool="fd5.ingest.parquet", + ingest_version=__version__, + ingest_timestamp=timestamp, + ) + + return _find_output_file(output_dir) + + +_PRODUCT_WRITERS = { + "spectrum": _write_spectrum, + "listmode": _write_listmode, + "device_data": _write_device_data, +} diff --git a/tests/test_ingest_dicom.py b/tests/test_ingest_dicom.py new file mode 100644 index 0000000..906e429 --- /dev/null +++ b/tests/test_ingest_dicom.py @@ -0,0 +1,774 @@ +"""Tests for fd5.ingest.dicom — DICOM series loader.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import h5py +import numpy as np +import pydicom +import pytest +from pydicom.dataset import FileDataset +from pydicom.uid import ExplicitVRLittleEndian, generate_uid + +from fd5.ingest._base import Loader + + +# --------------------------------------------------------------------------- +# Helpers — synthetic DICOM generation +# --------------------------------------------------------------------------- + +_STUDY_UID = generate_uid() +_SERIES_UID = generate_uid() +_FRAME_OF_REF_UID = generate_uid() + + +def _make_dicom_slice( + tmp_dir: Path, + *, + slice_idx: int, + n_slices: int = 4, + rows: int = 8, + cols: int = 8, + series_uid: str = _SERIES_UID, + study_uid: str = _STUDY_UID, + patient_name: str = "Doe^John", + patient_id: str = "PAT001", +) -> Path: + """Create a single synthetic DICOM CT slice file.""" + sop_uid = generate_uid() + filename = tmp_dir / f"slice_{slice_idx:04d}.dcm" + + file_meta = pydicom.Dataset() + file_meta.MediaStorageSOPClassUID = "1.2.840.10008.5.1.4.1.1.2" # CT + file_meta.MediaStorageSOPInstanceUID = sop_uid + file_meta.TransferSyntaxUID = ExplicitVRLittleEndian + + ds = FileDataset(str(filename), {}, file_meta=file_meta, preamble=b"\x00" * 128) + + ds.SOPClassUID = file_meta.MediaStorageSOPClassUID + ds.SOPInstanceUID = sop_uid + ds.StudyInstanceUID = study_uid + ds.SeriesInstanceUID = series_uid + ds.FrameOfReferenceUID = _FRAME_OF_REF_UID + + ds.Modality = "CT" + ds.Manufacturer = "TestVendor" + ds.StationName = "SCANNER_01" + 
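    # Manufacturer and StationName are the tags _extract_scanner joins into
    # the scanner id; the Study*/Acquisition* date-time tags below feed
    # _extract_timestamp.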
ds.StudyDescription = "Test Study" + ds.SeriesDescription = "Test Series" + ds.StudyDate = "20250101" + ds.StudyTime = "120000" + ds.AcquisitionDate = "20250101" + ds.AcquisitionTime = "120000" + ds.ContentDate = "20250101" + ds.ContentTime = "120000" + ds.InstanceNumber = slice_idx + 1 + ds.SeriesNumber = 1 + + ds.PatientName = patient_name + ds.PatientID = patient_id + ds.PatientBirthDate = "19800101" + + ds.Rows = rows + ds.Columns = cols + ds.BitsAllocated = 16 + ds.BitsStored = 16 + ds.HighBit = 15 + ds.PixelRepresentation = 1 # signed + ds.SamplesPerPixel = 1 + ds.PhotometricInterpretation = "MONOCHROME2" + ds.RescaleSlope = 1.0 + ds.RescaleIntercept = -1024.0 + + ds.PixelSpacing = [1.0, 1.0] + ds.SliceThickness = 2.0 + z_pos = -float(n_slices) + slice_idx * 2.0 + ds.ImagePositionPatient = [0.0, 0.0, z_pos] + ds.ImageOrientationPatient = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0] + ds.SliceLocation = z_pos + + rng = np.random.default_rng(42 + slice_idx) + pixel_data = rng.integers(-100, 1000, size=(rows, cols), dtype=np.int16) + ds.PixelData = pixel_data.tobytes() + + ds.save_as(str(filename)) + return filename + + +def _make_dicom_series( + tmp_path: Path, + *, + n_slices: int = 4, + rows: int = 8, + cols: int = 8, + **kwargs, +) -> Path: + """Create a directory with a synthetic DICOM CT series.""" + dicom_dir = tmp_path / "dicom_series" + dicom_dir.mkdir() + for i in range(n_slices): + _make_dicom_slice( + dicom_dir, slice_idx=i, n_slices=n_slices, rows=rows, cols=cols, **kwargs + ) + return dicom_dir + + +# --------------------------------------------------------------------------- +# Protocol conformance +# --------------------------------------------------------------------------- + + +class TestProtocolConformance: + def test_dicom_loader_satisfies_loader_protocol(self): + from fd5.ingest.dicom import DicomLoader + + loader = DicomLoader() + assert isinstance(loader, Loader) + + def test_supported_product_types_includes_recon(self): + from fd5.ingest.dicom import DicomLoader + + loader = DicomLoader() + assert "recon" in loader.supported_product_types + + +# --------------------------------------------------------------------------- +# ImportError when pydicom missing +# --------------------------------------------------------------------------- + + +class TestImportGuard: + def test_module_importable_with_pydicom(self): + from fd5.ingest import dicom # noqa: F401 + + +# --------------------------------------------------------------------------- +# ingest_dicom public function +# --------------------------------------------------------------------------- + + +class TestIngestDicomFunction: + def test_function_is_importable(self): + from fd5.ingest.dicom import ingest_dicom + + assert callable(ingest_dicom) + + def test_returns_path(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom( + dicom_dir, out_dir, name="test-ct", description="Test CT scan" + ) + assert isinstance(result, Path) + assert result.exists() + + def test_output_is_valid_hdf5(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom( + dicom_dir, out_dir, name="test-ct", description="Test CT scan" + ) + with h5py.File(result, "r") as f: + assert "volume" in f + + +# --------------------------------------------------------------------------- +# Series discovery +# 
--------------------------------------------------------------------------- + + +class TestSeriesDiscovery: + def test_discovers_single_series(self, tmp_path): + from fd5.ingest.dicom import _discover_series + + dicom_dir = _make_dicom_series(tmp_path, n_slices=4) + series = _discover_series(dicom_dir) + assert len(series) == 1 + uid = list(series.keys())[0] + assert len(series[uid]) == 4 + + def test_discovers_multiple_series(self, tmp_path): + from fd5.ingest.dicom import _discover_series + + dicom_dir = tmp_path / "mixed" + dicom_dir.mkdir() + uid_a = generate_uid() + uid_b = generate_uid() + for i in range(3): + _make_dicom_slice(dicom_dir, slice_idx=i, series_uid=uid_a) + for i in range(2): + _make_dicom_slice(dicom_dir, slice_idx=10 + i, series_uid=uid_b) + series = _discover_series(dicom_dir) + assert len(series) == 2 + + def test_ignores_non_dicom_files(self, tmp_path): + from fd5.ingest.dicom import _discover_series + + dicom_dir = _make_dicom_series(tmp_path, n_slices=2) + (dicom_dir / "readme.txt").write_text("not dicom") + (dicom_dir / "data.json").write_text("{}") + series = _discover_series(dicom_dir) + assert len(series) == 1 + uid = list(series.keys())[0] + assert len(series[uid]) == 2 + + +# --------------------------------------------------------------------------- +# Volume assembly +# --------------------------------------------------------------------------- + + +class TestVolumeAssembly: + def test_volume_shape_matches_slices(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + n_slices, rows, cols = 4, 8, 8 + dicom_dir = _make_dicom_series( + tmp_path, n_slices=n_slices, rows=rows, cols=cols + ) + out_dir = tmp_path / "output" + result = ingest_dicom( + dicom_dir, out_dir, name="test-ct", description="Test CT volume" + ) + with h5py.File(result, "r") as f: + vol = f["volume"] + assert vol.shape == (n_slices, rows, cols) + + def test_slices_sorted_by_position(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path, n_slices=4) + out_dir = tmp_path / "output" + result = ingest_dicom( + dicom_dir, out_dir, name="test-ct", description="Test CT volume" + ) + with h5py.File(result, "r") as f: + vol = f["volume"] + assert vol.ndim == 3 + + def test_volume_dtype_is_float32(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path, n_slices=4) + out_dir = tmp_path / "output" + result = ingest_dicom( + dicom_dir, out_dir, name="test-ct", description="Test CT volume" + ) + with h5py.File(result, "r") as f: + assert f["volume"].dtype == np.float32 + + +# --------------------------------------------------------------------------- +# Affine computation +# --------------------------------------------------------------------------- + + +class TestAffineComputation: + def test_affine_exists_on_volume(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + assert "affine" in f["volume"].attrs + aff = f["volume"].attrs["affine"] + assert aff.shape == (4, 4) + assert aff.dtype == np.float64 + + def test_affine_pixel_spacing(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + aff 
= f["volume"].attrs["affine"] + assert aff[0, 0] != 0 or aff[1, 0] != 0 or aff[2, 0] != 0 + + def test_affine_last_row_is_0001(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + aff = f["volume"].attrs["affine"] + np.testing.assert_array_equal(aff[3, :], [0, 0, 0, 1]) + + +# --------------------------------------------------------------------------- +# Metadata extraction +# --------------------------------------------------------------------------- + + +class TestMetadataExtraction: + def test_root_attrs_present(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "recon" + assert f.attrs["name"] == "test-ct" + assert f.attrs["description"] == "Test CT" + + def test_timestamp_extracted_from_dicom(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + ts = f.attrs["timestamp"] + if isinstance(ts, bytes): + ts = ts.decode() + assert "2025" in ts + + def test_timestamp_override(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom( + dicom_dir, + out_dir, + name="test-ct", + description="Test CT", + timestamp="2024-06-15T08:00:00", + ) + with h5py.File(result, "r") as f: + ts = f.attrs["timestamp"] + if isinstance(ts, bytes): + ts = ts.decode() + assert ts == "2024-06-15T08:00:00" + + def test_scanner_attr_on_volume(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + assert "scanner" in f.attrs or "scanner" in f["volume"].attrs + + +# --------------------------------------------------------------------------- +# Provenance — original files +# --------------------------------------------------------------------------- + + +class TestProvenance: + def test_original_files_recorded(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path, n_slices=3) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + assert "provenance/original_files" in f + ds = f["provenance/original_files"] + assert ds.shape[0] == 3 + + def test_original_files_have_sha256(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path, n_slices=2) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + ds = f["provenance/original_files"] + for row in ds: + sha = row["sha256"] + if isinstance(sha, bytes): + sha = sha.decode() + assert sha.startswith("sha256:") + + def test_ingest_provenance_recorded(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = 
_make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + assert "provenance/ingest" in f + ingest_grp = f["provenance/ingest"] + tool = ingest_grp.attrs.get("tool", "") + if isinstance(tool, bytes): + tool = tool.decode() + assert "dicom" in tool.lower() or "fd5" in tool.lower() + + +# --------------------------------------------------------------------------- +# De-identification +# --------------------------------------------------------------------------- + + +class TestDeidentification: + def test_patient_name_stripped_by_default(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series( + tmp_path, patient_name="Smith^Jane", patient_id="SECRET_ID" + ) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + + def _check_no_patient_data(name, obj): + for attr_name, attr_val in obj.attrs.items(): + val = attr_val + if isinstance(val, bytes): + val = val.decode("utf-8", errors="replace") + if isinstance(val, str): + assert "Smith" not in val + assert "SECRET_ID" not in val + + f.visititems(_check_no_patient_data) + for attr_name, attr_val in f.attrs.items(): + val = attr_val + if isinstance(val, bytes): + val = val.decode("utf-8", errors="replace") + if isinstance(val, str): + assert "Smith" not in val + assert "SECRET_ID" not in val + + def test_dicom_header_provenance_is_deidentified(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series( + tmp_path, patient_name="Doe^John", patient_id="PAT001" + ) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + with h5py.File(result, "r") as f: + if "provenance/dicom_header" in f: + header_raw = f["provenance/dicom_header"][()] + if isinstance(header_raw, bytes): + header_raw = header_raw.decode() + assert "Doe" not in header_raw + assert "PAT001" not in header_raw + + def test_deidentify_false_preserves_patient_data(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series( + tmp_path, patient_name="Smith^Jane", patient_id="KEEP_ID" + ) + out_dir = tmp_path / "output" + result = ingest_dicom( + dicom_dir, + out_dir, + name="test-ct", + description="Test CT", + deidentify=False, + ) + with h5py.File(result, "r") as f: + if "provenance/dicom_header" in f: + header_raw = f["provenance/dicom_header"][()] + if isinstance(header_raw, bytes): + header_raw = header_raw.decode() + assert "Smith" in header_raw or "KEEP_ID" in header_raw + + +# --------------------------------------------------------------------------- +# fd5 validate integration +# --------------------------------------------------------------------------- + + +class TestFd5Validate: + def test_output_passes_fd5_validate(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + from fd5.schema import validate + + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = ingest_dicom(dicom_dir, out_dir, name="test-ct", description="Test CT") + errors = validate(result) + assert errors == [], [e.message for e in errors] + + +# --------------------------------------------------------------------------- +# DicomLoader.ingest method +# --------------------------------------------------------------------------- + + +class TestDicomLoaderIngest: + def 
test_loader_ingest_produces_file(self, tmp_path): + from fd5.ingest.dicom import DicomLoader + + loader = DicomLoader() + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + result = loader.ingest( + dicom_dir, + out_dir, + product="recon", + name="loader-test", + description="Loader test", + ) + assert isinstance(result, Path) + assert result.exists() + + def test_loader_ingest_unsupported_product_raises(self, tmp_path): + from fd5.ingest.dicom import DicomLoader + + loader = DicomLoader() + dicom_dir = _make_dicom_series(tmp_path) + out_dir = tmp_path / "output" + with pytest.raises(ValueError, match="product"): + loader.ingest( + dicom_dir, + out_dir, + product="unknown_product", + name="test", + description="test", + ) + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + def test_empty_directory_raises(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + empty_dir = tmp_path / "empty" + empty_dir.mkdir() + out_dir = tmp_path / "output" + with pytest.raises((ValueError, FileNotFoundError)): + ingest_dicom(empty_dir, out_dir, name="test", description="test") + + def test_single_slice(self, tmp_path): + from fd5.ingest.dicom import ingest_dicom + + dicom_dir = _make_dicom_series(tmp_path, n_slices=1, rows=4, cols=4) + out_dir = tmp_path / "output" + result = ingest_dicom( + dicom_dir, out_dir, name="test-ct", description="Single slice" + ) + with h5py.File(result, "r") as f: + assert f["volume"].shape == (1, 4, 4) + + def test_directory_entries_are_skipped(self, tmp_path): + from fd5.ingest.dicom import _discover_series + + dicom_dir = _make_dicom_series(tmp_path, n_slices=2) + (dicom_dir / "subdir").mkdir() + series = _discover_series(dicom_dir) + assert len(series) == 1 + uid = list(series.keys())[0] + assert len(series[uid]) == 2 + + +# --------------------------------------------------------------------------- +# _deidentify / _ds_to_json_dict internal helpers +# --------------------------------------------------------------------------- + + +class TestSerializationHelpers: + def test_deidentify_strips_patient_tags(self): + from fd5.ingest.dicom import _deidentify + + ds = pydicom.Dataset() + ds.PatientName = "Secret^Name" + ds.PatientID = "ID123" + ds.Modality = "CT" + result = _deidentify(ds) + assert "PatientName" not in result + assert "PatientID" not in result + assert result["Modality"] == "CT" + + def test_ds_to_json_dict_preserves_patient_tags(self): + from fd5.ingest.dicom import _ds_to_json_dict + + ds = pydicom.Dataset() + ds.PatientName = "Keep^Me" + ds.PatientID = "KEEP_ID" + ds.Modality = "CT" + result = _ds_to_json_dict(ds) + assert "PatientName" in result + assert "PatientID" in result + + def test_deidentify_handles_sequence_elements(self): + from fd5.ingest.dicom import _deidentify + + ds = pydicom.Dataset() + ds.Modality = "CT" + inner = pydicom.Dataset() + inner.CodeValue = "123" + ds.AnatomicRegionSequence = pydicom.Sequence([inner]) + result = _deidentify(ds) + assert "AnatomicRegionSequence" not in result + assert "Modality" in result + + def test_deidentify_handles_non_serializable_value(self): + from fd5.ingest.dicom import _deidentify + + ds = pydicom.Dataset() + ds.Modality = "CT" + ds.add_new(0x7FE00010, "OB", b"\x00\x01\x02") + result = _deidentify(ds) + assert "Modality" in result + + def test_ds_to_json_dict_handles_sequence(self): + from fd5.ingest.dicom import 
_ds_to_json_dict + + ds = pydicom.Dataset() + ds.Modality = "MR" + inner = pydicom.Dataset() + inner.CodeValue = "456" + ds.AnatomicRegionSequence = pydicom.Sequence([inner]) + result = _ds_to_json_dict(ds) + assert "AnatomicRegionSequence" not in result + + def test_ds_to_json_dict_handles_bytes(self): + from fd5.ingest.dicom import _ds_to_json_dict + + ds = pydicom.Dataset() + ds.Modality = "CT" + ds.add_new(0x7FE00010, "OB", b"\x00\x01\x02") + result = _ds_to_json_dict(ds) + assert "Modality" in result + + +# --------------------------------------------------------------------------- +# _extract_timestamp edge cases +# --------------------------------------------------------------------------- + + +class TestExtractTimestamp: + def test_missing_date_falls_back_to_now(self): + from fd5.ingest.dicom import _extract_timestamp + + ds = pydicom.Dataset() + ts = _extract_timestamp(ds) + assert len(ts) > 0 + + def test_date_without_time(self): + from fd5.ingest.dicom import _extract_timestamp + + ds = pydicom.Dataset() + ds.StudyDate = "20240315" + ts = _extract_timestamp(ds) + assert ts == "2024-03-15" + + def test_short_time_string(self): + from fd5.ingest.dicom import _extract_timestamp + + ds = pydicom.Dataset() + ds.StudyDate = "20240315" + ds.StudyTime = "1234" + ts = _extract_timestamp(ds) + assert ts == "2024-03-15T12:34:00" + + def test_malformed_date_falls_back(self): + from fd5.ingest.dicom import _extract_timestamp + + ds = pydicom.Dataset() + ds.StudyDate = "X" + ts = _extract_timestamp(ds) + assert len(ts) > 0 + + def test_acquisition_date_fallback(self): + from fd5.ingest.dicom import _extract_timestamp + + ds = pydicom.Dataset() + ds.AcquisitionDate = "20240601" + ds.AcquisitionTime = "093000" + ts = _extract_timestamp(ds) + assert ts == "2024-06-01T09:30:00" + + +# --------------------------------------------------------------------------- +# _compute_affine edge case — zero slice spacing +# --------------------------------------------------------------------------- + + +class TestAffineEdgeCases: + def test_zero_spacing_uses_slice_thickness(self, tmp_path): + from fd5.ingest.dicom import _compute_affine + + ds0 = pydicom.Dataset() + ds0.ImagePositionPatient = [0.0, 0.0, 0.0] + ds0.ImageOrientationPatient = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0] + ds0.PixelSpacing = [1.0, 1.0] + ds0.SliceThickness = 3.0 + + ds1 = pydicom.Dataset() + ds1.ImagePositionPatient = [0.0, 0.0, 0.0] + ds1.ImageOrientationPatient = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0] + ds1.PixelSpacing = [1.0, 1.0] + + aff = _compute_affine([ds0, ds1]) + assert aff.shape == (4, 4) + np.testing.assert_array_equal(aff[3, :], [0, 0, 0, 1]) + + def test_deidentify_non_json_serializable_value(self): + from fd5.ingest.dicom import _deidentify + + ds = pydicom.Dataset() + ds.Modality = "CT" + ds.add_new(0x00091002, "UN", b"\xff\xfe") + result = _deidentify(ds) + assert "Modality" in result + + def test_ds_to_json_dict_non_serializable_value(self): + from fd5.ingest.dicom import _ds_to_json_dict + + ds = pydicom.Dataset() + ds.Modality = "CT" + ds.add_new(0x00091002, "UN", b"\xff\xfe") + result = _ds_to_json_dict(ds) + assert "Modality" in result + + def test_deidentify_non_json_serializable_type(self): + """Force the except (TypeError, ValueError) branch in _deidentify.""" + from unittest.mock import patch + + from fd5.ingest.dicom import _deidentify + + ds = pydicom.Dataset() + ds.Modality = "CT" + + original_dumps = json.dumps + + def _failing_dumps(val, **kw): + if val == "CT": + raise TypeError("mock non-serializable") + return 
original_dumps(val, **kw) + + with patch("fd5.ingest.dicom.json.dumps", side_effect=_failing_dumps): + result = _deidentify(ds) + assert "Modality" in result + assert result["Modality"] == "CT" + + def test_ds_to_json_dict_non_json_serializable_type(self): + """Force the except (TypeError, ValueError) branch in _ds_to_json_dict.""" + from unittest.mock import patch + + from fd5.ingest.dicom import _ds_to_json_dict + + ds = pydicom.Dataset() + ds.Modality = "MR" + + original_dumps = json.dumps + + def _failing_dumps(val, **kw): + if val == "MR": + raise TypeError("mock non-serializable") + return original_dumps(val, **kw) + + with patch("fd5.ingest.dicom.json.dumps", side_effect=_failing_dumps): + result = _ds_to_json_dict(ds) + assert "Modality" in result + assert result["Modality"] == "MR" diff --git a/tests/test_ingest_parquet.py b/tests/test_ingest_parquet.py new file mode 100644 index 0000000..18512a4 --- /dev/null +++ b/tests/test_ingest_parquet.py @@ -0,0 +1,483 @@ +"""Tests for fd5.ingest.parquet — Parquet columnar data loader.""" + +from __future__ import annotations + +import hashlib +from pathlib import Path +from typing import Any + +import h5py +import numpy as np +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from fd5.ingest._base import Loader +from fd5.ingest.parquet import ParquetLoader + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _write_parquet( + path: Path, + columns: dict[str, list[Any]], + *, + metadata: dict[str, str] | None = None, +) -> Path: + """Write a Parquet file from column data with optional key-value metadata.""" + arrays = {} + for name, values in columns.items(): + arrays[name] = pa.array(values) + table = pa.table(arrays) + if metadata: + existing = table.schema.metadata or {} + merged = {**existing, **{k.encode(): v.encode() for k, v in metadata.items()}} + table = table.replace_schema_metadata(merged) + pq.write_table(table, path) + return path + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def loader() -> ParquetLoader: + return ParquetLoader() + + +@pytest.fixture() +def spectrum_parquet(tmp_path: Path) -> Path: + """Parquet with energy + counts columns.""" + return _write_parquet( + tmp_path / "spectrum.parquet", + {"energy": [100.0, 200.0, 300.0, 400.0], "counts": [10.0, 25.0, 18.0, 7.0]}, + metadata={"units": "keV", "detector": "HPGe"}, + ) + + +@pytest.fixture() +def listmode_parquet(tmp_path: Path) -> Path: + """Parquet with event-level fields: time, energy, detector_id.""" + return _write_parquet( + tmp_path / "listmode.parquet", + { + "time": [0.001, 0.002, 0.005, 0.010], + "energy": [511.0, 511.0, 1274.5, 511.0], + "detector_id": [1, 2, 1, 3], + }, + ) + + +@pytest.fixture() +def device_data_parquet(tmp_path: Path) -> Path: + """Parquet with timestamp + sensor channels.""" + return _write_parquet( + tmp_path / "device.parquet", + { + "timestamp": [0.0, 1.0, 2.0], + "temperature": [22.5, 22.6, 22.4], + "pressure": [101.3, 101.2, 101.4], + }, + metadata={"units": "celsius"}, + ) + + +@pytest.fixture() +def generic_parquet(tmp_path: Path) -> Path: + """Parquet with arbitrary columns.""" + return _write_parquet( + tmp_path / "generic.parquet", + {"x": [1.0, 2.0, 3.0], "y": [4.0, 5.0, 6.0], "z": [7.0, 8.0, 9.0]}, + ) + + +@pytest.fixture() 
+def metadata_parquet(tmp_path: Path) -> Path: + """Parquet with rich key-value footer metadata.""" + return _write_parquet( + tmp_path / "metadata.parquet", + {"energy": [100.0, 200.0], "counts": [50.0, 75.0]}, + metadata={ + "units": "keV", + "detector": "HPGe", + "facility": "PSI", + "measurement_id": "M-2026-001", + }, + ) + + +@pytest.fixture() +def int_columns_parquet(tmp_path: Path) -> Path: + """Parquet with integer-typed columns (schema extraction test).""" + return _write_parquet( + tmp_path / "int_cols.parquet", + {"channel": [1, 2, 3, 4], "counts": [100, 250, 50, 10]}, + ) + + +# --------------------------------------------------------------------------- +# Loader protocol conformance +# --------------------------------------------------------------------------- + + +class TestParquetLoaderProtocol: + """ParquetLoader satisfies the Loader protocol.""" + + def test_is_loader_instance(self, loader: ParquetLoader): + assert isinstance(loader, Loader) + + def test_supported_product_types(self, loader: ParquetLoader): + types = loader.supported_product_types + assert isinstance(types, list) + assert "spectrum" in types + assert "listmode" in types + assert "device_data" in types + + def test_has_ingest_method(self, loader: ParquetLoader): + assert callable(getattr(loader, "ingest", None)) + + +# --------------------------------------------------------------------------- +# Spectrum ingest — happy path +# --------------------------------------------------------------------------- + + +class TestIngestSpectrum: + """Ingest spectrum Parquet produces valid fd5 file.""" + + def test_returns_path( + self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + spectrum_parquet, + tmp_path / "out", + product="spectrum", + name="Test spectrum", + description="A test spectrum from Parquet", + timestamp="2026-02-25T12:00:00+00:00", + ) + assert isinstance(result, Path) + assert result.exists() + assert result.suffix == ".h5" + + def test_root_attrs( + self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + spectrum_parquet, + tmp_path / "out", + product="spectrum", + name="Test spectrum", + description="A test spectrum from Parquet", + timestamp="2026-02-25T12:00:00+00:00", + ) + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "spectrum" + assert f.attrs["name"] == "Test spectrum" + assert f.attrs["description"] == "A test spectrum from Parquet" + + def test_data_written( + self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + spectrum_parquet, + tmp_path / "out", + product="spectrum", + name="Test spectrum", + description="A test spectrum from Parquet", + timestamp="2026-02-25T12:00:00+00:00", + ) + with h5py.File(result, "r") as f: + assert "counts" in f + counts = f["counts"][:] + assert counts.shape == (4,) + np.testing.assert_array_almost_equal(counts, [10, 25, 18, 7]) + + +# --------------------------------------------------------------------------- +# Listmode ingest +# --------------------------------------------------------------------------- + + +class TestIngestListmode: + """Ingest listmode Parquet produces valid fd5 file.""" + + def test_returns_path( + self, loader: ParquetLoader, listmode_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + listmode_parquet, + tmp_path / "out", + product="listmode", + name="Test listmode", + description="Listmode events from Parquet", + timestamp="2026-02-25T12:00:00+00:00", + ) + assert 
isinstance(result, Path) + assert result.exists() + + def test_event_data( + self, loader: ParquetLoader, listmode_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + listmode_parquet, + tmp_path / "out", + product="listmode", + name="Test listmode", + description="Listmode events from Parquet", + timestamp="2026-02-25T12:00:00+00:00", + ) + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "listmode" + assert "raw_data" in f + assert "time" in f["raw_data"] + assert "energy" in f["raw_data"] + assert "detector_id" in f["raw_data"] + + +# --------------------------------------------------------------------------- +# Device data ingest +# --------------------------------------------------------------------------- + + +class TestIngestDeviceData: + """Ingest device_data Parquet produces valid fd5 file.""" + + def test_returns_path( + self, loader: ParquetLoader, device_data_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + device_data_parquet, + tmp_path / "out", + product="device_data", + name="Temp log", + description="Temperature logger data", + timestamp="2026-02-25T12:00:00+00:00", + device_type="environmental_sensor", + device_model="TempSensor-100", + ) + assert isinstance(result, Path) + assert result.exists() + + def test_device_data_attrs( + self, loader: ParquetLoader, device_data_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + device_data_parquet, + tmp_path / "out", + product="device_data", + name="Temp log", + description="Temperature logger data", + timestamp="2026-02-25T12:00:00+00:00", + device_type="environmental_sensor", + device_model="TempSensor-100", + ) + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "device_data" + + +# --------------------------------------------------------------------------- +# Column mapping +# --------------------------------------------------------------------------- + + +class TestColumnMapping: + """Column mapping configurable via column_map parameter.""" + + def test_explicit_column_map( + self, loader: ParquetLoader, generic_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + generic_parquet, + tmp_path / "out", + product="spectrum", + name="Mapped spectrum", + description="Spectrum with explicit column mapping", + timestamp="2026-02-25T12:00:00+00:00", + column_map={"counts": "y", "energy": "x"}, + ) + with h5py.File(result, "r") as f: + assert "counts" in f + counts = f["counts"][:] + np.testing.assert_array_almost_equal(counts, [4.0, 5.0, 6.0]) + + +# --------------------------------------------------------------------------- +# Parquet schema metadata preserved as fd5 attrs +# --------------------------------------------------------------------------- + + +class TestParquetMetadata: + """Parquet key-value footer metadata mapped to fd5 attrs.""" + + def test_metadata_preserved( + self, loader: ParquetLoader, metadata_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + metadata_parquet, + tmp_path / "out", + product="spectrum", + name="Metadata test", + description="Testing Parquet metadata extraction", + timestamp="2026-02-25T12:00:00+00:00", + ) + with h5py.File(result, "r") as f: + assert "metadata" in f + meta = f["metadata"] + assert meta.attrs["units"] == "keV" + assert meta.attrs["detector"] == "HPGe" + assert meta.attrs["facility"] == "PSI" + + def test_schema_types_extracted( + self, loader: ParquetLoader, int_columns_parquet: Path, tmp_path: Path + ): + """Parquet column types are used (int columns stay numeric).""" + result = 
loader.ingest( + int_columns_parquet, + tmp_path / "out", + product="spectrum", + name="Int columns", + description="Integer column types", + timestamp="2026-02-25T12:00:00+00:00", + column_map={"counts": "counts", "energy": "channel"}, + ) + with h5py.File(result, "r") as f: + assert "counts" in f + counts = f["counts"][:] + np.testing.assert_array_almost_equal(counts, [100, 250, 50, 10]) + + +# --------------------------------------------------------------------------- +# Provenance +# --------------------------------------------------------------------------- + + +class TestProvenance: + """Provenance records source Parquet SHA-256.""" + + def test_provenance_original_files( + self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + spectrum_parquet, + tmp_path / "out", + product="spectrum", + name="Provenance test", + description="Testing provenance recording", + timestamp="2026-02-25T12:00:00+00:00", + ) + with h5py.File(result, "r") as f: + assert "provenance" in f + assert "original_files" in f["provenance"] + orig = f["provenance/original_files"] + assert orig.shape[0] >= 1 + rec = orig[0] + sha = rec["sha256"] + if isinstance(sha, bytes): + sha = sha.decode() + assert sha.startswith("sha256:") + + def test_provenance_sha256_correct( + self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path + ): + expected_hash = hashlib.sha256(spectrum_parquet.read_bytes()).hexdigest() + result = loader.ingest( + spectrum_parquet, + tmp_path / "out", + product="spectrum", + name="SHA test", + description="Verify SHA-256 hash", + timestamp="2026-02-25T12:00:00+00:00", + ) + with h5py.File(result, "r") as f: + rec = f["provenance/original_files"][0] + sha = rec["sha256"] + if isinstance(sha, bytes): + sha = sha.decode() + assert sha == f"sha256:{expected_hash}" + + def test_provenance_ingest_group( + self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path + ): + result = loader.ingest( + spectrum_parquet, + tmp_path / "out", + product="spectrum", + name="Ingest prov", + description="Testing ingest provenance", + timestamp="2026-02-25T12:00:00+00:00", + ) + with h5py.File(result, "r") as f: + assert "provenance/ingest" in f + ingest = f["provenance/ingest"] + assert "tool" in ingest.attrs + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + """Edge cases: missing file, empty table, string source path.""" + + def test_nonexistent_file_raises(self, loader: ParquetLoader, tmp_path: Path): + with pytest.raises(FileNotFoundError): + loader.ingest( + tmp_path / "no_such_file.parquet", + tmp_path / "out", + product="spectrum", + name="Missing", + description="Missing file", + timestamp="2026-02-25T12:00:00+00:00", + ) + + def test_empty_table_raises(self, loader: ParquetLoader, tmp_path: Path): + empty_path = tmp_path / "empty.parquet" + table = pa.table({"energy": pa.array([], type=pa.float64())}) + pq.write_table(table, empty_path) + with pytest.raises(ValueError, match="[Nn]o data"): + loader.ingest( + empty_path, + tmp_path / "out", + product="spectrum", + name="Empty", + description="Empty data", + timestamp="2026-02-25T12:00:00+00:00", + ) + + def test_string_source_path( + self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path + ): + """Source can be a str, not just Path.""" + result = loader.ingest( + str(spectrum_parquet), + tmp_path / "out", + product="spectrum", + 
name="String path", + description="Source as str", + timestamp="2026-02-25T12:00:00+00:00", + ) + assert result.exists() + + +# --------------------------------------------------------------------------- +# ImportError guard +# --------------------------------------------------------------------------- + + +class TestImportGuard: + """Clear ImportError when pyarrow not installed.""" + + def test_module_docstring_mentions_pyarrow(self): + import fd5.ingest.parquet as mod + + assert ( + "pyarrow" in (mod.__doc__ or "").lower() + or "parquet" in (mod.__doc__ or "").lower() + )