From d71702357e70f40e377a0436a375b42c48db18a9 Mon Sep 17 00:00:00 2001
From: gerchowl
Date: Mon, 30 Mar 2026 11:04:48 +0200
Subject: [PATCH] feat(export): add nifti, csv, and parquet export modules
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Round-trip export from fd5 back to standard formats:
- fd5 export nifti: recon/ndarray → .nii.gz (via nibabel)
- fd5 export csv: tabular/spectrum/timeseries → .csv
- fd5 export parquet: tabular/timeseries → .parquet (via pyarrow)
- Shared extract_columns() helper to avoid duplication
- Lazy imports for optional deps with helpful error messages

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 pyproject.toml               |   6 +-
 src/fd5/cli.py               |  58 +++++++++++
 src/fd5/export/__init__.py   |   9 ++
 src/fd5/export/csv.py        | 186 +++++++++++++++++++++++++++++++++++
 src/fd5/export/nifti.py      | 104 ++++++++++++++++++++
 src/fd5/export/parquet.py    |  56 +++++++++++
 tests/test_export_csv.py     | 178 +++++++++++++++++++++++++++++++++
 tests/test_export_nifti.py   | 134 +++++++++++++++++++++++++
 tests/test_export_parquet.py | 130 ++++++++++++++++++++++++
 9 files changed, 860 insertions(+), 1 deletion(-)
 create mode 100644 src/fd5/export/__init__.py
 create mode 100644 src/fd5/export/csv.py
 create mode 100644 src/fd5/export/nifti.py
 create mode 100644 src/fd5/export/parquet.py
 create mode 100644 tests/test_export_csv.py
 create mode 100644 tests/test_export_nifti.py
 create mode 100644 tests/test_export_parquet.py

diff --git a/pyproject.toml b/pyproject.toml
index c86d026..6c79f80 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,8 +40,12 @@ nifti = [
 parquet = [
     "pyarrow>=14.0",
 ]
+export = [
+    "nibabel>=4.0",
+    "pyarrow>=14.0",
+]
 all = [
-    "fd5[dev,science,dicom,nifti,parquet]",
+    "fd5[dev,science,dicom,nifti,parquet,export]",
 ]
 
 [build-system]
diff --git a/src/fd5/cli.py b/src/fd5/cli.py
index 7ef85b3..2f16af3 100644
--- a/src/fd5/cli.py
+++ b/src/fd5/cli.py
@@ -493,6 +493,64 @@ def ingest_parquet(
         sys.exit(1)
 
 
+# ---------------------------------------------------------------------------
+# fd5 export — subcommand group
+# ---------------------------------------------------------------------------
+
+
+@cli.group()
+def export() -> None:
+    """Export fd5 files to standard formats."""
+
+
+@export.command("nifti")
+@click.argument("fd5_file", type=click.Path(exists=True))
+@click.option("-o", "--output", required=True, type=click.Path())
+@click.option("--dataset", default="volume", help="Dataset path to export.")
+def export_nifti_cmd(fd5_file: str, output: str, dataset: str) -> None:
+    """Export volume data to NIfTI (.nii.gz)."""
+    from fd5.export.nifti import export_nifti
+
+    try:
+        path = export_nifti(fd5_file, output, dataset=dataset)
+        click.echo(f"Exported: {path}")
+    except (ImportError, KeyError, ValueError) as exc:
+        click.echo(f"Error: {exc}", err=True)
+        sys.exit(1)
+
+
+@export.command("csv")
+@click.argument("fd5_file", type=click.Path(exists=True))
+@click.option("-o", "--output", required=True, type=click.Path())
+@click.option("--group", default=None, help="HDF5 group path to export from.")
+def export_csv_cmd(fd5_file: str, output: str, group: str | None) -> None:
+    """Export tabular/timeseries/spectrum data to CSV."""
+    from fd5.export.csv import export_csv
+
+    try:
+        path = export_csv(fd5_file, output, group=group)
+        click.echo(f"Exported: {path}")
+    except (KeyError, ValueError) as exc:
+        click.echo(f"Error: {exc}", err=True)
+        sys.exit(1)
+
+
+@export.command("parquet")
+@click.argument("fd5_file", type=click.Path(exists=True))
+@click.option("-o", "--output", required=True, type=click.Path())
+@click.option("--group", default=None, help="HDF5 group path to export from.")
+def export_parquet_cmd(fd5_file: str, output: str, group: str | None) -> None:
+    """Export tabular/timeseries data to Parquet."""
+    from fd5.export.parquet import export_parquet
+
+    try:
+        path = export_parquet(fd5_file, output, group=group)
+        click.echo(f"Exported: {path}")
+    except (ImportError, KeyError, ValueError) as exc:
+        click.echo(f"Error: {exc}", err=True)
+        sys.exit(1)
+
+
 # ---------------------------------------------------------------------------
 # Internal helpers
 # ---------------------------------------------------------------------------
diff --git a/src/fd5/export/__init__.py b/src/fd5/export/__init__.py
new file mode 100644
index 0000000..8a473be
--- /dev/null
+++ b/src/fd5/export/__init__.py
@@ -0,0 +1,9 @@
+"""fd5.export — Export fd5 files to standard formats (NIfTI, CSV, Parquet)."""
+
+from __future__ import annotations
+
+from fd5.export.csv import export_csv
+from fd5.export.nifti import export_nifti
+from fd5.export.parquet import export_parquet
+
+__all__ = ["export_csv", "export_nifti", "export_parquet"]
diff --git a/src/fd5/export/csv.py b/src/fd5/export/csv.py
new file mode 100644
index 0000000..5f130ba
--- /dev/null
+++ b/src/fd5/export/csv.py
@@ -0,0 +1,186 @@
+"""fd5.export.csv — Export tabular/spectrum/timeseries data to CSV.
+
+Reads product data from an fd5 file and writes a standard CSV file.
+Supports product types: spectrum, device_data, and generic tabular data.
+"""
+
+from __future__ import annotations
+
+import csv as csv_mod
+from pathlib import Path
+
+import h5py
+import numpy as np
+
+
+def extract_columns(
+    fd5_path: str | Path,
+    *,
+    group: str | None = None,
+) -> dict[str, np.ndarray]:
+    """Read tabular column data from an fd5 file.
+
+    Shared by :func:`export_csv` and :func:`~fd5.export.parquet.export_parquet`.
+    """
+    fd5_path = Path(fd5_path)
+    with h5py.File(fd5_path, "r") as f:
+        if group is not None:
+            return _extract_group(f, group)
+        product = _read_product(f)
+        return _PRODUCT_EXTRACTORS.get(product, _extract_generic)(f)
+
+
+def export_csv(
+    fd5_path: str | Path,
+    output_path: str | Path,
+    *,
+    group: str | None = None,
+) -> Path:
+    """Export tabular data from an fd5 file to CSV.
+
+    Parameters
+    ----------
+    fd5_path:
+        Path to the source fd5 (``.h5``) file.
+    output_path:
+        Destination path for the CSV file.
+    group:
+        Optional HDF5 group path to export from. If *None*, the product
+        type is auto-detected from root attrs.
+
+    Returns
+    -------
+    Path to the written CSV file.
+    """
+    output_path = Path(output_path)
+    columns = extract_columns(fd5_path, group=group)
+    _write_csv(output_path, columns)
+    return output_path
+
+
+# ---------------------------------------------------------------------------
+# Product-type detection
+# ---------------------------------------------------------------------------
+
+
+def _read_product(f: h5py.File) -> str:
+    """Read the product root attribute."""
+    val = f.attrs.get("product", "")
+    if isinstance(val, bytes):
+        val = val.decode("utf-8")
+    return val
+
+
+# ---------------------------------------------------------------------------
+# Data extraction per product type
+# ---------------------------------------------------------------------------
+
+
+def _extract_spectrum(f: h5py.File) -> dict[str, np.ndarray]:
+    """Extract spectrum data: bin_centers + counts (+ counts_errors)."""
+    columns: dict[str, np.ndarray] = {}
+
+    if "counts" in f:
+        counts = f["counts"][()]
+        columns["counts"] = counts.ravel()
+
+    if "counts_errors" in f:
+        columns["counts_errors"] = f["counts_errors"][()].ravel()
+
+    # Extract bin_centers from every axis group that provides them
+    if "axes" in f:
+        axes_grp = f["axes"]
+        for ax_name in sorted(axes_grp.keys()):
+            ax = axes_grp[ax_name]
+            if "bin_centers" in ax:
+                label = ax.attrs.get("label", ax_name)
+                if isinstance(label, bytes):
+                    label = label.decode("utf-8")
+                columns[label] = ax["bin_centers"][()]
+
+    # Reorder so axis columns come first
+    reordered: dict[str, np.ndarray] = {}
+    for key in columns:
+        if key not in ("counts", "counts_errors"):
+            reordered[key] = columns[key]
+    for key in ("counts", "counts_errors"):
+        if key in columns:
+            reordered[key] = columns[key]
+
+    return reordered
+
+
+def _extract_device_data(f: h5py.File) -> dict[str, np.ndarray]:
+    """Extract device_data: time + signal per channel."""
+    columns: dict[str, np.ndarray] = {}
+
+    if "channels" not in f:
+        return columns
+
+    channels_grp = f["channels"]
+    time_written = False
+
+    for ch_name in sorted(channels_grp.keys()):
+        ch = channels_grp[ch_name]
+
+        # Write time column from the first channel only
+        if not time_written and "time" in ch:
+            columns["time"] = ch["time"][()]
+            time_written = True
+
+        if "signal" in ch:
+            columns[ch_name] = ch["signal"][()]
+
+    return columns
+
+
+def _extract_1d_datasets(group: h5py.Group) -> dict[str, np.ndarray]:
+    """Extract all 1D datasets from an HDF5 group."""
+    columns: dict[str, np.ndarray] = {}
+    for key in sorted(group.keys()):
+        item = group[key]
+        if isinstance(item, h5py.Dataset) and item.ndim == 1:
+            columns[key] = item[()]
+    return columns
+
+
+def _extract_generic(f: h5py.File) -> dict[str, np.ndarray]:
+    """Fallback: extract all 1D datasets from root level."""
+    return _extract_1d_datasets(f)
+
+
+def _extract_group(f: h5py.File, group: str) -> dict[str, np.ndarray]:
+    """Extract all 1D datasets from a specific group."""
+    if group not in f:
+        raise KeyError(f"Group {group!r} not found in file")
+    return _extract_1d_datasets(f[group])
+
+
+_PRODUCT_EXTRACTORS = {
+    "spectrum": _extract_spectrum,
+    "device_data": _extract_device_data,
+}
+
+
+# ---------------------------------------------------------------------------
+# CSV writer
+# ---------------------------------------------------------------------------
+
+
+def _write_csv(output_path: Path, columns: dict[str, np.ndarray]) -> None:
+    """Write column dict to CSV file."""
+    if not columns:
+        raise ValueError("No tabular data found to export")
+
+    headers = list(columns.keys())
+    arrays = [columns[h] for h in headers]
+    n_rows = max(len(a) for a in arrays)
+
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    with output_path.open("w", newline="") as fh:
+        writer = csv_mod.writer(fh)
+        writer.writerow(headers)
+        for i in range(n_rows):
+            row = [str(a[i]) if i < len(a) else "" for a in arrays]
+            writer.writerow(row)
diff --git a/src/fd5/export/nifti.py b/src/fd5/export/nifti.py
new file mode 100644
index 0000000..2b47f39
--- /dev/null
+++ b/src/fd5/export/nifti.py
@@ -0,0 +1,104 @@
+"""fd5.export.nifti — Export recon/ndarray volumes to NIfTI format.
+
+Reads volume data + spatial metadata from an fd5 file and writes a
+NIfTI-1 ``.nii.gz`` file preserving the affine transform.
+Requires nibabel (optional dependency).
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import h5py
+import numpy as np
+
+
+def export_nifti(
+    fd5_path: str | Path,
+    output_path: str | Path,
+    *,
+    dataset: str = "volume",
+) -> Path:
+    """Export a volume dataset from an fd5 file to NIfTI format.
+
+    Parameters
+    ----------
+    fd5_path:
+        Path to the source fd5 (``.h5``) file.
+    output_path:
+        Destination path for the NIfTI file (``.nii`` or ``.nii.gz``).
+        Missing parent directories are created.
+    dataset:
+        HDF5 dataset path containing the volume data (default ``"volume"``).
+
+    Returns
+    -------
+    Path to the written NIfTI file.
+
+    Raises
+    ------
+    ImportError
+        If nibabel is not installed.
+    KeyError
+        If *dataset* is not present in the file.
+    """
+    try:
+        import nibabel as nib
+    except ImportError:
+        raise ImportError(
+            "nibabel is required for NIfTI export. "
+            "Install it with: pip install 'fd5[nifti]'"
+        ) from None
+
+    fd5_path = Path(fd5_path)
+    output_path = Path(output_path)
+
+    with h5py.File(fd5_path, "r") as f:
+        if dataset not in f:
+            raise KeyError(f"Dataset {dataset!r} not found in {fd5_path}")
+
+        data = f[dataset][()]
+
+        affine = _read_affine(f)
+        dim_order = _read_dimension_order(f, dataset)
+
+    data = _reorder_to_nifti(data, dim_order)
+
+    img = nib.Nifti1Image(data, affine)
+    # Create missing parent directories, matching the CSV/Parquet exporters.
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    nib.save(img, output_path)
+    return output_path
+
+
+def _read_affine(f: h5py.File) -> np.ndarray:
+    """Read affine from the fd5 file, falling back to identity."""
+    if "affine" in f:
+        return np.asarray(f["affine"][()], dtype=np.float64)
+    if "affine" in f.attrs:
+        return np.asarray(f.attrs["affine"], dtype=np.float64).reshape(4, 4)
+    return np.eye(4, dtype=np.float64)
+
+
+def _read_dimension_order(f: h5py.File, dataset: str = "volume") -> str:
+    """Read dimension_order from the root or *dataset* attrs, defaulting to ZYX."""
+    for source in (f, f.get(dataset)):
+        if source is not None and "dimension_order" in getattr(source, "attrs", {}):
+            val = source.attrs["dimension_order"]
+            if isinstance(val, bytes):
+                val = val.decode("utf-8")
+            return val
+    return "ZYX"
+
+
+def _reorder_to_nifti(data: np.ndarray, dim_order: str) -> np.ndarray:
+    """Reorder axes from fd5 dimension_order to NIfTI convention (XYZ[T]).
+
+    fd5 stores volumes as ZYX (or TZYX for 4D). NIfTI expects the spatial
+    axes first in XYZ order with time as the fourth axis, so reversing all
+    axes maps ZYX -> XYZ and TZYX -> XYZT. Any other dimension_order is
+    returned unchanged.
+    """
+    if dim_order in ("ZYX", "TZYX"):
+        return np.transpose(data)
+    return data
diff --git a/src/fd5/export/parquet.py b/src/fd5/export/parquet.py
new file mode 100644
index 0000000..4be9967
--- /dev/null
+++ b/src/fd5/export/parquet.py
@@ -0,0 +1,56 @@
+"""fd5.export.parquet — Export tabular/spectrum/timeseries data to Parquet.
+
+Same data extraction as the CSV exporter but writes via pyarrow.
+Preserves column dtypes (float64, int64, string).
+Requires pyarrow (optional dependency).
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+from fd5.export.csv import extract_columns
+
+
+def export_parquet(
+    fd5_path: str | Path,
+    output_path: str | Path,
+    *,
+    group: str | None = None,
+) -> Path:
+    """Export tabular data from an fd5 file to Apache Parquet.
+
+    Parameters
+    ----------
+    fd5_path:
+        Path to the source fd5 (``.h5``) file.
+    output_path:
+        Destination path for the Parquet file.
+    group:
+        Optional HDF5 group path to export from. If *None*, the product
+        type is auto-detected from root attrs.
+
+    Returns
+    -------
+    Path to the written Parquet file.
+    """
+    try:
+        import pyarrow as pa
+        import pyarrow.parquet as pq
+    except ImportError:
+        raise ImportError(
+            "pyarrow is required for Parquet export. "
+            "Install it with: pip install 'fd5[parquet]'"
+        ) from None
+
+    output_path = Path(output_path)
+    columns = extract_columns(fd5_path, group=group)
+
+    if not columns:
+        raise ValueError("No tabular data found to export")
+
+    table = pa.table(dict(columns))
+
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    pq.write_table(table, output_path)
+    return output_path
diff --git a/tests/test_export_csv.py b/tests/test_export_csv.py
new file mode 100644
index 0000000..325a74b
--- /dev/null
+++ b/tests/test_export_csv.py
@@ -0,0 +1,178 @@
+"""Tests for fd5.export.csv — CSV export from fd5 files."""
+
+from __future__ import annotations
+
+import csv as csv_mod
+from pathlib import Path
+
+import h5py
+import numpy as np
+import pytest
+
+from fd5.export.csv import export_csv
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture()
+def spectrum_fd5(tmp_path: Path) -> Path:
+    """Create a minimal fd5 spectrum file with counts + axes."""
+    path = tmp_path / "spectrum.h5"
+    counts = np.array([10.0, 25.0, 18.0, 7.0], dtype=np.float32)
+    bin_edges = np.array([50.0, 150.0, 250.0, 350.0, 450.0], dtype=np.float64)
+    bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:])
+
+    with h5py.File(path, "w") as f:
+        f.attrs["product"] = "spectrum"
+        f.create_dataset("counts", data=counts)
+        axes = f.create_group("axes")
+        ax0 = axes.create_group("ax0")
+        ax0.attrs["label"] = "energy"
+        ax0.attrs["units"] = "keV"
+        ax0.attrs["unitSI"] = 1.602e-16
+        ax0.attrs["description"] = "Photon energy"
+        ax0.create_dataset("bin_edges", data=bin_edges)
+        ax0.create_dataset("bin_centers", data=bin_centers)
+    return path
+
+
+@pytest.fixture()
+def device_data_fd5(tmp_path: Path) -> Path:
+    """Create a minimal fd5 device_data file with two channels."""
+    path = tmp_path / "device_data.h5"
+    time = np.array([0.0, 1.0, 2.0], dtype=np.float64)
+    temp = np.array([22.5, 22.6, 22.4], dtype=np.float64)
+    pressure = np.array([101.3, 101.2, 101.4], dtype=np.float64)
+
+    with h5py.File(path, "w") as f:
+        f.attrs["product"] = "device_data"
+        channels = f.create_group("channels")
+        ch_temp = channels.create_group("temperature")
+        ch_temp.create_dataset("signal", data=temp)
+        ch_temp.create_dataset("time", data=time)
+        ch_press = channels.create_group("pressure")
+        ch_press.create_dataset("signal", data=pressure)
+        ch_press.create_dataset("time", data=time)
+    return path
+
+
+@pytest.fixture()
+def empty_fd5(tmp_path: Path) -> Path:
+    """Create an fd5 file with no exportable data."""
+    path = tmp_path / "empty.h5"
+    with h5py.File(path, "w") as f:
+        f.attrs["product"] = "unknown_product"
+    return path
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+
+
+def test_export_spectrum_csv(spectrum_fd5: Path, tmp_path: Path) -> None:
+    """Spectrum export should produce CSV with energy + counts columns."""
+    out = tmp_path / "spectrum.csv"
+    result = export_csv(spectrum_fd5, out)
+
+    assert result == out
+    assert out.exists()
+
+    with out.open() as fh:
+        reader = csv_mod.reader(fh)
+        rows = list(reader)
+
+    headers = rows[0]
+    assert "energy" in headers
+    assert "counts" in headers
+
+    # energy column comes before counts
+    assert headers.index("energy") < headers.index("counts")
+
+    # Verify data values
+    data_rows = rows[1:]
+    assert len(data_rows) == 4
+    counts_idx = headers.index("counts")
+    assert float(data_rows[0][counts_idx]) == pytest.approx(10.0)
+
+
+def test_export_spectrum_with_errors(tmp_path: Path) -> None:
+    """Spectrum with counts_errors should include that column."""
+    fd5_path = tmp_path / "spectrum_errors.h5"
+    counts = np.array([10.0, 25.0], dtype=np.float32)
+    errors = np.array([3.16, 5.0], dtype=np.float32)
+
+    with h5py.File(fd5_path, "w") as f:
+        f.attrs["product"] = "spectrum"
+        f.create_dataset("counts", data=counts)
+        f.create_dataset("counts_errors", data=errors)
+
+    out = tmp_path / "out.csv"
+    export_csv(fd5_path, out)
+
+    with out.open() as fh:
+        reader = csv_mod.reader(fh)
+        rows = list(reader)
+
+    assert "counts_errors" in rows[0]
+
+
+def test_export_device_data_csv(device_data_fd5: Path, tmp_path: Path) -> None:
+    """Device data export should produce CSV with time + channel columns."""
+    out = tmp_path / "device.csv"
+    result = export_csv(device_data_fd5, out)
+
+    assert result == out
+    assert out.exists()
+
+    with out.open() as fh:
+        reader = csv_mod.reader(fh)
+        rows = list(reader)
+
+    headers = rows[0]
+    assert "time" in headers
+    assert "pressure" in headers
+    assert "temperature" in headers
+
+    data_rows = rows[1:]
+    assert len(data_rows) == 3
+
+
+def test_export_empty_raises(empty_fd5: Path, tmp_path: Path) -> None:
+    """Exporting an fd5 file with no tabular data should raise ValueError."""
+    out = tmp_path / "empty.csv"
+    with pytest.raises(ValueError, match="No tabular data"):
+        export_csv(empty_fd5, out)
+
+
+def test_export_with_group(tmp_path: Path) -> None:
+    """Export from a specific group should work."""
+    fd5_path = tmp_path / "grouped.h5"
+    with h5py.File(fd5_path, "w") as f:
+        grp = f.create_group("my_data")
+        grp.create_dataset("x", data=np.array([1.0, 2.0, 3.0]))
+        grp.create_dataset("y", data=np.array([4.0, 5.0, 6.0]))
+
+    out = tmp_path / "grouped.csv"
+    export_csv(fd5_path, out, group="my_data")
+
+    with out.open() as fh:
+        reader = csv_mod.reader(fh)
+        rows = list(reader)
+
+    assert rows[0] == ["x", "y"]
+    assert len(rows) == 4  # header + 3 data rows
+
+
+def test_export_missing_group_raises(tmp_path: Path) -> None:
+    """Requesting a non-existent group should raise KeyError."""
+    fd5_path = tmp_path / "test.h5"
+    with h5py.File(fd5_path, "w") as f:
+        f.attrs["product"] = "spectrum"
+
+    out = tmp_path / "out.csv"
+    with pytest.raises(KeyError, match="nonexistent"):
+        export_csv(fd5_path, out, group="nonexistent")
diff --git a/tests/test_export_nifti.py b/tests/test_export_nifti.py
new file mode 100644
index 0000000..3fe0d30
--- /dev/null
+++ b/tests/test_export_nifti.py
@@ -0,0 +1,134 @@
+"""Tests for fd5.export.nifti — NIfTI export from fd5 recon files."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import h5py
+import nibabel as nib
+import numpy as np
+import pytest
+
+from fd5.export.nifti import export_nifti
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture()
+def recon_fd5_3d(tmp_path: Path) -> Path:
+    """Create a minimal fd5 file with a 3D volume (ZYX order)."""
+    path = tmp_path / "recon_3d.h5"
+    vol = np.arange(24, dtype=np.float32).reshape(2, 3, 4)
+    affine = np.diag([2.0, 2.0, 2.0, 1.0])
+    with h5py.File(path, "w") as f:
+        f.attrs["product"] = "recon"
+        f.attrs["dimension_order"] = "ZYX"
+        f.create_dataset("volume", data=vol)
+        f.create_dataset("affine", data=affine)
+    return path
+
+
+@pytest.fixture()
+def recon_fd5_4d(tmp_path: Path) -> Path:
+    """Create a minimal fd5 file with a 4D volume (TZYX order)."""
+    path = tmp_path / "recon_4d.h5"
+    vol = np.arange(48, dtype=np.float32).reshape(2, 2, 3, 4)
+    affine = np.eye(4)
+    with h5py.File(path, "w") as f:
+        f.attrs["product"] = "recon"
+        f.attrs["dimension_order"] = "TZYX"
+        f.create_dataset("volume", data=vol)
+        f.create_dataset("affine", data=affine)
+    return path
+
+
+@pytest.fixture()
+def recon_fd5_no_affine(tmp_path: Path) -> Path:
+    """Create a minimal fd5 file with no affine (should default to eye(4))."""
+    path = tmp_path / "recon_no_affine.h5"
+    vol = np.ones((3, 4, 5), dtype=np.float32)
+    with h5py.File(path, "w") as f:
+        f.attrs["product"] = "recon"
+        f.create_dataset("volume", data=vol)
+    return path
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+
+
+def test_export_3d_roundtrip(recon_fd5_3d: Path, tmp_path: Path) -> None:
+    """Exported 3D NIfTI should contain the same data and affine."""
+    out = tmp_path / "output.nii.gz"
+    result = export_nifti(recon_fd5_3d, out)
+
+    assert result == out
+    assert out.exists()
+
+    img = nib.load(out)
+    data = np.asarray(img.dataobj)
+    # Original ZYX (2,3,4) reordered to XYZ (4,3,2)
+    assert data.shape == (4, 3, 2)
+
+    # Verify affine preserved
+    np.testing.assert_allclose(img.affine, np.diag([2.0, 2.0, 2.0, 1.0]))
+
+
+def test_export_4d_roundtrip(recon_fd5_4d: Path, tmp_path: Path) -> None:
+    """Exported 4D NIfTI should reorder TZYX to XYZT."""
+    out = tmp_path / "output_4d.nii.gz"
+    result = export_nifti(recon_fd5_4d, out)
+
+    assert result == out
+    img = nib.load(out)
+    data = np.asarray(img.dataobj)
+    # TZYX (2,2,3,4) -> XYZT (4,3,2,2): time must end up as the fourth axis
+    assert data.shape == (4, 3, 2, 2)
+
+
+def test_export_missing_affine_defaults_to_identity(
+    recon_fd5_no_affine: Path, tmp_path: Path
+) -> None:
+    """When no affine is stored, default to np.eye(4)."""
+    out = tmp_path / "output_no_affine.nii.gz"
+    export_nifti(recon_fd5_no_affine, out)
+
+    img = nib.load(out)
+    np.testing.assert_allclose(img.affine, np.eye(4))
+
+
+def test_export_missing_dataset_raises(recon_fd5_3d: Path, tmp_path: Path) -> None:
+    """Requesting a non-existent dataset should raise KeyError."""
+    out = tmp_path / "output.nii.gz"
+    with pytest.raises(KeyError, match="nonexistent"):
+        export_nifti(recon_fd5_3d, out, dataset="nonexistent")
+
+
+def test_export_custom_dataset(tmp_path: Path) -> None:
+    """Export from a non-default dataset path."""
+    fd5_path = tmp_path / "custom.h5"
+    vol = np.ones((3, 4, 5), dtype=np.float32) * 42.0
+    with h5py.File(fd5_path, "w") as f:
+        f.attrs["product"] = "recon"
+        f.attrs["dimension_order"] = "ZYX"
+        f.create_dataset("my_volume", data=vol)
+
+    out = tmp_path / "output.nii.gz"
+    export_nifti(fd5_path, out, dataset="my_volume")
+
+    img = nib.load(out)
+    data = np.asarray(img.dataobj)
+    assert data.shape == (5, 4, 3)
+    np.testing.assert_allclose(data, 42.0)
+
+
+def test_export_creates_parent_dirs(recon_fd5_3d: Path, tmp_path: Path) -> None:
+    """Missing parent directories of the output path should be created."""
+    out = tmp_path / "nested" / "dir" / "output.nii.gz"
+    export_nifti(recon_fd5_3d, out)
+
+    assert out.exists()
diff --git a/tests/test_export_parquet.py b/tests/test_export_parquet.py
new file mode 100644
index 0000000..494294a
--- /dev/null
+++ b/tests/test_export_parquet.py
@@ -0,0 +1,130 @@
+"""Tests for fd5.export.parquet — Parquet export from fd5 files."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import h5py
+import numpy as np
+import pyarrow.parquet as pq
+import pytest
+
+from fd5.export.parquet import export_parquet
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture()
+def spectrum_fd5(tmp_path: Path) -> Path:
+    """Create a minimal fd5 spectrum file."""
+    path = tmp_path / "spectrum.h5"
+    counts = np.array([10.0, 25.0, 18.0, 7.0], dtype=np.float32)
+    bin_edges = np.array([50.0, 150.0, 250.0, 350.0, 450.0], dtype=np.float64)
+    bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:])
+
+    with h5py.File(path, "w") as f:
+        f.attrs["product"] = "spectrum"
+        f.create_dataset("counts", data=counts)
+        axes = f.create_group("axes")
+        ax0 = axes.create_group("ax0")
+        ax0.attrs["label"] = "energy"
+        ax0.attrs["units"] = "keV"
+        ax0.attrs["unitSI"] = 1.602e-16
+        ax0.attrs["description"] = "Photon energy"
+        ax0.create_dataset("bin_edges", data=bin_edges)
+        ax0.create_dataset("bin_centers", data=bin_centers)
+    return path
+
+
+@pytest.fixture()
+def device_data_fd5(tmp_path: Path) -> Path:
+    """Create a minimal fd5 device_data file."""
+    path = tmp_path / "device_data.h5"
+    time = np.array([0.0, 1.0, 2.0], dtype=np.float64)
+    temp = np.array([22.5, 22.6, 22.4], dtype=np.float64)
+
+    with h5py.File(path, "w") as f:
+        f.attrs["product"] = "device_data"
+        channels = f.create_group("channels")
+        ch = channels.create_group("temperature")
+        ch.create_dataset("signal", data=temp)
+        ch.create_dataset("time", data=time)
+    return path
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+
+
+def test_export_spectrum_parquet(spectrum_fd5: Path, tmp_path: Path) -> None:
+    """Spectrum export to Parquet should preserve column types and values."""
+    out = tmp_path / "spectrum.parquet"
+    result = export_parquet(spectrum_fd5, out)
+
+    assert result == out
+    assert out.exists()
+
+    table = pq.read_table(out)
+    assert "energy" in table.column_names
+    assert "counts" in table.column_names
+
+    counts = table.column("counts").to_numpy()
+    np.testing.assert_allclose(counts, [10.0, 25.0, 18.0, 7.0], atol=0.1)
+
+
+def test_export_device_data_parquet(device_data_fd5: Path, tmp_path: Path) -> None:
+    """Device data export to Parquet should include time + channels."""
+    out = tmp_path / "device.parquet"
+    export_parquet(device_data_fd5, out)
+
+    table = pq.read_table(out)
+    assert "time" in table.column_names
+    assert "temperature" in table.column_names
+    assert len(table) == 3
+
+
+def test_export_empty_raises(tmp_path: Path) -> None:
+    """Exporting an fd5 file with no tabular data should raise ValueError."""
+    fd5_path = tmp_path / "empty.h5"
+    with h5py.File(fd5_path, "w") as f:
+        f.attrs["product"] = "unknown_product"
+
+    out = tmp_path / "empty.parquet"
+    with pytest.raises(ValueError, match="No tabular data"):
+        export_parquet(fd5_path, out)
+
+
+def test_export_with_group(tmp_path: Path) -> None:
+    """Export from a specific group should work."""
+    fd5_path = tmp_path / "grouped.h5"
+    with h5py.File(fd5_path, "w") as f:
+        grp = f.create_group("my_data")
+        grp.create_dataset("x", data=np.array([1.0, 2.0, 3.0]))
+        grp.create_dataset("y", data=np.array([4.0, 5.0, 6.0]))
+
+    out = tmp_path / "grouped.parquet"
+    export_parquet(fd5_path, out, group="my_data")
+
+    table = pq.read_table(out)
+    assert table.column_names == ["x", "y"]
+    assert len(table) == 3
+    np.testing.assert_allclose(table.column("x").to_numpy(), [1.0, 2.0, 3.0])
+
+
+def test_export_preserves_float64_dtype(tmp_path: Path) -> None:
+    """Parquet export should preserve float64 precision."""
+    fd5_path = tmp_path / "precise.h5"
+    vals = np.array([1.23456789012345, 9.87654321098765], dtype=np.float64)
+    with h5py.File(fd5_path, "w") as f:
+        f.attrs["product"] = "generic"
+        f.create_dataset("values", data=vals)
+
+    out = tmp_path / "precise.parquet"
+    export_parquet(fd5_path, out)
+
+    table = pq.read_table(out)
+    np.testing.assert_array_equal(table.column("values").to_numpy(), vals)