Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion src/fd5/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def check_descriptions_cmd(file: str) -> None:
# fd5 ingest — subcommand group
# ---------------------------------------------------------------------------

_ALL_LOADER_NAMES = ("raw", "csv", "nifti", "dicom")
_ALL_LOADER_NAMES = ("raw", "csv", "nifti", "dicom", "parquet")


def _ingest_binary(
Expand Down Expand Up @@ -253,6 +253,13 @@ def _get_dicom_loader(): # type: ignore[no-untyped-def]
return DicomLoader()


def _get_parquet_loader():  # type: ignore[no-untyped-def]
    """Import ParquetLoader lazily so a missing pyarrow surfaces at call time, not import time."""
    from fd5.ingest.parquet import ParquetLoader as _ParquetLoader

    return _ParquetLoader()


@cli.group()
def ingest() -> None:
"""Ingest external data formats into sealed fd5 files."""
Expand Down Expand Up @@ -433,6 +440,59 @@ def ingest_dicom(
sys.exit(1)


@ingest.command("parquet")
@click.argument("source", type=click.Path(exists=True, dir_okay=False))
@click.option(
    "--output", "-o", type=click.Path(), required=True, help="Output directory."
)
@click.option("--name", required=True, help="Human-readable name.")
@click.option("--description", required=True, help="Description for AI-readability.")
@click.option("--product", required=True, help="Product type (e.g. spectrum).")
@click.option("--timestamp", default=None, help="Override ISO-8601 timestamp.")
@click.option(
    "--column-map",
    default=None,
    help="JSON string mapping source columns to fd5 columns.",
)
def ingest_parquet(
    source: str,
    output: str,
    name: str,
    description: str,
    product: str,
    timestamp: str | None,
    column_map: str | None,
) -> None:
    """Ingest a Parquet file into a sealed fd5 file.

    Exits with status 1 (and a message on stderr) when pyarrow is not
    installed, when --column-map is not a valid JSON object, or when the
    loader rejects the input.
    """
    try:
        # Lazy import: pyarrow is an optional dependency.
        loader = _get_parquet_loader()
    except ImportError:
        click.echo(
            "Error: pyarrow is not installed. Install with: pip install 'fd5[parquet]'",
            err=True,
        )
        sys.exit(1)

    # Parse --column-map up front so malformed JSON produces a friendly CLI
    # error instead of an uncaught json.JSONDecodeError traceback.
    parsed_map: dict[str, str] | None = None
    if column_map is not None:
        try:
            parsed_map = json.loads(column_map)
        except json.JSONDecodeError as exc:
            click.echo(f"Error: invalid JSON for --column-map: {exc}", err=True)
            sys.exit(1)
        if not isinstance(parsed_map, dict):
            # e.g. '--column-map "[1,2]"' parses but is not a mapping.
            click.echo("Error: --column-map must be a JSON object.", err=True)
            sys.exit(1)

    try:
        result = loader.ingest(
            Path(source),
            Path(output),
            product=product,
            name=name,
            description=description,
            timestamp=timestamp,
            column_map=parsed_map,
        )
        click.echo(f"Ingested {Path(source).name} \u2192 {result}")
    except (ValueError, FileNotFoundError) as exc:
        click.echo(f"Error: {exc}", err=True)
        sys.exit(1)


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
Expand Down
169 changes: 168 additions & 1 deletion tests/test_ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_ingest_help_exits_zero(self, runner: CliRunner):

def test_ingest_help_lists_subcommands(self, runner: CliRunner):
result = runner.invoke(cli, ["ingest", "--help"])
for sub in ("raw", "nifti", "csv", "list"):
for sub in ("raw", "nifti", "csv", "list", "parquet"):
assert sub in result.output

def test_ingest_appears_in_main_help(self, runner: CliRunner):
Expand Down Expand Up @@ -503,3 +503,170 @@ def test_missing_pydicom_shows_error(self, runner: CliRunner, tmp_path: Path):
)
assert result.exit_code != 0
assert "pydicom" in result.output.lower() or "install" in result.output.lower()


# ---------------------------------------------------------------------------
# fd5 ingest parquet
# ---------------------------------------------------------------------------


@pytest.fixture()
def parquet_file(tmp_path: Path) -> Path:
    """Write a tiny two-column (energy + counts) Parquet file and return its path."""
    import pyarrow as pa
    import pyarrow.parquet as pq

    out = tmp_path / "spectrum.parquet"
    columns = {"energy": [1.0, 2.0, 3.0], "counts": [100, 200, 300]}
    pq.write_table(pa.table(columns), out)
    return out


class TestIngestParquet:
    """CLI-level tests for ``fd5 ingest parquet``.

    The loader is mocked in most tests, so no pyarrow is required; the
    repeated mock/arg scaffolding is factored into private helpers.
    """

    @staticmethod
    def _mock_loader(tmp_path: Path) -> MagicMock:
        """Return a mock loader whose ingest() yields an existing .h5 path."""
        loader = MagicMock()
        (tmp_path / "out").mkdir()
        fake_h5 = tmp_path / "out" / "result.h5"
        fake_h5.touch()
        loader.ingest.return_value = fake_h5
        return loader

    @staticmethod
    def _source(tmp_path: Path) -> Path:
        """Create an empty placeholder .parquet file (content never read: loader is mocked)."""
        pq_file = tmp_path / "data.parquet"
        pq_file.touch()
        return pq_file

    @staticmethod
    def _args(
        source: Path,
        output: Path,
        *,
        name: str = "Test",
        description: str = "desc",
        extra: list[str] | None = None,
    ) -> list[str]:
        """Build the standard CLI argv for the parquet subcommand."""
        argv = [
            "ingest",
            "parquet",
            str(source),
            "--output",
            str(output),
            "--name",
            name,
            "--description",
            description,
            "--product",
            "spectrum",
        ]
        if extra:
            argv.extend(extra)
        return argv

    def test_exits_zero_with_mock(self, runner: CliRunner, tmp_path: Path):
        mock_loader = self._mock_loader(tmp_path)
        pq_file = self._source(tmp_path)

        with patch("fd5.cli._get_parquet_loader", return_value=mock_loader):
            result = runner.invoke(
                cli,
                self._args(
                    pq_file,
                    tmp_path / "out",
                    name="Gamma spectrum",
                    description="HPGe detector measurement",
                ),
            )
        assert result.exit_code == 0, result.output

    def test_prints_confirmation(self, runner: CliRunner, tmp_path: Path):
        mock_loader = self._mock_loader(tmp_path)
        pq_file = self._source(tmp_path)

        with patch("fd5.cli._get_parquet_loader", return_value=mock_loader):
            result = runner.invoke(
                cli, self._args(pq_file, tmp_path / "out")
            )
        assert "ingested" in result.output.lower() or ".h5" in result.output.lower()

    def test_passes_column_map(self, runner: CliRunner, tmp_path: Path):
        mock_loader = self._mock_loader(tmp_path)
        pq_file = self._source(tmp_path)

        col_map = '{"en": "energy", "ct": "counts"}'
        with patch("fd5.cli._get_parquet_loader", return_value=mock_loader):
            result = runner.invoke(
                cli,
                self._args(
                    pq_file,
                    tmp_path / "out",
                    extra=["--column-map", col_map],
                ),
            )
        assert result.exit_code == 0, result.output
        # The JSON string must be parsed into a dict before reaching the loader.
        _, kwargs = mock_loader.ingest.call_args
        assert kwargs["column_map"] == {"en": "energy", "ct": "counts"}

    def test_missing_pyarrow_shows_error(self, runner: CliRunner, tmp_path: Path):
        pq_file = self._source(tmp_path)
        out = tmp_path / "out"
        out.mkdir()

        with patch(
            "fd5.cli._get_parquet_loader",
            side_effect=ImportError("no pyarrow"),
        ):
            result = runner.invoke(cli, self._args(pq_file, out))
        assert result.exit_code != 0
        assert "pyarrow" in result.output.lower() or "install" in result.output.lower()

    def test_nonexistent_source_exits_nonzero(self, runner: CliRunner, tmp_path: Path):
        # click.Path(exists=True) should reject the missing source file.
        result = runner.invoke(
            cli,
            self._args(
                tmp_path / "ghost.parquet", tmp_path, name="x", description="x"
            ),
        )
        assert result.exit_code != 0

    def test_ingest_list_shows_parquet(self, runner: CliRunner):
        with patch(
            "fd5.cli.discover_loaders",
            return_value={"parquet": MagicMock()},
        ):
            result = runner.invoke(cli, ["ingest", "list"])
        assert "parquet" in result.output
44 changes: 44 additions & 0 deletions tests/test_ingest_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,30 @@ def test_string_source_path(
# ---------------------------------------------------------------------------


class TestIdempotency:
    """Calling ingest twice with identical inputs produces two valid, independently sealed files."""

    def test_deterministic(
        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
    ):
        # Identical metadata (including a pinned timestamp) for both runs.
        common = {
            "product": "spectrum",
            "name": "idem-spectrum",
            "description": "Idempotency test",
            "timestamp": "2026-02-25T12:00:00+00:00",
        }
        first = loader.ingest(spectrum_csv, tmp_path / "a", **common)
        second = loader.ingest(spectrum_csv, tmp_path / "b", **common)

        for sealed in (first, second):
            assert sealed.exists()
            assert sealed.suffix == ".h5"
        with h5py.File(first, "r") as fa, h5py.File(second, "r") as fb:
            # Same id, both carry a content hash, identical counts data.
            assert fa.attrs["id"] == fb.attrs["id"]
            assert "content_hash" in fa.attrs
            assert "content_hash" in fb.attrs
            np.testing.assert_array_equal(fa["counts"][:], fb["counts"][:])


class TestGenericProduct:
"""Generic product: user specifies product type + column mapping."""

Expand All @@ -482,3 +506,23 @@ def test_generic_ingest(self, loader: CsvLoader, tmp_path: Path):
assert "counts" in f
counts = f["counts"][:]
np.testing.assert_array_almost_equal(counts, [2.0, 5.0])


class TestFd5Validate:
    """Smoke test: fd5.schema.validate() on CsvLoader output."""

    def test_spectrum_passes_validate(
        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
    ):
        from fd5.schema import validate

        sealed = loader.ingest(
            spectrum_csv,
            tmp_path / "out",
            product="spectrum",
            name="Validate spectrum",
            description="Validate smoke test",
            timestamp="2026-02-25T12:00:00+00:00",
        )
        # A clean file yields an empty error list; show messages on failure.
        problems = validate(sealed)
        assert problems == [], [p.message for p in problems]
24 changes: 24 additions & 0 deletions tests/test_ingest_dicom.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,30 @@ def test_output_passes_fd5_validate(self, tmp_path):
# ---------------------------------------------------------------------------


class TestIdempotency:
    """Calling ingest twice with identical inputs produces two valid, independently sealed files."""

    def test_deterministic(self, tmp_path):
        from fd5.ingest.dicom import ingest_dicom

        series_dir = _make_dicom_series(tmp_path)
        # Identical metadata (including a pinned timestamp) for both runs.
        common = {
            "name": "idem-ct",
            "description": "Idempotency test",
            "timestamp": "2024-06-15T08:00:00",
        }
        first = ingest_dicom(series_dir, tmp_path / "a", **common)
        second = ingest_dicom(series_dir, tmp_path / "b", **common)

        for sealed in (first, second):
            assert sealed.exists()
            assert sealed.suffix == ".h5"
        with h5py.File(first, "r") as fa, h5py.File(second, "r") as fb:
            # Same id, both carry a content hash, identical voxel data.
            assert fa.attrs["id"] == fb.attrs["id"]
            assert "content_hash" in fa.attrs
            assert "content_hash" in fb.attrs
            np.testing.assert_array_equal(fa["volume"][:], fb["volume"][:])


class TestDicomLoaderIngest:
def test_loader_ingest_produces_file(self, tmp_path):
from fd5.ingest.dicom import DicomLoader
Expand Down
37 changes: 37 additions & 0 deletions tests/test_ingest_nifti.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,27 @@ def test_invalid_file(self, tmp_path: Path):
# ---------------------------------------------------------------------------


class TestIdempotency:
    """Calling ingest twice with identical inputs produces two valid, independently sealed files."""

    def test_deterministic(self, nifti_3d: Path, tmp_path: Path):
        # Identical metadata (including a pinned timestamp) for both runs.
        common = {
            "name": "idem-vol",
            "description": "Idempotency test",
            "timestamp": "2025-01-15T10:30:00Z",
        }
        first = ingest_nifti(nifti_3d, tmp_path / "a", **common)
        second = ingest_nifti(nifti_3d, tmp_path / "b", **common)

        for sealed in (first, second):
            assert sealed.exists()
            assert sealed.suffix == ".h5"
        with h5py.File(first, "r") as fa, h5py.File(second, "r") as fb:
            # Same id, both carry a content hash, identical voxel data.
            assert fa.attrs["id"] == fb.attrs["id"]
            assert "content_hash" in fa.attrs
            assert "content_hash" in fb.attrs
            np.testing.assert_array_equal(fa["volume"][:], fb["volume"][:])


class TestNiftiLoaderIngest:
def test_ingest_method(self, nifti_3d: Path, tmp_path: Path):
loader = NiftiLoader()
Expand Down Expand Up @@ -393,3 +414,19 @@ def test_clear_message_when_nibabel_missing(self):
import fd5.ingest.nifti as mod

importlib.reload(mod)


class TestFd5Validate:
    """Smoke test: fd5.schema.validate() on ingest_nifti output."""

    def test_nifti_passes_validate(self, nifti_3d: Path, tmp_path: Path):
        from fd5.schema import validate

        sealed = ingest_nifti(
            nifti_3d,
            tmp_path / "out",
            name="validate-nifti",
            description="Validate smoke test",
        )
        # A clean file yields an empty error list; show messages on failure.
        problems = validate(sealed)
        assert problems == [], [p.message for p in problems]
Loading
Loading