diff --git a/pyproject.toml b/pyproject.toml
index 3820f5d..921e089 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,8 +31,11 @@ science = [
     "pandas>=2.2",
     "matplotlib>=3.9",
 ]
+nifti = [
+    "nibabel>=5.0",
+]
 all = [
-    "fd5[dev,science]",
+    "fd5[dev,science,nifti]",
 ]
 
 [build-system]
diff --git a/src/fd5/ingest/__init__.py b/src/fd5/ingest/__init__.py
new file mode 100644
index 0000000..65eb2c8
--- /dev/null
+++ b/src/fd5/ingest/__init__.py
@@ -0,0 +1,5 @@
+"""fd5.ingest — loader protocol and shared ingest helpers."""
+
+from fd5.ingest._base import Loader, discover_loaders, hash_source_files
+
+__all__ = ["Loader", "discover_loaders", "hash_source_files"]
diff --git a/src/fd5/ingest/_base.py b/src/fd5/ingest/_base.py
new file mode 100644
index 0000000..9c79ce0
--- /dev/null
+++ b/src/fd5/ingest/_base.py
@@ -0,0 +1,102 @@
+"""fd5.ingest._base — Loader protocol and shared ingest helpers.
+
+Defines the interface all format-specific loaders must implement and
+provides utility functions for source-file hashing and loader discovery.
+"""
+
+from __future__ import annotations
+
+import hashlib
+import importlib.metadata
+from collections.abc import Iterable
+from pathlib import Path
+from typing import Any, Protocol, runtime_checkable
+
+from fd5._types import Fd5Path
+
+_READ_CHUNK = 1024 * 1024  # 1 MiB
+
+_EP_GROUP = "fd5.loaders"
+
+
+@runtime_checkable
+class Loader(Protocol):
+    """Protocol that all fd5 ingest loaders must satisfy."""
+
+    @property
+    def supported_product_types(self) -> list[str]:
+        """Product types this loader can produce (e.g. ``['recon', 'listmode']``)."""
+        ...
+
+    def ingest(
+        self,
+        source: Path | str,
+        output_dir: Path,
+        *,
+        product: str,
+        name: str,
+        description: str,
+        timestamp: str | None = None,
+        **kwargs: Any,
+    ) -> Fd5Path:
+        """Read source data and produce a sealed fd5 file."""
+        ...
+
+
+# ---------------------------------------------------------------------------
+# Shared helpers
+# ---------------------------------------------------------------------------
+
+
+def hash_source_files(paths: Iterable[Path]) -> list[dict[str, Any]]:
+    """Hash source files for ``provenance/original_files`` records.
+
+    Returns a list of dicts with keys ``path``, ``sha256``, and
+    ``size_bytes`` — matching the schema expected by
+    :func:`fd5.provenance.write_original_files`.
+    """
+    records: list[dict[str, Any]] = []
+    for p in paths:
+        p = Path(p)
+        h = hashlib.sha256()
+        size = 0
+        with p.open("rb") as fh:
+            while chunk := fh.read(_READ_CHUNK):
+                h.update(chunk)
+                size += len(chunk)
+        records.append(
+            {
+                "path": str(p),
+                "sha256": f"sha256:{h.hexdigest()}",
+                "size_bytes": size,
+            }
+        )
+    return records
+
+
+def _load_loader_entry_points() -> dict[str, Any]:
+    """Load callables from the ``fd5.loaders`` entry-point group."""
+    factories: dict[str, Any] = {}
+    for ep in importlib.metadata.entry_points(group=_EP_GROUP):
+        factories[ep.name] = ep.load()
+    return factories
+
+
+def discover_loaders() -> dict[str, Loader]:
+    """Discover available loaders based on installed optional deps.
+
+    Iterates over entry points in the ``fd5.loaders`` group. Each entry
+    point should be a callable returning a :class:`Loader` instance.
+    Loaders whose dependencies are missing (``ImportError``) are silently
+    skipped.
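+
+    Example (illustrative; which names appear depends on the optional
+    extras installed, so ``"nifti"`` below is an assumption)::
+
+        loaders = discover_loaders()
+        if "nifti" in loaders:
+            print(loaders["nifti"].supported_product_types)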
+    """
+    factories = _load_loader_entry_points()
+    loaders: dict[str, Loader] = {}
+    for name, factory in factories.items():
+        try:
+            loader = factory()
+        except ImportError:
+            continue
+        if isinstance(loader, Loader):
+            loaders[name] = loader
+    return loaders
diff --git a/src/fd5/ingest/csv.py b/src/fd5/ingest/csv.py
new file mode 100644
index 0000000..0adba53
--- /dev/null
+++ b/src/fd5/ingest/csv.py
@@ -0,0 +1,402 @@
+"""fd5.ingest.csv — CSV/TSV tabular data loader.
+
+Reads CSV/TSV files and produces sealed fd5 files targeting tabular
+scientific data: spectra, calibration curves, time series, device logs.
+Uses the stdlib ``csv`` module and ``numpy`` — no pandas dependency required.
+"""
+
+from __future__ import annotations
+
+import csv as csv_mod
+import io
+import re
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import numpy as np
+
+from fd5._types import Fd5Path
+from fd5.create import create
+from fd5.ingest._base import hash_source_files
+
+__version__ = "0.1.0"
+
+_COMMENT_META_RE = re.compile(r"^\s*(\w[\w\s]*\w|\w+)\s*:\s*(.+)\s*$")
+
+_SPECTRUM_COUNTS_ALIASES = frozenset({"counts", "count", "intensity", "rate", "y"})
+_SPECTRUM_ENERGY_ALIASES = frozenset(
+    {"energy", "channel", "bin", "x", "wavelength", "frequency"}
+)
+
+
+class CsvLoader:
+    """Loader that reads CSV/TSV files and produces sealed fd5 files."""
+
+    @property
+    def supported_product_types(self) -> list[str]:
+        return ["spectrum", "calibration", "device_data"]
+
+    def ingest(
+        self,
+        source: Path | str,
+        output_dir: Path,
+        *,
+        product: str,
+        name: str,
+        description: str,
+        timestamp: str | None = None,
+        column_map: dict[str, str] | None = None,
+        delimiter: str = ",",
+        header_row: int = 0,
+        comment: str = "#",
+        **kwargs: Any,
+    ) -> Fd5Path:
+        """Read a CSV/TSV file and produce a sealed fd5 file."""
+        source = Path(source)
+        if not source.exists():
+            raise FileNotFoundError(f"Source file not found: {source}")
+
+        ts = timestamp or datetime.now(tz=timezone.utc).isoformat()
+
+        comment_meta = _extract_comment_metadata(source, comment)
+        headers, rows = _read_csv(source, delimiter, header_row, comment)
+
+        if len(rows) == 0:
+            raise ValueError(f"No data rows found in {source}")
+
+        columns = _parse_columns(headers, rows)
+
+        file_records = hash_source_files([source])
+
+        writer = _PRODUCT_WRITERS.get(product, _write_spectrum)
+        return writer(
+            source=source,
+            output_dir=output_dir,
+            product=product,
+            name=name,
+            description=description,
+            timestamp=ts,
+            columns=columns,
+            headers=headers,
+            column_map=column_map,
+            comment_meta=comment_meta,
+            file_records=file_records,
+            **kwargs,
+        )
+
+
+# ---------------------------------------------------------------------------
+# CSV parsing helpers
+# ---------------------------------------------------------------------------
+
+
+def _extract_comment_metadata(path: Path, comment: str) -> dict[str, str]:
+    """Parse ``# key: value`` lines from the top of the file."""
+    meta: dict[str, str] = {}
+    with path.open() as fh:
+        for line in fh:
+            stripped = line.strip()
+            if not stripped.startswith(comment):
+                break
+            content = stripped[len(comment) :].strip()
+            m = _COMMENT_META_RE.match(content)
+            if m:
+                meta[m.group(1).strip()] = m.group(2).strip()
+    return meta
+
+
+def _read_csv(
+    path: Path,
+    delimiter: str,
+    header_row: int,
+    comment: str,
+) -> tuple[list[str], list[list[str]]]:
+    """Read CSV, skipping comment lines, returning headers and data rows."""
+    with path.open(newline="") as fh:
+        lines = fh.readlines()
+
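+    # readlines() pulls the whole file into memory, which keeps the comment
+    # filtering below simple; fine for the tabular sizes this loader targets.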
+    non_comment = [line for line in lines if not line.strip().startswith(comment)]
+
+    if header_row >= len(non_comment):
+        raise ValueError(
+            f"header_row={header_row} but only {len(non_comment)} non-comment lines"
+        )
+
+    reader = csv_mod.reader(
+        io.StringIO("".join(non_comment[header_row:])),
+        delimiter=delimiter,
+    )
+    all_rows = list(reader)
+    if not all_rows:
+        return [], []
+
+    headers = [h.strip() for h in all_rows[0]]
+    data_rows = [row for row in all_rows[1:] if any(cell.strip() for cell in row)]
+    return headers, data_rows
+
+
+def _parse_columns(
+    headers: list[str], rows: list[list[str]]
+) -> dict[str, np.ndarray | list[str]]:
+    """Parse columns, inferring numeric vs string type per column."""
+    columns: dict[str, np.ndarray | list[str]] = {}
+    for i, header in enumerate(headers):
+        raw = [row[i].strip() if i < len(row) else "" for row in rows]
+        try:
+            arr = np.array([float(v) for v in raw], dtype=np.float64)
+            columns[header] = arr
+        except ValueError:
+            columns[header] = raw
+    return columns
+
+
+# ---------------------------------------------------------------------------
+# Shared writer helper
+# ---------------------------------------------------------------------------
+
+
+def _find_output_file(output_dir: Path) -> Fd5Path:
+    """Find the sealed fd5 file in *output_dir* after create() exits."""
+    files = sorted(output_dir.glob("*.h5"), key=lambda p: p.stat().st_mtime)
+    return files[-1]
+
+
+# ---------------------------------------------------------------------------
+# Product-specific writers
+# ---------------------------------------------------------------------------
+
+
+def _resolve_column(
+    columns: dict[str, Any],
+    column_map: dict[str, str] | None,
+    target_key: str,
+    aliases: frozenset[str],
+) -> str | None:
+    """Find the source column name for *target_key* using mapping or aliases."""
+    if column_map and target_key in column_map:
+        mapped = column_map[target_key]
+        if mapped in columns:
+            return mapped
+    for alias in aliases:
+        if alias in columns:
+            return alias
+    return None
+
+
+def _write_spectrum(
+    *,
+    source: Path,
+    output_dir: Path,
+    product: str,
+    name: str,
+    description: str,
+    timestamp: str,
+    columns: dict[str, Any],
+    headers: list[str],
+    column_map: dict[str, str] | None,
+    comment_meta: dict[str, str],
+    file_records: list[dict[str, Any]],
+    **kwargs: Any,
+) -> Fd5Path:
+    """Write spectrum product from CSV columns."""
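+    # Column lookup tries the caller's explicit column_map first, then falls
+    # back to the alias sets defined at module top.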
+    counts_col = _resolve_column(
+        columns, column_map, "counts", _SPECTRUM_COUNTS_ALIASES
+    )
+    energy_col = _resolve_column(
+        columns, column_map, "energy", _SPECTRUM_ENERGY_ALIASES
+    )
+
+    if counts_col is None:
+        raise ValueError(
+            f"Cannot find counts column. Available: {list(columns.keys())}"
+        )
+
+    counts = np.asarray(columns[counts_col], dtype=np.float32)
+
+    axes = []
+    if energy_col is not None:
+        energy_vals = np.asarray(columns[energy_col], dtype=np.float64)
+        units = comment_meta.get("units", "arb")
+        half_step = 0.0
+        if len(energy_vals) > 1:
+            half_step = (energy_vals[1] - energy_vals[0]) / 2.0
+        bin_edges = np.append(energy_vals - half_step, energy_vals[-1] + half_step)
+        axes.append(
+            {
+                "label": energy_col,
+                "units": units,
+                "unitSI": 1.0,
+                "bin_edges": bin_edges,
+                "description": f"{energy_col} axis",
+            }
+        )
+
+    product_data: dict[str, Any] = {"counts": counts}
+    if axes:
+        product_data["axes"] = axes
+
+    with create(
+        output_dir,
+        product=product,
+        name=name,
+        description=description,
+        timestamp=timestamp,
+    ) as builder:
+        builder.write_product(product_data)
+
+        if comment_meta:
+            builder.write_metadata(comment_meta)
+
+        builder.write_provenance(
+            original_files=file_records,
+            ingest_tool="fd5.ingest.csv",
+            ingest_version=__version__,
+            ingest_timestamp=timestamp,
+        )
+
+    return _find_output_file(output_dir)
+
+
+def _write_calibration(
+    *,
+    source: Path,
+    output_dir: Path,
+    product: str,
+    name: str,
+    description: str,
+    timestamp: str,
+    columns: dict[str, Any],
+    headers: list[str],
+    column_map: dict[str, str] | None,
+    comment_meta: dict[str, str],
+    file_records: list[dict[str, Any]],
+    calibration_type: str = "energy_calibration",
+    scanner_model: str = "unknown",
+    scanner_serial: str = "unknown",
+    valid_from: str = "",
+    valid_until: str = "indefinite",
+    **kwargs: Any,
+) -> Fd5Path:
+    """Write calibration product from CSV columns."""
+    product_data: dict[str, Any] = {
+        "calibration_type": calibration_type,
+        "scanner_model": scanner_model,
+        "scanner_serial": scanner_serial,
+        "valid_from": valid_from,
+        "valid_until": valid_until,
+    }
+
+    if calibration_type == "energy_calibration" and "input" in columns:
+        product_data["channel_to_energy"] = np.asarray(
+            columns["input"], dtype=np.float64
+        )
+
+    with create(
+        output_dir,
+        product=product,
+        name=name,
+        description=description,
+        timestamp=timestamp,
+    ) as builder:
+        builder.write_product(product_data)
+
+        if comment_meta:
+            builder.write_metadata(comment_meta)
+
+        builder.write_provenance(
+            original_files=file_records,
+            ingest_tool="fd5.ingest.csv",
+            ingest_version=__version__,
+            ingest_timestamp=timestamp,
+        )
+
+    return _find_output_file(output_dir)
+
+
+def _write_device_data(
+    *,
+    source: Path,
+    output_dir: Path,
+    product: str,
+    name: str,
+    description: str,
+    timestamp: str,
+    columns: dict[str, Any],
+    headers: list[str],
+    column_map: dict[str, str] | None,
+    comment_meta: dict[str, str],
+    file_records: list[dict[str, Any]],
+    device_type: str = "environmental_sensor",
+    device_model: str = "unknown",
+    **kwargs: Any,
+) -> Fd5Path:
+    """Write device_data product from CSV/TSV columns."""
+    time_col = _resolve_column(
+        columns,
+        column_map,
+        "timestamp",
+        frozenset({"timestamp", "time", "t", "elapsed"}),
+    )
+    signal_cols = [
+        h
+        for h in headers
+        if h != (time_col or "") and isinstance(columns.get(h), np.ndarray)
+    ]
+
+    time_arr = (
+        np.asarray(columns[time_col], dtype=np.float64)
+        if time_col
+        else np.arange(len(next(iter(columns.values()))), dtype=np.float64)
+    )
+
+    duration = float(time_arr[-1] - time_arr[0]) if len(time_arr) > 1 else 0.0
+
+    channels: dict[str, dict[str, Any]] = {}
+    for col_name in signal_cols:
+        signal = np.asarray(columns[col_name], dtype=np.float64)
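+        # Sampling rate is approximated on the next line as sample count over
+        # the recorded span; 0.0 when the duration is unknown.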
+        sampling_rate = len(signal) / duration if duration > 0 else 0.0
+        channels[col_name] = {
+            "signal": signal,
+            "time": time_arr,
+            "sampling_rate": sampling_rate,
+            "units": comment_meta.get("units", "arb"),
+            "unitSI": 1.0,
+            "description": f"{col_name} channel",
+        }
+
+    product_data: dict[str, Any] = {
+        "device_type": device_type,
+        "device_model": device_model,
+        "recording_start": timestamp,
+        "recording_duration": duration,
+        "channels": channels,
+    }
+
+    with create(
+        output_dir,
+        product=product,
+        name=name,
+        description=description,
+        timestamp=timestamp,
+    ) as builder:
+        builder.write_product(product_data)
+
+        if comment_meta:
+            builder.write_metadata(comment_meta)
+
+        builder.write_provenance(
+            original_files=file_records,
+            ingest_tool="fd5.ingest.csv",
+            ingest_version=__version__,
+            ingest_timestamp=timestamp,
+        )
+
+    return _find_output_file(output_dir)
+
+
+_PRODUCT_WRITERS = {
+    "spectrum": _write_spectrum,
+    "calibration": _write_calibration,
+    "device_data": _write_device_data,
+}
diff --git a/src/fd5/ingest/metadata.py b/src/fd5/ingest/metadata.py
new file mode 100644
index 0000000..5748049
--- /dev/null
+++ b/src/fd5/ingest/metadata.py
@@ -0,0 +1,151 @@
+"""fd5.ingest.metadata — RO-Crate and DataCite metadata import.
+
+Reads existing metadata files (RO-Crate JSON-LD, DataCite YAML, or
+generic structured metadata) and returns dicts suitable for
+:meth:`fd5.create.Fd5Builder.write_study`.
+
+This is the *inverse* of :mod:`fd5.rocrate` and :mod:`fd5.datacite`
+exports: instead of generating metadata from fd5 files, we consume
+external metadata to populate fd5 files during ingest.
+"""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+
+def load_rocrate_metadata(rocrate_path: Path) -> dict[str, Any]:
+    """Extract fd5-compatible study metadata from an RO-Crate JSON-LD file.
+
+    Returns a dict with possible keys: ``name``, ``license``,
+    ``description``, ``creators``. Missing fields in the source are
+    omitted (no ``KeyError``).
+    """
+    rocrate_path = Path(rocrate_path)
+    crate = json.loads(rocrate_path.read_text(encoding="utf-8"))
+
+    dataset = _find_rocrate_dataset(crate)
+    if dataset is None:
+        return {}
+
+    result: dict[str, Any] = {}
+
+    if "name" in dataset:
+        result["name"] = dataset["name"]
+    if "license" in dataset:
+        result["license"] = dataset["license"]
+    if "description" in dataset:
+        result["description"] = dataset["description"]
+
+    creators = _extract_rocrate_creators(dataset)
+    if creators:
+        result["creators"] = creators
+
+    return result
+
+
+def load_datacite_metadata(datacite_path: Path) -> dict[str, Any]:
+    """Extract fd5-compatible study metadata from a DataCite YAML file.
+
+    Returns a dict with possible keys: ``name``, ``creators``,
+    ``dates``, ``subjects``. Missing fields in the source are omitted.
+    """
+    datacite_path = Path(datacite_path)
+    data = yaml.safe_load(datacite_path.read_text(encoding="utf-8"))
+    if not isinstance(data, dict):
+        return {}
+
+    result: dict[str, Any] = {}
+
+    if "title" in data:
+        result["name"] = data["title"]
+
+    creators = data.get("creators")
+    if creators:
+        result["creators"] = [_normalise_datacite_creator(c) for c in creators]
+
+    dates = data.get("dates")
+    if dates:
+        result["dates"] = dates
+
+    subjects = data.get("subjects")
+    if subjects:
+        result["subjects"] = subjects
+
+    return result
+
+
+def load_metadata(path: Path) -> dict[str, Any]:
+    """Auto-detect metadata format and extract fd5-compatible metadata.
+
+    Detection is filename-based:
+
+    - ``ro-crate-metadata.json`` → RO-Crate
+    - ``datacite.yml`` / ``datacite.yaml`` → DataCite
+    - other ``.json`` → generic JSON pass-through
+    - other ``.yml`` / ``.yaml`` → generic YAML pass-through
+
+    Raises :class:`ValueError` for unsupported extensions and
+    :class:`FileNotFoundError` for missing files.
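+
+    Example (illustrative)::
+
+        meta = load_metadata(Path("ro-crate-metadata.json"))
+        # e.g. {"name": ..., "license": ..., "creators": [...]}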
+    """
+    path = Path(path)
+    if not path.exists():
+        raise FileNotFoundError(path)
+
+    if path.name == "ro-crate-metadata.json":
+        return load_rocrate_metadata(path)
+
+    if path.name in {"datacite.yml", "datacite.yaml"}:
+        return load_datacite_metadata(path)
+
+    suffix = path.suffix.lower()
+    if suffix == ".json":
+        return json.loads(path.read_text(encoding="utf-8"))
+    if suffix in {".yml", ".yaml"}:
+        return yaml.safe_load(path.read_text(encoding="utf-8"))
+
+    msg = f"Unsupported metadata format: {path.name}"
+    raise ValueError(msg)
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+
+def _find_rocrate_dataset(crate: dict[str, Any]) -> dict[str, Any] | None:
+    """Return the root Dataset entity from an RO-Crate ``@graph``."""
+    for entity in crate.get("@graph", []):
+        if entity.get("@id") == "./" and entity.get("@type") == "Dataset":
+            return entity
+    return None
+
+
+def _extract_rocrate_creators(
+    dataset: dict[str, Any],
+) -> list[dict[str, Any]]:
+    """Convert RO-Crate ``author`` Person entities to fd5 creator dicts."""
+    authors = dataset.get("author")
+    if not authors:
+        return []
+
+    creators: list[dict[str, Any]] = []
+    for person in authors:
+        creator: dict[str, Any] = {"name": person["name"]}
+        if "affiliation" in person:
+            creator["affiliation"] = person["affiliation"]
+        if "@id" in person and person["@id"].startswith("https://orcid.org/"):
+            creator["orcid"] = person["@id"]
+        creators.append(creator)
+    return creators
+
+
+def _normalise_datacite_creator(raw: dict[str, Any]) -> dict[str, Any]:
+    """Normalise a single DataCite creator entry."""
+    result: dict[str, Any] = {"name": raw["name"]}
+    if "affiliation" in raw:
+        result["affiliation"] = raw["affiliation"]
+    return result
diff --git a/src/fd5/ingest/nifti.py b/src/fd5/ingest/nifti.py
new file mode 100644
index 0000000..5bb2224
--- /dev/null
+++ b/src/fd5/ingest/nifti.py
@@ -0,0 +1,173 @@
+"""fd5.ingest.nifti — NIfTI loader for fd5.
+
+Reads NIfTI-1 / NIfTI-2 files (``.nii``, ``.nii.gz``) via *nibabel* and
+produces sealed fd5 ``recon`` files using ``fd5.create()``.
+"""
+
+from __future__ import annotations
+
+try:
+    import nibabel as nib
+except ImportError as exc:
+    raise ImportError(
+        "nibabel is required for NIfTI ingest. "
+        "Install it with: pip install 'fd5[nifti]'"
+    ) from exc
+
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import numpy as np
+
+from fd5._types import Fd5Path
+from fd5.create import create
+from fd5.ingest._base import hash_source_files
+
+__all__ = ["NiftiLoader", "ingest_nifti"]
+
+_INGEST_TOOL = "fd5.ingest.nifti"
+_INGEST_VERSION = "0.1.0"
+
+
+def _get_affine(img: nib.spatialimages.SpatialImage) -> np.ndarray:
+    """Extract the best available affine (sform preferred, then qform)."""
+    header = img.header
+    if hasattr(header, "get_sform") and header["sform_code"] > 0:
+        return header.get_sform().astype(np.float64)
+    if hasattr(header, "get_qform") and header["qform_code"] > 0:
+        return header.get_qform().astype(np.float64)
+    return img.affine.astype(np.float64)
+
+
+def _dimension_order(ndim: int) -> str:
+    """Map array dimensionality to fd5 dimension_order string."""
+    if ndim == 3:
+        return "ZYX"
+    if ndim == 4:
+        return "TZYX"
+    return "".join(["D"] * (ndim - 3)) + "ZYX"
+
+
+def ingest_nifti(
+    nifti_path: Path | str,
+    output_dir: Path | str,
+    *,
+    product: str = "recon",
+    name: str,
+    description: str,
+    timestamp: str | None = None,
+    reference_frame: str = "RAS",
+    study_metadata: dict[str, Any] | None = None,
+) -> Fd5Path:
+    """Read a NIfTI file and produce a sealed fd5 ``recon`` file.
+
+    Parameters
+    ----------
+    nifti_path:
+        Path to ``.nii`` or ``.nii.gz`` file.
+    output_dir:
+        Directory where the sealed fd5 file will be written.
+    product:
+        fd5 product type (default ``"recon"``).
+    name:
+        Human-readable name for the dataset.
+    description:
+        Description of the dataset.
+    timestamp:
+        ISO-8601 timestamp; auto-generated if *None*.
+    reference_frame:
+        Spatial reference frame (default ``"RAS"``).
+    study_metadata:
+        Optional dict with ``study_type``, ``license``, ``description``,
+        and optionally ``creators`` for the study group.
+
+    Returns
+    -------
+    Path to the sealed fd5 file.
+    """
+    nifti_path = Path(nifti_path)
+    output_dir = Path(output_dir)
+
+    if not nifti_path.exists():
+        raise FileNotFoundError(f"NIfTI file not found: {nifti_path}")
+
+    img = nib.load(nifti_path)
+    volume = np.asarray(img.dataobj, dtype=np.float32)
+    affine = _get_affine(img)
+    dim_order = _dimension_order(volume.ndim)
+
+    if timestamp is None:
+        timestamp = datetime.now(tz=timezone.utc).isoformat()
+
+    ingest_ts = datetime.now(tz=timezone.utc).isoformat()
+    original_files = hash_source_files([nifti_path])
+
+    existing = set(output_dir.glob("*.h5")) if output_dir.exists() else set()
+
+    with create(
+        output_dir,
+        product=product,
+        name=name,
+        description=description,
+        timestamp=timestamp,
+    ) as builder:
+        builder.file.attrs["scanner"] = "nifti-import"
+        builder.file.attrs["vendor_series_id"] = str(nifti_path.name)
+
+        builder.write_product(
+            {
+                "volume": volume,
+                "affine": affine,
+                "dimension_order": dim_order,
+                "reference_frame": reference_frame,
+                "description": description,
+            }
+        )
+
+        builder.write_provenance(
+            original_files=original_files,
+            ingest_tool=_INGEST_TOOL,
+            ingest_version=_INGEST_VERSION,
+            ingest_timestamp=ingest_ts,
+        )
+
+        if study_metadata:
+            builder.write_study(
+                study_type=study_metadata["study_type"],
+                license=study_metadata["license"],
+                description=study_metadata.get("description", description),
+                creators=study_metadata.get("creators"),
+            )
+
+    new_files = set(output_dir.glob("*.h5")) - existing
+    return next(iter(new_files))
+
+
+class NiftiLoader:
+    """Loader implementation for NIfTI files."""
+
+    @property
+    def supported_product_types(self) -> list[str]:
+        return ["recon"]
+
+    def ingest(
+        self,
+        source: Path | str,
+        output_dir: Path,
+        *,
+        product: str = "recon",
+        name: str,
+        description: str,
+        timestamp: str | None = None,
+        **kwargs: Any,
+    ) -> Fd5Path:
+        return ingest_nifti(
+            source,
+            output_dir,
+            product=product,
+            name=name,
+            description=description,
+            timestamp=timestamp,
+            **kwargs,
+        )
diff --git a/src/fd5/ingest/raw.py b/src/fd5/ingest/raw.py
new file mode 100644
index 0000000..f1033dc
--- /dev/null
+++ b/src/fd5/ingest/raw.py
@@ -0,0 +1,186 @@
+"""fd5.ingest.raw — raw/numpy array loader.
+
+Wraps raw numpy arrays or binary files into sealed fd5 files.
+Serves as the reference Loader implementation and fallback when
+no format-specific loader is needed.
+"""
+
+from __future__ import annotations
+
+import importlib.metadata
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import numpy as np
+
+from fd5._types import Fd5Path
+from fd5.create import create
+from fd5.ingest._base import hash_source_files
+from fd5.registry import list_schemas
+
+__all__ = ["RawLoader", "ingest_array", "ingest_binary"]
+
+_INGEST_TOOL = "fd5.ingest.raw"
+
+
+def _fd5_version() -> str:
+    try:
+        return importlib.metadata.version("fd5")
+    except importlib.metadata.PackageNotFoundError:
+        return "0.0.0"
+
+
+def ingest_array(
+    data: dict[str, Any],
+    output_dir: Path,
+    *,
+    product: str,
+    name: str,
+    description: str,
+    timestamp: str | None = None,
+    metadata: dict[str, Any] | None = None,
+    study_metadata: dict[str, Any] | None = None,
+    sources: list[dict[str, Any]] | None = None,
+) -> Fd5Path:
+    """Wrap a data dict into a sealed fd5 file.
+
+    The data dict is passed directly to the product schema's ``write()`` method.
+
+    Returns:
+        Path to the sealed fd5 file.
+
+    Raises:
+        ValueError: If *product* is not a registered product type.
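+
+    Example:
+        A minimal sketch; ``counts`` and ``out_dir`` are assumed to exist and
+        ``spectrum`` to be a registered product type::
+
+            ingest_array({"counts": counts}, out_dir, product="spectrum",
+                         name="demo", description="wrapped array")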
+    """
+    if timestamp is None:
+        timestamp = datetime.now(tz=timezone.utc).isoformat()
+
+    output_dir = Path(output_dir)
+
+    with create(
+        output_dir,
+        product=product,
+        name=name,
+        description=description,
+        timestamp=timestamp,
+    ) as builder:
+        builder.write_product(data)
+
+        if metadata is not None:
+            builder.write_metadata(metadata)
+
+        if sources is not None:
+            builder.write_sources(sources)
+
+        if study_metadata is not None:
+            builder.write_study(**study_metadata)
+
+    sealed_files = sorted(output_dir.glob("*.h5"), key=lambda p: p.stat().st_mtime)
+    return sealed_files[-1]
+
+
+def ingest_binary(
+    binary_path: Path,
+    output_dir: Path,
+    *,
+    dtype: str,
+    shape: tuple[int, ...],
+    product: str,
+    name: str,
+    description: str,
+    timestamp: str | None = None,
+    **kwargs: Any,
+) -> Fd5Path:
+    """Read a raw binary file, reshape, and produce a sealed fd5 file.
+
+    The binary data is read via ``numpy.fromfile`` and reshaped to *shape*.
+    Provenance records the source file's SHA-256.
+
+    Additional keyword arguments are merged into the data dict passed to
+    the product schema's ``write()`` method.
+
+    Returns:
+        Path to the sealed fd5 file.
+
+    Raises:
+        FileNotFoundError: If *binary_path* does not exist.
+        ValueError: If the file size does not match *dtype* × *shape*.
+    """
+    binary_path = Path(binary_path)
+    if not binary_path.exists():
+        raise FileNotFoundError(f"Binary file not found: {binary_path}")
+
+    raw = np.fromfile(binary_path, dtype=dtype)
+    expected_size = 1
+    for s in shape:
+        expected_size *= s
+    if raw.size != expected_size:
+        msg = f"cannot reshape array of size {raw.size} into shape {shape}"
+        raise ValueError(msg)
+
+    arr = raw.reshape(shape)
+
+    prov_records = hash_source_files([binary_path])
+
+    if timestamp is None:
+        timestamp = datetime.now(tz=timezone.utc).isoformat()
+
+    data: dict[str, Any] = {"volume": arr, "description": description}
+    data.update(kwargs)
+
+    output_dir = Path(output_dir)
+
+    with create(
+        output_dir,
+        product=product,
+        name=name,
+        description=description,
+        timestamp=timestamp,
+    ) as builder:
+        builder.write_product(data)
+        builder.write_provenance(
+            original_files=prov_records,
+            ingest_tool=_INGEST_TOOL,
+            ingest_version=_fd5_version(),
+            ingest_timestamp=timestamp,
+        )
+
+    sealed_files = sorted(output_dir.glob("*.h5"), key=lambda p: p.stat().st_mtime)
+    return sealed_files[-1]
+
+
+class RawLoader:
+    """Loader implementation for raw numpy arrays and binary files.
+
+    Satisfies the :class:`~fd5.ingest._base.Loader` protocol.
+    """
+
+    @property
+    def supported_product_types(self) -> list[str]:
+        return list_schemas()
+
+    def ingest(
+        self,
+        source: Path | str,
+        output_dir: Path,
+        *,
+        product: str,
+        name: str,
+        description: str,
+        timestamp: str | None = None,
+        **kwargs: Any,
+    ) -> Fd5Path:
+        """Read a binary source file and produce a sealed fd5 file.
+
+        Requires ``dtype`` and ``shape`` in *kwargs*.
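+
+        Example (illustrative; the file and its dtype/shape are hypothetical)::
+
+            RawLoader().ingest("frame.bin", out_dir, product="recon",
+                               name="raw frame", description="demo",
+                               dtype="float32", shape=(64, 64, 64))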
+        """
+        return ingest_binary(
+            Path(source),
+            output_dir,
+            product=product,
+            name=name,
+            description=description,
+            timestamp=timestamp,
+            **kwargs,
+        )
diff --git a/tests/test_ingest_base.py b/tests/test_ingest_base.py
new file mode 100644
index 0000000..80a17bf
--- /dev/null
+++ b/tests/test_ingest_base.py
@@ -0,0 +1,261 @@
+"""Tests for fd5.ingest._base — Loader protocol and shared helpers."""
+
+from __future__ import annotations
+
+import hashlib
+from pathlib import Path
+from typing import Any
+
+import pytest
+
+from fd5._types import Fd5Path
+from fd5.ingest._base import (
+    Loader,
+    _load_loader_entry_points,
+    discover_loaders,
+    hash_source_files,
+)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+class _ValidLoader:
+    """Minimal concrete class satisfying the Loader protocol."""
+
+    @property
+    def supported_product_types(self) -> list[str]:
+        return ["recon"]
+
+    def ingest(
+        self,
+        source: Path | str,
+        output_dir: Path,
+        *,
+        product: str,
+        name: str,
+        description: str,
+        timestamp: str | None = None,
+        **kwargs: Any,
+    ) -> Fd5Path:
+        return output_dir / "out.h5"
+
+
+class _MissingIngest:
+    """Has supported_product_types but no ingest method."""
+
+    @property
+    def supported_product_types(self) -> list[str]:
+        return ["recon"]
+
+
+class _MissingProductTypes:
+    """Has ingest but no supported_product_types."""
+
+    def ingest(
+        self,
+        source: Path | str,
+        output_dir: Path,
+        *,
+        product: str,
+        name: str,
+        description: str,
+        timestamp: str | None = None,
+        **kwargs: Any,
+    ) -> Fd5Path:
+        return output_dir / "out.h5"
+
+
+# ---------------------------------------------------------------------------
+# Loader protocol
+# ---------------------------------------------------------------------------
+
+
+class TestLoaderProtocol:
+    """Loader is a runtime_checkable Protocol."""
+
+    def test_valid_loader_is_instance(self):
+        assert isinstance(_ValidLoader(), Loader)
+
+    def test_missing_ingest_not_instance(self):
+        assert not isinstance(_MissingIngest(), Loader)
+
+    def test_missing_product_types_not_instance(self):
+        assert not isinstance(_MissingProductTypes(), Loader)
+
+    def test_protocol_requires_supported_product_types(self):
+        import inspect
+
+        members = {
+            name for name, _ in inspect.getmembers(Loader) if not name.startswith("_")
+        }
+        attrs = set(getattr(Loader, "__protocol_attrs__", ()))
+        assert (members | attrs) >= {"supported_product_types", "ingest"}
+
+    def test_plain_object_not_instance(self):
+        assert not isinstance(object(), Loader)
+
+
+# ---------------------------------------------------------------------------
+# hash_source_files
+# ---------------------------------------------------------------------------
+
+
+class TestHashSourceFiles:
+    """hash_source_files computes SHA-256 + size for provenance records."""
+
+    def test_single_file(self, tmp_path: Path):
+        p = tmp_path / "data.bin"
+        content = b"hello world"
+        p.write_bytes(content)
+
+        result = hash_source_files([p])
+
+        assert len(result) == 1
+        rec = result[0]
+        assert rec["path"] == str(p)
+        assert rec["sha256"] == f"sha256:{hashlib.sha256(content).hexdigest()}"
+        assert rec["size_bytes"] == len(content)
+
+    def test_multiple_files(self, tmp_path: Path):
+        paths = []
+        for i in range(3):
+            p = tmp_path / f"file_{i}.dat"
+            p.write_bytes(f"content-{i}".encode())
+            paths.append(p)
+
+        result = hash_source_files(paths)
+        assert len(result) == 3
+        assert all(r["sha256"].startswith("sha256:") for r in result)
+
+    def test_empty_iterable(self):
+        result = hash_source_files([])
+        assert result == []
+
+    def test_large_file_chunked(self, tmp_path: Path):
+        """Hash must be correct even for files larger than a typical read buffer."""
+        p = tmp_path / "large.bin"
+        data = b"x" * (2 * 1024 * 1024)
+        p.write_bytes(data)
+
+        result = hash_source_files([p])
+        expected = f"sha256:{hashlib.sha256(data).hexdigest()}"
+        assert result[0]["sha256"] == expected
+
+    def test_record_keys(self, tmp_path: Path):
+        p = tmp_path / "keys.bin"
+        p.write_bytes(b"abc")
+
+        rec = hash_source_files([p])[0]
+        assert set(rec.keys()) == {"path", "sha256", "size_bytes"}
+
+    def test_size_bytes_is_int(self, tmp_path: Path):
+        p = tmp_path / "sz.bin"
+        p.write_bytes(b"12345")
+
+        rec = hash_source_files([p])[0]
+        assert isinstance(rec["size_bytes"], int)
+
+    def test_nonexistent_file_raises(self, tmp_path: Path):
+        missing = tmp_path / "no_such_file.bin"
+        with pytest.raises((FileNotFoundError, OSError)):
+            hash_source_files([missing])
+
+
+# ---------------------------------------------------------------------------
+# discover_loaders
+# ---------------------------------------------------------------------------
+
+
+class TestDiscoverLoaders:
+    """discover_loaders returns loaders whose optional deps are installed."""
+
+    def test_returns_dict(self):
+        result = discover_loaders()
+        assert isinstance(result, dict)
+
+    def test_values_satisfy_protocol(self):
+        for loader in discover_loaders().values():
+            assert isinstance(loader, Loader)
+
+    def test_keys_are_strings(self):
+        for key in discover_loaders():
+            assert isinstance(key, str)
+
+    def test_no_loaders_when_entry_points_empty(self, monkeypatch):
+        import fd5.ingest._base as base_mod
+
+        monkeypatch.setattr(
+            base_mod,
+            "_load_loader_entry_points",
+            lambda: {},
+        )
+        result = discover_loaders()
+        assert result == {}
+
+    def test_loader_with_missing_deps_excluded(self, monkeypatch):
+        """If a loader's entry point raises ImportError, it is skipped."""
+        import fd5.ingest._base as base_mod
+
+        def _fake_load():
+            raise ImportError("numpy not installed")
+
+        def _fake_eps():
+            return {"broken": _fake_load}
+
+        monkeypatch.setattr(base_mod, "_load_loader_entry_points", _fake_eps)
+        result = discover_loaders()
+        assert "broken" not in result
+
+    def test_valid_loader_discovered(self, monkeypatch):
+        """A factory returning a valid Loader is included in the result."""
+        import fd5.ingest._base as base_mod
+
+        monkeypatch.setattr(
+            base_mod,
+            "_load_loader_entry_points",
+            lambda: {"good": _ValidLoader},
+        )
+        result = discover_loaders()
+        assert "good" in result
+        assert isinstance(result["good"], Loader)
+
+    def test_non_loader_object_excluded(self, monkeypatch):
+        """If a factory returns something that isn't a Loader, skip it."""
+        import fd5.ingest._base as base_mod
+
+        monkeypatch.setattr(
+            base_mod,
+            "_load_loader_entry_points",
+            lambda: {"bad": lambda: object()},
+        )
+        result = discover_loaders()
+        assert "bad" not in result
+
+
+class TestLoadLoaderEntryPoints:
+    """_load_loader_entry_points reads the fd5.loaders entry-point group."""
+
+    def test_returns_dict(self):
+        result = _load_loader_entry_points()
+        assert isinstance(result, dict)
+
+    def test_loads_entry_point_callables(self, monkeypatch):
+        """Each entry point's .load() result is stored by name."""
+        import importlib.metadata
+        from unittest.mock import MagicMock
+
+        ep = MagicMock()
+        ep.name = "mock_loader"
+        ep.load.return_value = _ValidLoader
+
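+        # Patch entry_points so only the fake "fd5.loaders" entry is visible.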
+        monkeypatch.setattr(
+            importlib.metadata,
+            "entry_points",
+            lambda group: [ep] if group == "fd5.loaders" else [],
+        )
+        result = _load_loader_entry_points()
+        assert "mock_loader" in result
+        assert result["mock_loader"] is _ValidLoader
diff --git a/tests/test_ingest_csv.py b/tests/test_ingest_csv.py
new file mode 100644
index 0000000..9146495
--- /dev/null
+++ b/tests/test_ingest_csv.py
@@ -0,0 +1,484 @@
+"""Tests for fd5.ingest.csv — CSV/TSV tabular data loader."""
+
+from __future__ import annotations
+
+import hashlib
+from pathlib import Path
+
+import h5py
+import numpy as np
+import pytest
+
+from fd5.ingest._base import Loader
+from fd5.ingest.csv import CsvLoader
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture()
+def loader() -> CsvLoader:
+    return CsvLoader()
+
+
+@pytest.fixture()
+def spectrum_csv(tmp_path: Path) -> Path:
+    """Simple two-column spectrum CSV: energy, counts."""
+    p = tmp_path / "spectrum.csv"
+    p.write_text(
+        "# units: keV\n"
+        "# detector: HPGe\n"
+        "energy,counts\n"
+        "100.0,10\n"
+        "200.0,25\n"
+        "300.0,18\n"
+        "400.0,7\n"
+    )
+    return p
+
+
+@pytest.fixture()
+def calibration_csv(tmp_path: Path) -> Path:
+    """Three-column calibration CSV: input, output, uncertainty."""
+    p = tmp_path / "calibration.csv"
+    p.write_text("input,output,uncertainty\n1.0,2.1,0.1\n2.0,4.0,0.2\n3.0,6.2,0.15\n")
+    return p
+
+
+@pytest.fixture()
+def device_data_tsv(tmp_path: Path) -> Path:
+    """Tab-delimited device data: timestamp, temperature, pressure."""
+    p = tmp_path / "device.tsv"
+    p.write_text(
+        "timestamp\ttemperature\tpressure\n"
+        "0.0\t22.5\t101.3\n"
+        "1.0\t22.6\t101.2\n"
+        "2.0\t22.4\t101.4\n"
+    )
+    return p
+
+
+@pytest.fixture()
+def comment_metadata_csv(tmp_path: Path) -> Path:
+    """CSV with comment-line metadata."""
+    p = tmp_path / "annotated.csv"
+    p.write_text(
+        "# units: keV\n"
+        "# detector: HPGe\n"
+        "# facility: PSI\n"
+        "# measurement_id: M-2026-001\n"
+        "energy,counts\n"
+        "100.0,50\n"
+        "200.0,75\n"
+    )
+    return p
+
+
+@pytest.fixture()
+def empty_data_csv(tmp_path: Path) -> Path:
+    """CSV with header but no data rows."""
+    p = tmp_path / "empty.csv"
+    p.write_text("energy,counts\n")
+    return p
+
+
+@pytest.fixture()
+def mixed_types_csv(tmp_path: Path) -> Path:
+    """CSV with numeric and string columns."""
+    p = tmp_path / "mixed.csv"
+    p.write_text("channel,counts,label\n1,100,low\n2,250,mid\n3,50,high\n")
+    return p
+
+
+# ---------------------------------------------------------------------------
+# Loader protocol conformance
+# ---------------------------------------------------------------------------
+
+
+class TestCsvLoaderProtocol:
+    """CsvLoader satisfies the Loader protocol."""
+
+    def test_is_loader_instance(self, loader: CsvLoader):
+        assert isinstance(loader, Loader)
+
+    def test_supported_product_types(self, loader: CsvLoader):
+        types = loader.supported_product_types
+        assert isinstance(types, list)
+        assert "spectrum" in types
+        assert "calibration" in types
+        assert "device_data" in types
+
+    def test_has_ingest_method(self, loader: CsvLoader):
+        assert callable(getattr(loader, "ingest", None))
+
+
+# ---------------------------------------------------------------------------
+# CSV reading — happy path
+# ---------------------------------------------------------------------------
+
+
+class TestIngestSpectrum:
+    """Ingest spectrum CSV produces valid fd5 file."""
+
+    def test_returns_path(self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path):
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Test spectrum",
+            description="A test spectrum from CSV",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        assert isinstance(result, Path)
+        assert result.exists()
+        assert result.suffix == ".h5"
+
+    def test_root_attrs(self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path):
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Test spectrum",
+            description="A test spectrum from CSV",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        with h5py.File(result, "r") as f:
+            assert f.attrs["product"] == "spectrum"
+            assert f.attrs["name"] == "Test spectrum"
+            assert f.attrs["description"] == "A test spectrum from CSV"
+
+    def test_data_written(self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path):
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Test spectrum",
+            description="A test spectrum from CSV",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        with h5py.File(result, "r") as f:
+            assert "counts" in f
+            counts = f["counts"][:]
+            assert counts.shape == (4,)
+            np.testing.assert_array_almost_equal(counts, [10, 25, 18, 7])
+
+
+class TestIngestCalibration:
+    """Ingest calibration CSV produces valid fd5 file."""
+
+    def test_returns_path(
+        self, loader: CsvLoader, calibration_csv: Path, tmp_path: Path
+    ):
+        result = loader.ingest(
+            calibration_csv,
+            tmp_path / "out",
+            product="calibration",
+            name="Energy cal",
+            description="Energy calibration curve",
+            timestamp="2026-02-25T12:00:00+00:00",
+            calibration_type="energy_calibration",
+            scanner_model="TestScanner",
+            scanner_serial="SN-001",
+            valid_from="2026-01-01",
+            valid_until="2027-01-01",
+        )
+        assert isinstance(result, Path)
+        assert result.exists()
+
+    def test_calibration_attrs(
+        self, loader: CsvLoader, calibration_csv: Path, tmp_path: Path
+    ):
+        result = loader.ingest(
+            calibration_csv,
+            tmp_path / "out",
+            product="calibration",
+            name="Energy cal",
+            description="Energy calibration curve",
+            timestamp="2026-02-25T12:00:00+00:00",
+            calibration_type="energy_calibration",
+            scanner_model="TestScanner",
+            scanner_serial="SN-001",
+            valid_from="2026-01-01",
+            valid_until="2027-01-01",
+        )
+        with h5py.File(result, "r") as f:
+            assert f.attrs["product"] == "calibration"
+
+
+class TestIngestDeviceData:
+    """Ingest TSV device data produces valid fd5 file."""
+
+    def test_tsv_delimiter(
+        self, loader: CsvLoader, device_data_tsv: Path, tmp_path: Path
+    ):
+        result = loader.ingest(
+            device_data_tsv,
+            tmp_path / "out",
+            product="device_data",
+            name="Temp log",
+            description="Temperature logger data",
+            timestamp="2026-02-25T12:00:00+00:00",
+            delimiter="\t",
+            device_type="environmental_sensor",
+            device_model="TempSensor-100",
+        )
+        assert isinstance(result, Path)
+        assert result.exists()
+
+    def test_device_data_attrs(
+        self, loader: CsvLoader, device_data_tsv: Path, tmp_path: Path
+    ):
+        result = loader.ingest(
+            device_data_tsv,
+            tmp_path / "out",
+            product="device_data",
+            name="Temp log",
+            description="Temperature logger data",
+            timestamp="2026-02-25T12:00:00+00:00",
+            delimiter="\t",
+            device_type="environmental_sensor",
+            device_model="TempSensor-100",
+        )
+        with h5py.File(result, "r") as f:
+            assert f.attrs["product"] == "device_data"
+
+
+# ---------------------------------------------------------------------------
+# Column mapping
+# ---------------------------------------------------------------------------
+
+
+class TestColumnMapping:
+    """Column mapping configurable and auto-detected from headers."""
+
+    def test_explicit_column_map(
+        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
+    ):
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Mapped spectrum",
+            description="Spectrum with explicit column mapping",
+            timestamp="2026-02-25T12:00:00+00:00",
+            column_map={"counts": "counts", "energy": "energy"},
+        )
+        with h5py.File(result, "r") as f:
+            assert "counts" in f
+            assert "axes" in f
+
+    def test_auto_detect_columns(
+        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
+    ):
+        """When column_map is None, loader auto-detects columns from headers."""
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Auto spectrum",
+            description="Spectrum with auto-detected columns",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        with h5py.File(result, "r") as f:
+            assert "counts" in f
+
+
+# ---------------------------------------------------------------------------
+# Comment-line metadata extraction
+# ---------------------------------------------------------------------------
+
+
+class TestCommentMetadata:
+    """Extract metadata from CSV comment lines."""
+
+    def test_metadata_extracted(
+        self, loader: CsvLoader, comment_metadata_csv: Path, tmp_path: Path
+    ):
+        result = loader.ingest(
+            comment_metadata_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Annotated spectrum",
+            description="Spectrum with comment metadata",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        with h5py.File(result, "r") as f:
+            assert "metadata" in f
+            meta = f["metadata"]
+            assert meta.attrs["units"] == "keV"
+            assert meta.attrs["detector"] == "HPGe"
+            assert meta.attrs["facility"] == "PSI"
+
+
+# ---------------------------------------------------------------------------
+# Provenance
+# ---------------------------------------------------------------------------
+
+
+class TestProvenance:
+    """Provenance records source CSV SHA-256."""
+
+    def test_provenance_original_files(
+        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
+    ):
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Provenance test",
+            description="Testing provenance recording",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        with h5py.File(result, "r") as f:
+            assert "provenance" in f
+            assert "original_files" in f["provenance"]
+            orig = f["provenance/original_files"]
+            assert orig.shape[0] >= 1
+            rec = orig[0]
+            sha = rec["sha256"]
+            if isinstance(sha, bytes):
+                sha = sha.decode()
+            assert sha.startswith("sha256:")
+
+    def test_provenance_sha256_correct(
+        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
+    ):
+        expected_hash = hashlib.sha256(spectrum_csv.read_bytes()).hexdigest()
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="SHA test",
+            description="Verify SHA-256 hash",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        with h5py.File(result, "r") as f:
+            rec = f["provenance/original_files"][0]
+            sha = rec["sha256"]
+            if isinstance(sha, bytes):
+                sha = sha.decode()
+            assert sha == f"sha256:{expected_hash}"
+
+    def test_provenance_ingest_group(
+        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
+    ):
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Ingest prov",
+            description="Testing ingest provenance",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        with h5py.File(result, "r") as f:
+            assert "provenance/ingest" in f
+            ingest = f["provenance/ingest"]
+            assert "tool" in ingest.attrs
+
+
+# ---------------------------------------------------------------------------
+# Edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestEdgeCases:
+    """Edge cases: empty data, missing file, custom delimiter/comment."""
+
+    def test_nonexistent_file_raises(self, loader: CsvLoader, tmp_path: Path):
+        with pytest.raises(FileNotFoundError):
+            loader.ingest(
+                tmp_path / "no_such_file.csv",
+                tmp_path / "out",
+                product="spectrum",
+                name="Missing",
+                description="Missing file",
+                timestamp="2026-02-25T12:00:00+00:00",
+            )
+
+    def test_empty_csv_raises(
+        self, loader: CsvLoader, empty_data_csv: Path, tmp_path: Path
+    ):
+        with pytest.raises(ValueError, match="[Nn]o data"):
+            loader.ingest(
+                empty_data_csv,
+                tmp_path / "out",
+                product="spectrum",
+                name="Empty",
+                description="Empty data",
+                timestamp="2026-02-25T12:00:00+00:00",
+            )
+
+    def test_custom_comment_char(self, loader: CsvLoader, tmp_path: Path):
+        csv_file = tmp_path / "custom_comment.csv"
+        csv_file.write_text("% units: MeV\nenergy,counts\n100.0,10\n200.0,20\n")
+        result = loader.ingest(
+            csv_file,
+            tmp_path / "out",
+            product="spectrum",
+            name="Custom comment",
+            description="CSV with % comments",
+            timestamp="2026-02-25T12:00:00+00:00",
+            comment="%",
+        )
+        with h5py.File(result, "r") as f:
+            assert "metadata" in f
+            assert f["metadata"].attrs["units"] == "MeV"
+
+    def test_custom_header_row(self, loader: CsvLoader, tmp_path: Path):
+        csv_file = tmp_path / "header_row.csv"
+        csv_file.write_text("This is a title line\nenergy,counts\n100.0,10\n200.0,20\n")
+        result = loader.ingest(
+            csv_file,
+            tmp_path / "out",
+            product="spectrum",
+            name="Header offset",
+            description="CSV with header on row 1",
+            timestamp="2026-02-25T12:00:00+00:00",
+            header_row=1,
+        )
+        with h5py.File(result, "r") as f:
+            assert "counts" in f
+            counts = f["counts"][:]
+            assert counts.shape == (2,)
+
+    def test_string_source_path(
+        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
+    ):
+        """Source can be a str, not just Path."""
+        result = loader.ingest(
+            str(spectrum_csv),
+            tmp_path / "out",
+            product="spectrum",
+            name="String path",
+            description="Source as str",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        assert result.exists()
+
+
+# ---------------------------------------------------------------------------
+# Generic product type
+# ---------------------------------------------------------------------------
+
+
+class TestGenericProduct:
+    """Generic product: user specifies product type + column mapping."""
+
+    def test_generic_ingest(self, loader: CsvLoader, tmp_path: Path):
+        csv_file = tmp_path / "generic.csv"
+        csv_file.write_text("x,y,z\n1.0,2.0,3.0\n4.0,5.0,6.0\n")
+        result = loader.ingest(
+            csv_file,
+            tmp_path / "out",
+            product="spectrum",
+            name="Generic CSV",
+            description="Generic columnar data",
+            timestamp="2026-02-25T12:00:00+00:00",
+            column_map={"counts": "y", "energy": "x"},
+        )
+        with h5py.File(result, "r") as f:
+            assert "counts" in f
+            counts = f["counts"][:]
+            np.testing.assert_array_almost_equal(counts, [2.0, 5.0])
diff --git a/tests/test_ingest_metadata.py b/tests/test_ingest_metadata.py
new file mode 100644
index 0000000..eea8284
--- /dev/null
+++ b/tests/test_ingest_metadata.py
@@ -0,0 +1,365 @@
+"""Tests for fd5.ingest.metadata — RO-Crate and DataCite metadata import."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any
+
+import pytest
+import yaml
+
+from fd5.ingest.metadata import (
+    load_datacite_metadata,
+    load_metadata,
+    load_rocrate_metadata,
+)
+
+
+# ---------------------------------------------------------------------------
+# Synthetic RO-Crate fixtures
+# ---------------------------------------------------------------------------
+
+ROCRATE_FULL: dict[str, Any] = {
+    "@context": "https://w3id.org/ro/crate/1.2/context",
+    "@graph": [
+        {
+            "@id": "ro-crate-metadata.json",
+            "@type": "CreativeWork",
+            "about": {"@id": "./"},
+            "conformsTo": {"@id": "https://w3id.org/ro/crate/1.2"},
+        },
+        {
+            "@id": "./",
+            "@type": "Dataset",
+            "name": "DOGPLET DD01",
+            "license": "CC-BY-4.0",
+            "description": "Full PET dataset",
+            "author": [
+                {
+                    "@type": "Person",
+                    "name": "Jane Doe",
+                    "affiliation": "ETH Zurich",
+                    "@id": "https://orcid.org/0000-0002-1234-5678",
+                },
+                {
+                    "@type": "Person",
+                    "name": "John Smith",
+                    "affiliation": "MIT",
+                },
+            ],
+        },
+    ],
+}
+
+ROCRATE_MINIMAL: dict[str, Any] = {
+    "@context": "https://w3id.org/ro/crate/1.2/context",
+    "@graph": [
+        {
+            "@id": "ro-crate-metadata.json",
+            "@type": "CreativeWork",
+            "about": {"@id": "./"},
+        },
+        {
+            "@id": "./",
+            "@type": "Dataset",
+            "name": "Minimal Dataset",
+        },
+    ],
+}
+
+ROCRATE_NO_DATASET: dict[str, Any] = {
+    "@context": "https://w3id.org/ro/crate/1.2/context",
+    "@graph": [
+        {
+            "@id": "ro-crate-metadata.json",
+            "@type": "CreativeWork",
+            "about": {"@id": "./"},
+        },
+    ],
+}
+
+
+def _write_rocrate(path: Path, data: dict[str, Any]) -> Path:
+    out = path / "ro-crate-metadata.json"
+    out.write_text(json.dumps(data, indent=2))
+    return out
+
+
+# ---------------------------------------------------------------------------
+# Synthetic DataCite fixtures
+# ---------------------------------------------------------------------------
+
+DATACITE_FULL: dict[str, Any] = {
+    "title": "DOGPLET DD01",
+    "creators": [
+        {"name": "Jane Doe", "affiliation": "ETH Zurich"},
+        {"name": "John Smith", "affiliation": "MIT"},
+    ],
+    "dates": [
+        {"date": "2024-07-24", "dateType": "Collected"},
+    ],
+    "subjects": [
+        {"subject": "FDG", "subjectScheme": "Radiotracer"},
+    ],
+    "resourceType": "Dataset",
+}
+
+DATACITE_MINIMAL: dict[str, Any] = {
+    "title": "Minimal",
+}
+
+
+def _write_datacite(path: Path, data: dict[str, Any]) -> Path:
+    out = path / "datacite.yml"
+    out.write_text(yaml.dump(data, default_flow_style=False))
+    return out
+
+
+# ---------------------------------------------------------------------------
+# load_rocrate_metadata
+# ---------------------------------------------------------------------------
+
+
+class TestLoadRocrateMetadata:
+    def test_extracts_name(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_rocrate_metadata(f)
+        assert result["name"] == "DOGPLET DD01"
+
+    def test_extracts_license(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_rocrate_metadata(f)
+        assert result["license"] == "CC-BY-4.0"
+
+    def test_extracts_description(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_rocrate_metadata(f)
+        assert result["description"] == "Full PET dataset"
+
+    def test_extracts_creators(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_rocrate_metadata(f)
+        creators = result["creators"]
+        assert len(creators) == 2
+        assert creators[0]["name"] == "Jane Doe"
+        assert creators[0]["affiliation"] == "ETH Zurich"
+        assert creators[0]["orcid"] == "https://orcid.org/0000-0002-1234-5678"
+
+    def test_creator_without_orcid(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_rocrate_metadata(f)
+        john = result["creators"][1]
+        assert john["name"] == "John Smith"
+        assert "orcid" not in john
+
+    def test_creator_without_affiliation(self, tmp_path: Path):
+        crate = {
+            "@context": "https://w3id.org/ro/crate/1.2/context",
+            "@graph": [
+                {
+                    "@id": "./",
+                    "@type": "Dataset",
+                    "name": "Test",
+                    "author": [{"@type": "Person", "name": "Solo Dev"}],
+                },
+            ],
+        }
+        f = _write_rocrate(tmp_path, crate)
+        result = load_rocrate_metadata(f)
+        assert result["creators"][0]["name"] == "Solo Dev"
+        assert "affiliation" not in result["creators"][0]
+
+    def test_missing_license_absent_key(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_MINIMAL)
+        result = load_rocrate_metadata(f)
+        assert "license" not in result
+
+    def test_missing_description_absent_key(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_MINIMAL)
+        result = load_rocrate_metadata(f)
+        assert "description" not in result
+
+    def test_missing_authors_absent_creators(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_MINIMAL)
+        result = load_rocrate_metadata(f)
+        assert "creators" not in result
+
+    def test_no_dataset_entity_returns_empty(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_NO_DATASET)
+        result = load_rocrate_metadata(f)
+        assert result == {}
+
+    def test_returns_dict(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_rocrate_metadata(f)
+        assert isinstance(result, dict)
+
+    def test_result_usable_with_write_study(self, tmp_path: Path):
+        """Returned dict keys should match builder.write_study() parameters."""
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_rocrate_metadata(f)
+        allowed = {"study_type", "license", "name", "description", "creators"}
+        assert set(result.keys()) <= allowed
+
+    def test_study_type_not_set(self, tmp_path: Path):
+        """RO-Crate doesn't map to study_type, so key should be absent."""
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_rocrate_metadata(f)
+        assert "study_type" not in result
+
+    def test_empty_author_list(self, tmp_path: Path):
+        crate = {
+            "@context": "https://w3id.org/ro/crate/1.2/context",
+            "@graph": [
+                {"@id": "./", "@type": "Dataset", "name": "Test", "author": []},
+            ],
+        }
+        f = _write_rocrate(tmp_path, crate)
+        result = load_rocrate_metadata(f)
+        assert "creators" not in result
+
+    def test_nonexistent_file_raises(self, tmp_path: Path):
+        with pytest.raises(FileNotFoundError):
+            load_rocrate_metadata(tmp_path / "nonexistent.json")
+
+
+# ---------------------------------------------------------------------------
+# load_datacite_metadata
+# ---------------------------------------------------------------------------
+
+
+class TestLoadDataciteMetadata:
+    def test_extracts_name_from_title(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_FULL)
+        result = load_datacite_metadata(f)
+        assert result["name"] == "DOGPLET DD01"
+
+    def test_extracts_creators(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_FULL)
+        result = load_datacite_metadata(f)
+        creators = result["creators"]
+        assert len(creators) == 2
+        assert creators[0]["name"] == "Jane Doe"
+        assert creators[0]["affiliation"] == "ETH Zurich"
+
+    def test_extracts_dates(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_FULL)
+        result = load_datacite_metadata(f)
+        assert result["dates"] == [{"date": "2024-07-24", "dateType": "Collected"}]
+
+    def test_extracts_subjects(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_FULL)
+        result = load_datacite_metadata(f)
+        assert result["subjects"] == [
+            {"subject": "FDG", "subjectScheme": "Radiotracer"}
+        ]
+
+    def test_missing_creators_absent_key(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_MINIMAL)
+        result = load_datacite_metadata(f)
+        assert "creators" not in result
+
+    def test_missing_dates_absent_key(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_MINIMAL)
+        result = load_datacite_metadata(f)
+        assert "dates" not in result
+
+    def test_missing_subjects_absent_key(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_MINIMAL)
+        result = load_datacite_metadata(f)
+        assert "subjects" not in result
+
+    def test_missing_title_absent_name(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, {})
+        result = load_datacite_metadata(f)
+        assert "name" not in result
+
+    def test_returns_dict(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_FULL)
+        result = load_datacite_metadata(f)
+        assert isinstance(result, dict)
+
+    def test_result_keys_subset_of_study_params(self, tmp_path: Path):
+        """Keys must be compatible with builder.write_study() + extra metadata."""
+        f = _write_datacite(tmp_path, DATACITE_FULL)
+        result = load_datacite_metadata(f)
+        allowed = {
+            "study_type",
+            "license",
+            "name",
+            "description",
+            "creators",
+            "dates",
+            "subjects",
+        }
+        assert set(result.keys()) <= allowed
+
+    def test_empty_creators_list(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, {"title": "Test", "creators": []})
+        result = load_datacite_metadata(f)
+        assert "creators" not in result
+
+    def test_nonexistent_file_raises(self, tmp_path: Path):
+        with pytest.raises(FileNotFoundError):
+            load_datacite_metadata(tmp_path / "nonexistent.yml")
+
+
+# ---------------------------------------------------------------------------
+# load_metadata (auto-detect)
+# ---------------------------------------------------------------------------
+
+
+class TestLoadMetadata:
+    def test_detects_rocrate(self, tmp_path: Path):
+        f = _write_rocrate(tmp_path, ROCRATE_FULL)
+        result = load_metadata(f)
+        assert result["name"] == "DOGPLET DD01"
+        assert result["license"] == "CC-BY-4.0"
+
+    def test_detects_datacite_yml(self, tmp_path: Path):
+        f = _write_datacite(tmp_path, DATACITE_FULL)
+        result = load_metadata(f)
+        assert result["name"] == "DOGPLET DD01"
+
+    def test_detects_datacite_yaml(self, tmp_path: Path):
+        out = tmp_path / "datacite.yaml"
+        out.write_text(yaml.dump(DATACITE_FULL, default_flow_style=False))
+        result = load_metadata(out)
+        assert result["name"] == "DOGPLET DD01"
+
+    def test_generic_json(self, tmp_path: Path):
+        data = {"name": "Generic Study", "license": "MIT"}
+        f = tmp_path / "meta.json"
+        f.write_text(json.dumps(data))
+        result = load_metadata(f)
+        assert result == data
+
+    def test_generic_yaml(self, tmp_path: Path):
+        data = {"name": "YAML Study", "description": "A study from YAML"}
+        f = tmp_path / "meta.yml"
+        f.write_text(yaml.dump(data))
+        result = load_metadata(f)
+        assert result == data
+
+    def test_generic_yaml_extension(self, tmp_path: Path):
+        data = {"name": "YAML Study"}
+        f = tmp_path / "meta.yaml"
+        f.write_text(yaml.dump(data))
+        result = load_metadata(f)
+        assert result == data
+
+    def test_unsupported_extension_raises(self, tmp_path: Path):
+        f = tmp_path / "meta.txt"
f.write_text("hello") + with pytest.raises(ValueError, match="Unsupported"): + load_metadata(f) + + def test_nonexistent_file_raises(self, tmp_path: Path): + with pytest.raises(FileNotFoundError): + load_metadata(tmp_path / "nonexistent.json") + + def test_returns_dict(self, tmp_path: Path): + f = _write_rocrate(tmp_path, ROCRATE_FULL) + result = load_metadata(f) + assert isinstance(result, dict) diff --git a/tests/test_ingest_nifti.py b/tests/test_ingest_nifti.py new file mode 100644 index 0000000..cf3b62d --- /dev/null +++ b/tests/test_ingest_nifti.py @@ -0,0 +1,395 @@ +"""Tests for fd5.ingest.nifti — NIfTI loader producing sealed fd5 recon files.""" + +from __future__ import annotations + +import hashlib +from pathlib import Path +from unittest import mock + +import h5py +import nibabel as nib +import numpy as np +import pytest + +from fd5.ingest._base import Loader +from fd5.ingest.nifti import NiftiLoader, ingest_nifti + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def nifti_3d(tmp_path: Path) -> Path: + """Create a synthetic 3D NIfTI-1 file (.nii).""" + vol = np.arange(24, dtype=np.float32).reshape(2, 3, 4) + affine = np.diag([2.0, 2.0, 2.0, 1.0]) + img = nib.Nifti1Image(vol, affine) + p = tmp_path / "volume_3d.nii" + nib.save(img, p) + return p + + +@pytest.fixture() +def nifti_4d(tmp_path: Path) -> Path: + """Create a synthetic 4D NIfTI-1 file (.nii).""" + vol = np.arange(48, dtype=np.float32).reshape(2, 2, 3, 4) + affine = np.diag([1.0, 1.0, 1.0, 1.0]) + img = nib.Nifti1Image(vol, affine) + p = tmp_path / "volume_4d.nii" + nib.save(img, p) + return p + + +@pytest.fixture() +def nifti_gz(tmp_path: Path) -> Path: + """Create a synthetic 3D NIfTI-1 file (.nii.gz).""" + vol = np.ones((3, 4, 5), dtype=np.float32) + affine = np.eye(4) + img = nib.Nifti1Image(vol, affine) + p = tmp_path / "compressed.nii.gz" + nib.save(img, p) + return p + + +@pytest.fixture() +def nifti2_3d(tmp_path: Path) -> Path: + """Create a synthetic 3D NIfTI-2 file.""" + vol = np.ones((3, 4, 5), dtype=np.float32) + affine = np.eye(4) + img = nib.Nifti2Image(vol, affine) + p = tmp_path / "volume_nifti2.nii" + nib.save(img, p) + return p + + +# --------------------------------------------------------------------------- +# Loader protocol conformance +# --------------------------------------------------------------------------- + + +class TestNiftiLoaderProtocol: + def test_implements_loader(self): + loader = NiftiLoader() + assert isinstance(loader, Loader) + + def test_supported_product_types(self): + loader = NiftiLoader() + assert "recon" in loader.supported_product_types + + +# --------------------------------------------------------------------------- +# ingest_nifti — happy paths +# --------------------------------------------------------------------------- + + +class TestIngestNifti3D: + def test_returns_path(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="test-vol", + description="A test volume", + ) + assert isinstance(result, Path) + assert result.exists() + assert result.suffix == ".h5" + + def test_fd5_root_attrs(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="test-vol", + description="A test volume", + ) + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "recon" + assert f.attrs["name"] == "test-vol" + 
assert f.attrs["description"] == "A test volume" + assert "timestamp" in f.attrs + assert "id" in f.attrs + assert "content_hash" in f.attrs + + def test_volume_dataset(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="test-vol", + description="A test volume", + ) + with h5py.File(result, "r") as f: + assert "volume" in f + vol = f["volume"][:] + assert vol.shape == (2, 3, 4) + np.testing.assert_allclose(vol, np.arange(24).reshape(2, 3, 4)) + + def test_affine_from_sform(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="test-vol", + description="A test volume", + ) + with h5py.File(result, "r") as f: + affine = f["volume"].attrs["affine"] + expected = np.diag([2.0, 2.0, 2.0, 1.0]) + np.testing.assert_allclose(affine, expected) + + def test_dimension_order_3d(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="test-vol", + description="A test volume", + ) + with h5py.File(result, "r") as f: + dim_order = f["volume"].attrs["dimension_order"] + assert dim_order == "ZYX" + + def test_reference_frame_default(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="test-vol", + description="A test volume", + ) + with h5py.File(result, "r") as f: + assert f["volume"].attrs["reference_frame"] == "RAS" + + def test_reference_frame_custom(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="test-vol", + description="A test volume", + reference_frame="LPS", + ) + with h5py.File(result, "r") as f: + assert f["volume"].attrs["reference_frame"] == "LPS" + + +# --------------------------------------------------------------------------- +# 4D support +# --------------------------------------------------------------------------- + + +class TestIngestNifti4D: + def test_4d_volume_shape(self, nifti_4d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_4d, + out, + name="dynamic", + description="4D test", + ) + with h5py.File(result, "r") as f: + assert f["volume"][:].shape == (2, 2, 3, 4) + + def test_4d_dimension_order(self, nifti_4d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_4d, + out, + name="dynamic", + description="4D test", + ) + with h5py.File(result, "r") as f: + assert f["volume"].attrs["dimension_order"] == "TZYX" + + +# --------------------------------------------------------------------------- +# Compressed (.nii.gz) +# --------------------------------------------------------------------------- + + +class TestIngestNiftiGz: + def test_compressed_file(self, nifti_gz: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_gz, + out, + name="compressed", + description="gzip test", + ) + with h5py.File(result, "r") as f: + assert f["volume"][:].shape == (3, 4, 5) + + +# --------------------------------------------------------------------------- +# NIfTI-2 support +# --------------------------------------------------------------------------- + + +class TestIngestNifti2: + def test_nifti2_file(self, nifti2_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti2_3d, + out, + name="nifti2-vol", + description="NIfTI-2 test", + ) + with h5py.File(result, "r") as f: + assert f["volume"][:].shape == (3, 4, 5) + + +# 
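--------------------------------------------------------------------------- +# Compression round-trip (illustrative sketch) +# --------------------------------------------------------------------------- + + +# A hedged sketch rather than a documented guarantee: it assumes only that +# ingest_nifti stores voxel data verbatim regardless of on-disk compression +# (as the 3D tests above suggest), so the same image saved as .nii and as +# .nii.gz should yield identical volume datasets. +class TestCompressionRoundTrip: + def test_nii_and_nii_gz_volumes_match(self, tmp_path: Path): + vol = np.arange(60, dtype=np.float32).reshape(3, 4, 5) + img = nib.Nifti1Image(vol, np.eye(4)) + plain = tmp_path / "v.nii" + packed = tmp_path / "v.nii.gz" + nib.save(img, plain) + nib.save(img, packed) + out_a = ingest_nifti(plain, tmp_path / "a", name="a", description="plain") + out_b = ingest_nifti(packed, tmp_path / "b", name="b", description="gz") + with h5py.File(out_a, "r") as fa, h5py.File(out_b, "r") as fb: + np.testing.assert_array_equal(fa["volume"][:], fb["volume"][:]) + + +# 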
--------------------------------------------------------------------------- +# Provenance +# --------------------------------------------------------------------------- + + +class TestProvenance: + def test_provenance_original_files(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="prov-test", + description="Provenance test", + ) + with h5py.File(result, "r") as f: + assert "provenance" in f + assert "original_files" in f["provenance"] + rec = f["provenance/original_files"][0] + assert str(nifti_3d) in rec["path"].decode() + sha = hashlib.sha256(nifti_3d.read_bytes()).hexdigest() + # hash_source_files() records digests with a "sha256:" prefix. + assert rec["sha256"].decode() == f"sha256:{sha}" + + def test_provenance_ingest_group(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="prov-test", + description="Provenance test", + ) + with h5py.File(result, "r") as f: + assert "provenance/ingest" in f + ingest_grp = f["provenance/ingest"] + # The tool record may be stored as an attribute or a dataset. + assert "tool" in ingest_grp.attrs or "tool" in ingest_grp + + +# --------------------------------------------------------------------------- +# Study metadata +# --------------------------------------------------------------------------- + + +class TestStudyMetadata: + def test_study_group_written(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="study-test", + description="Study test", + study_metadata={ + "study_type": "phantom", + "license": "CC-BY-4.0", + "description": "Phantom study", + }, + ) + with h5py.File(result, "r") as f: + assert "study" in f + assert f["study"].attrs["type"] == "phantom" + + +# --------------------------------------------------------------------------- +# Timestamp +# --------------------------------------------------------------------------- + + +class TestTimestamp: + def test_custom_timestamp(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + ts = "2025-01-15T10:30:00Z" + result = ingest_nifti( + nifti_3d, + out, + name="ts-test", + description="Timestamp test", + timestamp=ts, + ) + with h5py.File(result, "r") as f: + assert f.attrs["timestamp"] == ts + + def test_auto_timestamp(self, nifti_3d: Path, tmp_path: Path): + out = tmp_path / "out" + result = ingest_nifti( + nifti_3d, + out, + name="ts-test", + description="Timestamp test", + ) + with h5py.File(result, "r") as f: + assert len(f.attrs["timestamp"]) > 0 + + +# --------------------------------------------------------------------------- +# Error paths +# --------------------------------------------------------------------------- + + +class TestErrors: + def test_nonexistent_file(self, tmp_path: Path): + with pytest.raises(FileNotFoundError): + ingest_nifti( + tmp_path / "missing.nii", + tmp_path / "out", + name="err", + description="err", + ) + + def test_invalid_file(self, tmp_path: Path): + bad = tmp_path / "bad.nii" + bad.write_bytes(b"not a nifti file") + # nibabel's exact exception type depends on where parsing fails, + # so accept any error here. + with pytest.raises(Exception): + ingest_nifti( + bad, + tmp_path / "out", + name="err", + description="err", + ) + + +# --------------------------------------------------------------------------- +# NiftiLoader.ingest method +# --------------------------------------------------------------------------- + + +class TestNiftiLoaderIngest: + def test_ingest_method(self, nifti_3d: Path, tmp_path: Path): + loader = NiftiLoader() + result = loader.ingest( + nifti_3d, + tmp_path / "out", + product="recon", + name="loader-test", + description="Via loader", + ) + assert result.exists() + 
with h5py.File(result, "r") as f: + assert f.attrs["product"] == "recon" + + +# --------------------------------------------------------------------------- +# ImportError when nibabel missing +# --------------------------------------------------------------------------- + + +class TestNibabelImportError: + def test_clear_message_when_nibabel_missing(self): + # Hoist the imports so that only the reload can raise: with + # sys.modules["nibabel"] patched to None, re-executing the module + # body makes its "import nibabel" fail with an ImportError that + # names the missing dependency. + import importlib + + import fd5.ingest.nifti as mod + + with mock.patch.dict("sys.modules", {"nibabel": None}): + with pytest.raises(ImportError, match="nibabel"): + importlib.reload(mod) + # Reload once more so later tests see a fully initialised module. + importlib.reload(mod) diff --git a/tests/test_ingest_raw.py b/tests/test_ingest_raw.py new file mode 100644 index 0000000..7feedbc --- /dev/null +++ b/tests/test_ingest_raw.py @@ -0,0 +1,328 @@ +"""Tests for fd5.ingest.raw module.""" + +from __future__ import annotations + +import hashlib +from pathlib import Path +from typing import Any + +import h5py +import numpy as np +import pytest + +from fd5.imaging.recon import ReconSchema +from fd5.imaging.sinogram import SinogramSchema +from fd5.registry import register_schema + + +@pytest.fixture(autouse=True) +def _register_schemas(): + register_schema("recon", ReconSchema()) + register_schema("sinogram", SinogramSchema()) + + +def _recon_data(shape: tuple[int, ...] = (8, 16, 16)) -> dict[str, Any]: + return { + "volume": np.random.default_rng(42).random(shape, dtype=np.float32), + "affine": np.eye(4, dtype=np.float64), + "dimension_order": "ZYX", + "reference_frame": "LPS", + "description": "Test recon volume", + } + + +def _sinogram_data() -> dict[str, Any]: + n_planes, n_angular, n_radial = 5, 12, 16 + return { + "sinogram": np.random.default_rng(7).random( + (n_planes, n_angular, n_radial), dtype=np.float32 + ), + "n_radial": n_radial, + "n_angular": n_angular, + "n_planes": n_planes, + "span": 3, + "max_ring_diff": 2, + "tof_bins": 0, + } + + +class TestIngestArray: + """Tests for ingest_array().""" + + def test_produces_sealed_recon_file(self, tmp_path: Path): + from fd5.ingest.raw import ingest_array + + result = ingest_array( + _recon_data(), + tmp_path, + product="recon", + name="test-recon", + description="A test recon file", + timestamp="2025-01-01T00:00:00+00:00", + ) + + assert result.exists() + assert result.suffix == ".h5" + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "recon" + assert f.attrs["name"] == "test-recon" + assert "content_hash" in f.attrs + assert "id" in f.attrs + assert "_schema" in f.attrs + assert "volume" in f + + def test_writes_metadata(self, tmp_path: Path): + from fd5.ingest.raw import ingest_array + + metadata = {"scanner": "test-scanner", "vendor_series_id": "S001"} + result = ingest_array( + _recon_data(), + tmp_path, + product="recon", + name="test-meta", + description="Metadata test", + timestamp="2025-01-01T00:00:00+00:00", + metadata=metadata, + ) + + with h5py.File(result, "r") as f: + assert "metadata" in f + assert f["metadata"].attrs["scanner"] == "test-scanner" + + def test_writes_sources(self, tmp_path: Path): + from fd5.ingest.raw import ingest_array + + sources = [ + { + "name": "src0", + "id": "abc", + "product": "raw", + "file": "source.h5", + "content_hash": "sha256:deadbeef", + "role": "input", + "description": "test source", + } + ] + result = ingest_array( + _recon_data(), + tmp_path, + product="recon", + name="test-src", + description="Sources test", + timestamp="2025-01-01T00:00:00+00:00", + sources=sources, + ) + + with h5py.File(result, "r") as f: + assert "sources" in f + + def test_writes_study_metadata(self, tmp_path: Path): + from 
fd5.ingest.raw import ingest_array + + study = { + "study_type": "clinical", + "license": "CC-BY-4.0", + "description": "Test study", + } + result = ingest_array( + _recon_data(), + tmp_path, + product="recon", + name="test-study", + description="Study test", + timestamp="2025-01-01T00:00:00+00:00", + study_metadata=study, + ) + + with h5py.File(result, "r") as f: + assert "study" in f + assert f["study"].attrs["type"] == "clinical" + + def test_default_timestamp(self, tmp_path: Path): + from fd5.ingest.raw import ingest_array + + result = ingest_array( + _recon_data(), + tmp_path, + product="recon", + name="test-ts", + description="Default timestamp test", + ) + + assert result.exists() + with h5py.File(result, "r") as f: + ts = f.attrs["timestamp"] + assert len(ts) > 0 + + def test_unknown_product_raises(self, tmp_path: Path): + from fd5.ingest.raw import ingest_array + + with pytest.raises(ValueError, match="no-such-product"): + ingest_array( + {}, + tmp_path, + product="no-such-product", + name="bad", + description="Should fail", + ) + + def test_sinogram_product(self, tmp_path: Path): + from fd5.ingest.raw import ingest_array + + result = ingest_array( + _sinogram_data(), + tmp_path, + product="sinogram", + name="test-sino", + description="A test sinogram", + timestamp="2025-01-01T00:00:00+00:00", + ) + + assert result.exists() + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "sinogram" + assert "sinogram" in f + + +class TestIngestBinary: + """Tests for ingest_binary().""" + + def _write_binary(self, path: Path, arr: np.ndarray) -> None: + arr.tofile(path) + + def test_reads_binary_and_produces_fd5(self, tmp_path: Path): + from fd5.ingest.raw import ingest_binary + + shape = (8, 16, 16) + arr = np.random.default_rng(99).random(shape, dtype=np.float32) + bin_path = tmp_path / "volume.bin" + self._write_binary(bin_path, arr) + + out_dir = tmp_path / "output" + result = ingest_binary( + bin_path, + out_dir, + dtype="float32", + shape=shape, + product="recon", + name="test-binary", + description="Binary ingest test", + timestamp="2025-01-01T00:00:00+00:00", + affine=np.eye(4, dtype=np.float64), + dimension_order="ZYX", + reference_frame="LPS", + ) + + assert result.exists() + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "recon" + read_vol = f["volume"][:] + np.testing.assert_array_almost_equal(read_vol, arr) + + def test_records_provenance_sha256(self, tmp_path: Path): + from fd5.ingest.raw import ingest_binary + + shape = (4, 8, 8) + arr = np.ones(shape, dtype=np.float32) + bin_path = tmp_path / "ones.bin" + self._write_binary(bin_path, arr) + + out_dir = tmp_path / "output" + result = ingest_binary( + bin_path, + out_dir, + dtype="float32", + shape=shape, + product="recon", + name="test-prov", + description="Provenance test", + timestamp="2025-01-01T00:00:00+00:00", + affine=np.eye(4, dtype=np.float64), + dimension_order="ZYX", + reference_frame="LPS", + ) + + expected_sha = hashlib.sha256(bin_path.read_bytes()).hexdigest() + with h5py.File(result, "r") as f: + assert "provenance" in f + assert "original_files" in f["provenance"] + rec = f["provenance"]["original_files"][0] + # hash_source_files() records digests with a "sha256:" prefix. + assert rec["sha256"].decode() == f"sha256:{expected_sha}" + + def test_nonexistent_binary_raises(self, tmp_path: Path): + from fd5.ingest.raw import ingest_binary + + with pytest.raises(FileNotFoundError): + ingest_binary( + tmp_path / "missing.bin", + tmp_path / "output", + dtype="float32", + shape=(4, 4, 4), + product="recon", + name="bad", + description="Should fail", + ) + + def 
test_shape_mismatch_raises(self, tmp_path: Path): + from fd5.ingest.raw import ingest_binary + + arr = np.ones((4, 4, 4), dtype=np.float32) + bin_path = tmp_path / "small.bin" + self._write_binary(bin_path, arr) + + with pytest.raises(ValueError, match="cannot reshape"): + ingest_binary( + bin_path, + tmp_path / "output", + dtype="float32", + shape=(100, 100, 100), + product="recon", + name="bad", + description="Should fail", + ) + + +class TestRawLoader: + """Tests for RawLoader protocol conformance.""" + + def test_satisfies_loader_protocol(self): + from fd5.ingest._base import Loader + from fd5.ingest.raw import RawLoader + + loader = RawLoader() + assert isinstance(loader, Loader) + + def test_supported_product_types(self): + from fd5.ingest.raw import RawLoader + + loader = RawLoader() + types = loader.supported_product_types + assert isinstance(types, list) + assert "recon" in types + + def test_ingest_produces_file(self, tmp_path: Path): + from fd5.ingest.raw import RawLoader + + data_path = tmp_path / "data.bin" + arr = np.random.default_rng(1).random((4, 8, 8), dtype=np.float32) + arr.tofile(data_path) + + out_dir = tmp_path / "output" + loader = RawLoader() + result = loader.ingest( + data_path, + out_dir, + product="recon", + name="loader-test", + description="RawLoader test", + timestamp="2025-01-01T00:00:00+00:00", + dtype="float32", + shape=(4, 8, 8), + affine=np.eye(4, dtype=np.float64), + dimension_order="ZYX", + reference_frame="LPS", + ) + + assert result.exists() + with h5py.File(result, "r") as f: + assert f.attrs["product"] == "recon"
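+ + +# --------------------------------------------------------------------------- +# Illustrative dtype handling (hedged sketch) +# --------------------------------------------------------------------------- + + +# A hedged sketch, not a documented guarantee: it assumes ingest_binary +# accepts any numpy dtype string (only "float32" is exercised above) and +# round-trips float64 voxel data unchanged. +def test_ingest_binary_float64_sketch(tmp_path: Path): + from fd5.ingest.raw import ingest_binary + + arr = np.linspace(0.0, 1.0, 64).reshape(4, 4, 4) # float64 by default + bin_path = tmp_path / "volume_f64.bin" + arr.tofile(bin_path) + + result = ingest_binary( + bin_path, + tmp_path / "output", + dtype="float64", + shape=(4, 4, 4), + product="recon", + name="f64-sketch", + description="float64 ingest sketch", + timestamp="2025-01-01T00:00:00+00:00", + affine=np.eye(4, dtype=np.float64), + dimension_order="ZYX", + reference_frame="LPS", + ) + + with h5py.File(result, "r") as f: + np.testing.assert_array_almost_equal(f["volume"][:], arr)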