Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ nifti = [
parquet = [
"pyarrow>=14.0",
]
export = [
"nibabel>=4.0",
"pyarrow>=14.0",
]
all = [
"fd5[dev,science,dicom,nifti,parquet]",
"fd5[dev,science,dicom,nifti,parquet,export]",
]

[build-system]
Expand Down
58 changes: 58 additions & 0 deletions src/fd5/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,64 @@ def ingest_parquet(
sys.exit(1)


# ---------------------------------------------------------------------------
# fd5 export — subcommand group
# ---------------------------------------------------------------------------


@cli.group()
def export() -> None:
    """Export fd5 files to standard formats."""
    # Container group only: the actual formats (nifti / csv / parquet) are
    # registered below as subcommands, so this body intentionally does nothing.


@export.command("nifti")
@click.argument("fd5_file", type=click.Path(exists=True))
@click.option("-o", "--output", required=True, type=click.Path())
@click.option("--dataset", default="volume", help="Dataset path to export.")
def export_nifti_cmd(fd5_file: str, output: str, dataset: str) -> None:
    """Export volume data to NIfTI (.nii.gz)."""
    # Deferred import keeps CLI startup fast and surfaces the optional
    # nibabel dependency only when this subcommand actually runs.
    from fd5.export.nifti import export_nifti

    try:
        written = export_nifti(fd5_file, output, dataset=dataset)
    except (ImportError, KeyError, ValueError) as exc:
        click.echo(f"Error: {exc}", err=True)
        sys.exit(1)
    else:
        click.echo(f"Exported: {written}")


@export.command("csv")
@click.argument("fd5_file", type=click.Path(exists=True))
@click.option("-o", "--output", required=True, type=click.Path())
@click.option("--group", default=None, help="HDF5 group path to export from.")
def export_csv_cmd(fd5_file: str, output: str, group: str | None) -> None:
    """Export tabular/timeseries/spectrum data to CSV."""
    # Import lazily so merely loading the CLI never pulls in export code.
    from fd5.export.csv import export_csv

    try:
        written = export_csv(fd5_file, output, group=group)
    except (KeyError, ValueError) as exc:
        click.echo(f"Error: {exc}", err=True)
        sys.exit(1)
    else:
        click.echo(f"Exported: {written}")


@export.command("parquet")
@click.argument("fd5_file", type=click.Path(exists=True))
@click.option("-o", "--output", required=True, type=click.Path())
@click.option("--group", default=None, help="HDF5 group path to export from.")
def export_parquet_cmd(fd5_file: str, output: str, group: str | None) -> None:
    """Export tabular/timeseries data to Parquet."""
    # Deferred import: pyarrow is optional, so a missing install only
    # errors when this subcommand is actually invoked.
    from fd5.export.parquet import export_parquet

    try:
        written = export_parquet(fd5_file, output, group=group)
    except (ImportError, KeyError, ValueError) as exc:
        click.echo(f"Error: {exc}", err=True)
        sys.exit(1)
    else:
        click.echo(f"Exported: {written}")


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
Expand Down
9 changes: 9 additions & 0 deletions src/fd5/export/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""fd5.export — Export fd5 files to standard formats (NIfTI, CSV, Parquet)."""

from __future__ import annotations

from fd5.export.csv import export_csv
from fd5.export.nifti import export_nifti
from fd5.export.parquet import export_parquet

__all__ = ["export_csv", "export_nifti", "export_parquet"]
186 changes: 186 additions & 0 deletions src/fd5/export/csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
"""fd5.export.csv — Export tabular/spectrum/timeseries data to CSV.

Reads product data from an fd5 file and writes a standard CSV file.
Supports product types: spectrum, device_data, and generic tabular data.
"""

from __future__ import annotations

import csv as csv_mod
from pathlib import Path

import h5py
import numpy as np


def extract_columns(
    fd5_path: str | Path,
    *,
    group: str | None = None,
) -> dict[str, np.ndarray]:
    """Collect column data from an fd5 file as a name → array mapping.

    When *group* is given, all 1D datasets under that HDF5 group are
    returned; otherwise the extractor matching the file's ``product``
    root attribute is used, with a generic root-level scan as fallback.

    Shared by :func:`export_csv` and :func:`~fd5.export.parquet.export_parquet`.
    """
    with h5py.File(Path(fd5_path), "r") as handle:
        if group is not None:
            return _extract_group(handle, group)
        extractor = _PRODUCT_EXTRACTORS.get(_read_product(handle), _extract_generic)
        return extractor(handle)


def export_csv(
    fd5_path: str | Path,
    output_path: str | Path,
    *,
    group: str | None = None,
) -> Path:
    """Export tabular data from an fd5 file to a CSV file.

    Parameters
    ----------
    fd5_path:
        Path to the source fd5 (``.h5``) file.
    output_path:
        Destination path for the CSV file.
    group:
        Optional HDF5 group path to export from. If *None*, the product
        type is auto-detected from root attrs.

    Returns
    -------
    Path to the written CSV file.
    """
    destination = Path(output_path)
    _write_csv(destination, extract_columns(fd5_path, group=group))
    return destination


# ---------------------------------------------------------------------------
# Product-type detection
# ---------------------------------------------------------------------------


def _read_product(f: h5py.File) -> str:
"""Read the product root attribute."""
val = f.attrs.get("product", "")
if isinstance(val, bytes):
val = val.decode("utf-8")
return val


# ---------------------------------------------------------------------------
# Data extraction per product type
# ---------------------------------------------------------------------------


def _extract_spectrum(f: h5py.File) -> dict[str, np.ndarray]:
"""Extract spectrum data: bin_centers + counts (+ counts_errors)."""
columns: dict[str, np.ndarray] = {}

if "counts" in f:
counts = f["counts"][()]
columns["counts"] = counts.ravel()

if "counts_errors" in f:
columns["counts_errors"] = f["counts_errors"][()].ravel()

# Extract bin_centers from the first axis
if "axes" in f:
axes_grp = f["axes"]
for ax_name in sorted(axes_grp.keys()):
ax = axes_grp[ax_name]
if "bin_centers" in ax:
label = ax.attrs.get("label", ax_name)
if isinstance(label, bytes):
label = label.decode("utf-8")
columns[label] = ax["bin_centers"][()]

# Reorder so axis columns come first
reordered: dict[str, np.ndarray] = {}
for key in columns:
if key not in ("counts", "counts_errors"):
reordered[key] = columns[key]
for key in ("counts", "counts_errors"):
if key in columns:
reordered[key] = columns[key]

return reordered


def _extract_device_data(f: h5py.File) -> dict[str, np.ndarray]:
"""Extract device_data: time + signal per channel."""
columns: dict[str, np.ndarray] = {}

if "channels" not in f:
return columns

channels_grp = f["channels"]
time_written = False

for ch_name in sorted(channels_grp.keys()):
ch = channels_grp[ch_name]

# Write time column from the first channel only
if not time_written and "time" in ch:
columns["time"] = ch["time"][()]
time_written = True

if "signal" in ch:
columns[ch_name] = ch["signal"][()]

return columns


def _extract_1d_datasets(group: h5py.Group) -> dict[str, np.ndarray]:
    """Collect every 1D dataset directly under *group*, keyed by name.

    Sub-groups and multi-dimensional datasets are skipped; keys are sorted.
    """
    return {
        name: group[name][()]
        for name in sorted(group.keys())
        if isinstance(group[name], h5py.Dataset) and group[name].ndim == 1
    }


def _extract_generic(f: h5py.File) -> dict[str, np.ndarray]:
    """Fallback: extract all 1D datasets from root level.

    Used when the file's ``product`` attribute is missing or has no
    dedicated extractor in ``_PRODUCT_EXTRACTORS``.
    """
    return _extract_1d_datasets(f)


def _extract_group(f: h5py.File, group: str) -> dict[str, np.ndarray]:
"""Extract all 1D datasets from a specific group."""
if group not in f:
raise KeyError(f"Group {group!r} not found in file")
return _extract_1d_datasets(f[group])


# Dispatch table: fd5 ``product`` root attribute -> column-extraction
# function. Products without an entry fall back to _extract_generic
# (see extract_columns).
_PRODUCT_EXTRACTORS = {
    "spectrum": _extract_spectrum,
    "device_data": _extract_device_data,
}


# ---------------------------------------------------------------------------
# CSV writer
# ---------------------------------------------------------------------------


def _write_csv(output_path: Path, columns: dict[str, np.ndarray]) -> None:
"""Write column dict to CSV file."""
if not columns:
raise ValueError("No tabular data found to export")

headers = list(columns.keys())
arrays = [columns[h] for h in headers]
n_rows = max(len(a) for a in arrays)

output_path.parent.mkdir(parents=True, exist_ok=True)

with output_path.open("w", newline="") as fh:
writer = csv_mod.writer(fh)
writer.writerow(headers)
for i in range(n_rows):
row = [str(a[i]) if i < len(a) else "" for a in arrays]
writer.writerow(row)
95 changes: 95 additions & 0 deletions src/fd5/export/nifti.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""fd5.export.nifti — Export recon/ndarray volumes to NIfTI format.

Reads volume data + spatial metadata from an fd5 file and writes a
NIfTI-1 ``.nii.gz`` file preserving the affine transform.
Requires nibabel (optional dependency).
"""

from __future__ import annotations

from pathlib import Path

import h5py
import numpy as np


def export_nifti(
    fd5_path: str | Path,
    output_path: str | Path,
    *,
    dataset: str = "volume",
) -> Path:
    """Export a volume dataset from an fd5 file to NIfTI format.

    Parameters
    ----------
    fd5_path:
        Path to the source fd5 (``.h5``) file.
    output_path:
        Destination path for the NIfTI file (``.nii`` or ``.nii.gz``).
    dataset:
        HDF5 dataset path containing the volume data (default ``"volume"``).

    Returns
    -------
    Path to the written NIfTI file.

    Raises
    ------
    ImportError
        If nibabel (optional dependency) is not installed.
    KeyError
        If *dataset* does not exist in the file.
    """
    try:
        import nibabel as nib
    except ImportError:
        raise ImportError(
            "nibabel is required for NIfTI export. "
            "Install it with: pip install 'fd5[nifti]'"
        ) from None

    fd5_path = Path(fd5_path)
    output_path = Path(output_path)

    with h5py.File(fd5_path, "r") as f:
        if dataset not in f:
            raise KeyError(f"Dataset {dataset!r} not found in {fd5_path}")

        data = f[dataset][()]
        affine = _read_affine(f)
        dim_order = _read_dimension_order(f)

    data = _reorder_to_nifti(data, dim_order)

    # Create parent directories so exporting into a fresh output tree
    # succeeds (matches the CSV/Parquet writers' behaviour).
    output_path.parent.mkdir(parents=True, exist_ok=True)

    img = nib.Nifti1Image(data, affine)
    nib.save(img, output_path)
    return output_path


def _read_affine(f: h5py.File) -> np.ndarray:
"""Read affine from the fd5 file, falling back to identity."""
if "affine" in f:
return np.asarray(f["affine"][()], dtype=np.float64)
if "affine" in f.attrs:
return np.asarray(f.attrs["affine"], dtype=np.float64).reshape(4, 4)
return np.eye(4, dtype=np.float64)


def _read_dimension_order(f: h5py.File) -> str:
"""Read dimension_order attribute, defaulting to ZYX."""
for source in (f, f.get("volume")):
if source is not None and "dimension_order" in getattr(source, "attrs", {}):
val = source.attrs["dimension_order"]
if isinstance(val, bytes):
val = val.decode("utf-8")
return val
return "ZYX"


def _reorder_to_nifti(data: np.ndarray, dim_order: str) -> np.ndarray:
"""Reorder axes from fd5 dimension_order to NIfTI convention (XYZ[T]).

fd5 stores volumes as ZYX (or TZYX for 4D). NIfTI expects the spatial
axes in XYZ order (fastest axis first).
"""
if dim_order in ("ZYX", "TZYX"):
n_spatial = 3
n_extra = data.ndim - n_spatial
axes = list(range(n_extra)) + list(range(data.ndim - 1, n_extra - 1, -1))
return np.transpose(data, axes)
return data
Loading
Loading