diff --git a/src/fd5/cli.py b/src/fd5/cli.py
index 40df521..7ef85b3 100644
--- a/src/fd5/cli.py
+++ b/src/fd5/cli.py
@@ -225,7 +225,7 @@ def check_descriptions_cmd(file: str) -> None:
 # fd5 ingest — subcommand group
 # ---------------------------------------------------------------------------
 
-_ALL_LOADER_NAMES = ("raw", "csv", "nifti", "dicom")
+_ALL_LOADER_NAMES = ("raw", "csv", "nifti", "dicom", "parquet")
 
 
 def _ingest_binary(
@@ -253,6 +253,13 @@ def _get_dicom_loader():  # type: ignore[no-untyped-def]
     return DicomLoader()
 
 
+def _get_parquet_loader():  # type: ignore[no-untyped-def]
+    """Lazy-import the ParquetLoader so missing pyarrow is caught at call time."""
+    from fd5.ingest.parquet import ParquetLoader
+
+    return ParquetLoader()
+
+
 @cli.group()
 def ingest() -> None:
     """Ingest external data formats into sealed fd5 files."""
@@ -433,6 +440,59 @@ def ingest_dicom(
         sys.exit(1)
 
 
+@ingest.command("parquet")
+@click.argument("source", type=click.Path(exists=True, dir_okay=False))
+@click.option(
+    "--output", "-o", type=click.Path(), required=True, help="Output directory."
+)
+@click.option("--name", required=True, help="Human-readable name.")
+@click.option("--description", required=True, help="Description for AI-readability.")
+@click.option("--product", required=True, help="Product type (e.g. spectrum).")
+@click.option("--timestamp", default=None, help="Override ISO-8601 timestamp.")
+@click.option(
+    "--column-map",
+    default=None,
+    help="JSON string mapping source columns to fd5 columns.",
+)
+def ingest_parquet(
+    source: str,
+    output: str,
+    name: str,
+    description: str,
+    product: str,
+    timestamp: str | None,
+    column_map: str | None,
+) -> None:
+    """Ingest a Parquet file into a sealed fd5 file."""
+    try:
+        loader = _get_parquet_loader()
+    except ImportError:
+        click.echo(
+            "Error: pyarrow is not installed. Install with: pip install 'fd5[parquet]'",
+            err=True,
+        )
+        sys.exit(1)
+
+    parsed_map: dict[str, str] | None = None
+    if column_map is not None:
+        parsed_map = json.loads(column_map)
+
+    try:
+        result = loader.ingest(
+            Path(source),
+            Path(output),
+            product=product,
+            name=name,
+            description=description,
+            timestamp=timestamp,
+            column_map=parsed_map,
+        )
+        click.echo(f"Ingested {Path(source).name} \u2192 {result}")
+    except (ValueError, FileNotFoundError) as exc:
+        click.echo(f"Error: {exc}", err=True)
+        sys.exit(1)
+
+
 # ---------------------------------------------------------------------------
 # Internal helpers
 # ---------------------------------------------------------------------------
diff --git a/tests/test_ingest_cli.py b/tests/test_ingest_cli.py
index f37e0b6..054d512 100644
--- a/tests/test_ingest_cli.py
+++ b/tests/test_ingest_cli.py
@@ -51,7 +51,7 @@ def test_ingest_help_exits_zero(self, runner: CliRunner):
 
     def test_ingest_help_lists_subcommands(self, runner: CliRunner):
         result = runner.invoke(cli, ["ingest", "--help"])
-        for sub in ("raw", "nifti", "csv", "list"):
+        for sub in ("raw", "nifti", "csv", "list", "parquet"):
             assert sub in result.output
 
     def test_ingest_appears_in_main_help(self, runner: CliRunner):
@@ -503,3 +503,170 @@ def test_missing_pydicom_shows_error(self, runner: CliRunner, tmp_path: Path):
         )
         assert result.exit_code != 0
         assert "pydicom" in result.output.lower() or "install" in result.output.lower()
+
+
+# ---------------------------------------------------------------------------
+# fd5 ingest parquet
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture()
+def parquet_file(tmp_path: Path) -> Path:
+    """Create a minimal Parquet file with energy + counts columns."""
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+
+    table = pa.table({"energy": [1.0, 2.0, 3.0], "counts": [100, 200, 300]})
+    path = tmp_path / "spectrum.parquet"
+    pq.write_table(table, path)
+    return path
+
+
+class TestIngestParquet:
+    def test_exits_zero_with_mock(self, runner: CliRunner, tmp_path: Path):
+        mock_loader = MagicMock()
+        fake_h5 = tmp_path / "out" / "result.h5"
+        (tmp_path / "out").mkdir()
+        fake_h5.touch()
+        mock_loader.ingest.return_value = fake_h5
+
+        pq_file = tmp_path / "data.parquet"
+        pq_file.touch()
+
+        with patch("fd5.cli._get_parquet_loader", return_value=mock_loader):
+            result = runner.invoke(
+                cli,
+                [
+                    "ingest",
+                    "parquet",
+                    str(pq_file),
+                    "--output",
+                    str(tmp_path / "out"),
+                    "--name",
+                    "Gamma spectrum",
+                    "--description",
+                    "HPGe detector measurement",
+                    "--product",
+                    "spectrum",
+                ],
+            )
+        assert result.exit_code == 0, result.output
+
+    def test_prints_confirmation(self, runner: CliRunner, tmp_path: Path):
+        mock_loader = MagicMock()
+        fake_h5 = tmp_path / "out" / "result.h5"
+        (tmp_path / "out").mkdir()
+        fake_h5.touch()
+        mock_loader.ingest.return_value = fake_h5
+
+        pq_file = tmp_path / "data.parquet"
+        pq_file.touch()
+
+        with patch("fd5.cli._get_parquet_loader", return_value=mock_loader):
+            result = runner.invoke(
+                cli,
+                [
+                    "ingest",
+                    "parquet",
+                    str(pq_file),
+                    "--output",
+                    str(tmp_path / "out"),
+                    "--name",
+                    "Test",
+                    "--description",
+                    "desc",
+                    "--product",
+                    "spectrum",
+                ],
+            )
+        assert "ingested" in result.output.lower() or ".h5" in result.output.lower()
+
+    def test_passes_column_map(self, runner: CliRunner, tmp_path: Path):
+        mock_loader = MagicMock()
+        fake_h5 = tmp_path / "out" / "result.h5"
+        (tmp_path / "out").mkdir()
+        fake_h5.touch()
+        mock_loader.ingest.return_value = fake_h5
+
+        pq_file = tmp_path / "data.parquet"
+        pq_file.touch()
+
+        col_map = '{"en": "energy", "ct": "counts"}'
+        with patch("fd5.cli._get_parquet_loader", return_value=mock_loader):
+            result = runner.invoke(
+                cli,
+                [
+                    "ingest",
+                    "parquet",
+                    str(pq_file),
+                    "--output",
+                    str(tmp_path / "out"),
+                    "--name",
+                    "Test",
+                    "--description",
+                    "desc",
+                    "--product",
+                    "spectrum",
+                    "--column-map",
+                    col_map,
+                ],
+            )
+        assert result.exit_code == 0, result.output
+        _, kwargs = mock_loader.ingest.call_args
+        assert kwargs["column_map"] == {"en": "energy", "ct": "counts"}
+
+    def test_missing_pyarrow_shows_error(self, runner: CliRunner, tmp_path: Path):
+        pq_file = tmp_path / "data.parquet"
+        pq_file.touch()
+        out = tmp_path / "out"
+        out.mkdir()
+
+        with patch(
+            "fd5.cli._get_parquet_loader",
+            side_effect=ImportError("no pyarrow"),
+        ):
+            result = runner.invoke(
+                cli,
+                [
+                    "ingest",
+                    "parquet",
+                    str(pq_file),
+                    "--output",
+                    str(out),
+                    "--name",
+                    "Test",
+                    "--description",
+                    "desc",
+                    "--product",
+                    "spectrum",
+                ],
+            )
+        assert result.exit_code != 0
+        assert "pyarrow" in result.output.lower() or "install" in result.output.lower()
+
+    def test_nonexistent_source_exits_nonzero(self, runner: CliRunner, tmp_path: Path):
+        result = runner.invoke(
+            cli,
+            [
+                "ingest",
+                "parquet",
+                str(tmp_path / "ghost.parquet"),
+                "--output",
+                str(tmp_path),
+                "--name",
+                "x",
+                "--description",
+                "x",
+                "--product",
+                "spectrum",
+            ],
+        )
+        assert result.exit_code != 0
+
+    def test_ingest_list_shows_parquet(self, runner: CliRunner):
+        with patch(
+            "fd5.cli.discover_loaders",
+            return_value={"parquet": MagicMock()},
+        ):
+            result = runner.invoke(cli, ["ingest", "list"])
+        assert "parquet" in result.output
diff --git a/tests/test_ingest_csv.py b/tests/test_ingest_csv.py
index 9146495..878c646 100644
--- a/tests/test_ingest_csv.py
+++ b/tests/test_ingest_csv.py
@@ -463,6 +463,30 @@ def test_string_source_path(
 
 # ---------------------------------------------------------------------------
 
+class TestIdempotency:
+    """Calling ingest twice with identical inputs produces two valid, independently sealed files."""
+
+    def test_deterministic(
+        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
+    ):
+        kwargs = dict(
+            product="spectrum",
+            name="idem-spectrum",
+            description="Idempotency test",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        r1 = loader.ingest(spectrum_csv, tmp_path / "a", **kwargs)
+        r2 = loader.ingest(spectrum_csv, tmp_path / "b", **kwargs)
+
+        assert r1.exists() and r2.exists()
+        assert r1.suffix == ".h5" and r2.suffix == ".h5"
+        with h5py.File(r1, "r") as f1, h5py.File(r2, "r") as f2:
+            assert f1.attrs["id"] == f2.attrs["id"]
+            assert "content_hash" in f1.attrs
+            assert "content_hash" in f2.attrs
+            np.testing.assert_array_equal(f1["counts"][:], f2["counts"][:])
+
+
 class TestGenericProduct:
     """Generic product: user specifies product type + column mapping."""
 
@@ -482,3 +506,23 @@ def test_generic_ingest(self, loader: CsvLoader, tmp_path: Path):
             assert "counts" in f
             counts = f["counts"][:]
             np.testing.assert_array_almost_equal(counts, [2.0, 5.0])
+
+
+class TestFd5Validate:
+    """Smoke test: fd5.schema.validate() on CsvLoader output."""
+
+    def test_spectrum_passes_validate(
+        self, loader: CsvLoader, spectrum_csv: Path, tmp_path: Path
+    ):
+        from fd5.schema import validate
+
+        result = loader.ingest(
+            spectrum_csv,
+            tmp_path / "out",
+            product="spectrum",
+            name="Validate spectrum",
+            description="Validate smoke test",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        errors = validate(result)
+        assert errors == [], [e.message for e in errors]
diff --git a/tests/test_ingest_dicom.py b/tests/test_ingest_dicom.py
index 906e429..2d7ad96 100644
--- a/tests/test_ingest_dicom.py
+++ b/tests/test_ingest_dicom.py
@@ -500,6 +500,30 @@ def test_output_passes_fd5_validate(self, tmp_path):
 
 # ---------------------------------------------------------------------------
 
+class TestIdempotency:
+    """Calling ingest twice with identical inputs produces two valid, independently sealed files."""
+
+    def test_deterministic(self, tmp_path):
+        from fd5.ingest.dicom import ingest_dicom
+
+        dicom_dir = _make_dicom_series(tmp_path)
+        kwargs = dict(
+            name="idem-ct",
+            description="Idempotency test",
+            timestamp="2024-06-15T08:00:00",
+        )
+        r1 = ingest_dicom(dicom_dir, tmp_path / "a", **kwargs)
+        r2 = ingest_dicom(dicom_dir, tmp_path / "b", **kwargs)
+
+        assert r1.exists() and r2.exists()
+        assert r1.suffix == ".h5" and r2.suffix == ".h5"
+        with h5py.File(r1, "r") as f1, h5py.File(r2, "r") as f2:
+            assert f1.attrs["id"] == f2.attrs["id"]
+            assert "content_hash" in f1.attrs
+            assert "content_hash" in f2.attrs
+            np.testing.assert_array_equal(f1["volume"][:], f2["volume"][:])
+
+
 class TestDicomLoaderIngest:
     def test_loader_ingest_produces_file(self, tmp_path):
         from fd5.ingest.dicom import DicomLoader
diff --git a/tests/test_ingest_nifti.py b/tests/test_ingest_nifti.py
index cf3b62d..dd3824f 100644
--- a/tests/test_ingest_nifti.py
+++ b/tests/test_ingest_nifti.py
@@ -364,6 +364,27 @@ def test_invalid_file(self, tmp_path: Path):
 
 # ---------------------------------------------------------------------------
 
+class TestIdempotency:
+    """Calling ingest twice with identical inputs produces two valid, independently sealed files."""
+
+    def test_deterministic(self, nifti_3d: Path, tmp_path: Path):
+        kwargs = dict(
+            name="idem-vol",
+            description="Idempotency test",
+            timestamp="2025-01-15T10:30:00Z",
+        )
+        r1 = ingest_nifti(nifti_3d, tmp_path / "a", **kwargs)
+        r2 = ingest_nifti(nifti_3d, tmp_path / "b", **kwargs)
+
+        assert r1.exists() and r2.exists()
+        assert r1.suffix == ".h5" and r2.suffix == ".h5"
+        with h5py.File(r1, "r") as f1, h5py.File(r2, "r") as f2:
+            assert f1.attrs["id"] == f2.attrs["id"]
+            assert "content_hash" in f1.attrs
+            assert "content_hash" in f2.attrs
+            np.testing.assert_array_equal(f1["volume"][:], f2["volume"][:])
+
+
 class TestNiftiLoaderIngest:
     def test_ingest_method(self, nifti_3d: Path, tmp_path: Path):
         loader = NiftiLoader()
@@ -393,3 +414,19 @@ def test_clear_message_when_nibabel_missing(self):
             import fd5.ingest.nifti as mod
 
             importlib.reload(mod)
+
+
+class TestFd5Validate:
+    """Smoke test: fd5.schema.validate() on ingest_nifti output."""
+
+    def test_nifti_passes_validate(self, nifti_3d: Path, tmp_path: Path):
+        from fd5.schema import validate
+
+        result = ingest_nifti(
+            nifti_3d,
+            tmp_path / "out",
+            name="validate-nifti",
+            description="Validate smoke test",
+        )
+        errors = validate(result)
+        assert errors == [], [e.message for e in errors]
diff --git a/tests/test_ingest_parquet.py b/tests/test_ingest_parquet.py
index 18512a4..2831f4d 100644
--- a/tests/test_ingest_parquet.py
+++ b/tests/test_ingest_parquet.py
@@ -423,6 +423,30 @@ def test_provenance_ingest_group(
 
 # ---------------------------------------------------------------------------
 
+class TestIdempotency:
+    """Calling ingest twice with identical inputs produces two valid, independently sealed files."""
+
+    def test_deterministic(
+        self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path
+    ):
+        kwargs = dict(
+            product="spectrum",
+            name="idem-spectrum",
+            description="Idempotency test",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        r1 = loader.ingest(spectrum_parquet, tmp_path / "a", **kwargs)
+        r2 = loader.ingest(spectrum_parquet, tmp_path / "b", **kwargs)
+
+        assert r1.exists() and r2.exists()
+        assert r1.suffix == ".h5" and r2.suffix == ".h5"
+        with h5py.File(r1, "r") as f1, h5py.File(r2, "r") as f2:
+            assert f1.attrs["id"] == f2.attrs["id"]
+            assert "content_hash" in f1.attrs
+            assert "content_hash" in f2.attrs
+            np.testing.assert_array_equal(f1["counts"][:], f2["counts"][:])
+
+
 class TestEdgeCases:
     """Edge cases: missing file, empty table, string source path."""
 
@@ -481,3 +505,23 @@ def test_module_docstring_mentions_pyarrow(self):
             "pyarrow" in (mod.__doc__ or "").lower()
             or "parquet" in (mod.__doc__ or "").lower()
         )
+
+
+class TestFd5Validate:
+    """Smoke test: fd5.schema.validate() on ParquetLoader output."""
+
+    def test_spectrum_passes_validate(
+        self, loader: ParquetLoader, spectrum_parquet: Path, tmp_path: Path
+    ):
+        from fd5.schema import validate
+
+        result = loader.ingest(
+            spectrum_parquet,
+            tmp_path / "out",
+            product="spectrum",
+            name="Validate spectrum",
+            description="Validate smoke test",
+            timestamp="2026-02-25T12:00:00+00:00",
+        )
+        errors = validate(result)
+        assert errors == [], [e.message for e in errors]
diff --git a/tests/test_ingest_raw.py b/tests/test_ingest_raw.py
index 7feedbc..58de43d 100644
--- a/tests/test_ingest_raw.py
+++ b/tests/test_ingest_raw.py
@@ -282,6 +282,58 @@ def test_shape_mismatch_raises(self, tmp_path: Path):
         )
 
 
+class TestIdempotency:
+    """Calling ingest twice with identical inputs produces two valid, independently sealed files."""
+
+    def test_ingest_array_deterministic(self, tmp_path: Path):
+        from fd5.ingest.raw import ingest_array
+
+        kwargs = dict(
+            product="recon",
+            name="idem-recon",
+            description="Idempotency test",
+            timestamp="2025-01-01T00:00:00+00:00",
+        )
+        r1 = ingest_array(_recon_data(), tmp_path / "a", **kwargs)
+        r2 = ingest_array(_recon_data(), tmp_path / "b", **kwargs)
+
+        assert r1.exists() and r2.exists()
+        assert r1.suffix == ".h5" and r2.suffix == ".h5"
+        with h5py.File(r1, "r") as f1, h5py.File(r2, "r") as f2:
+            assert f1.attrs["id"] == f2.attrs["id"]
+            assert f1.attrs["content_hash"] == f2.attrs["content_hash"]
+
+    def test_ingest_binary_produces_two_valid_sealed_files(self, tmp_path: Path):
+        from fd5.ingest.raw import ingest_binary
+
+        shape = (4, 8, 8)
+        arr = np.ones(shape, dtype=np.float32)
+        bin_path = tmp_path / "data.bin"
+        arr.tofile(bin_path)
+
+        common = dict(
+            dtype="float32",
+            shape=shape,
+            product="recon",
+            name="idem-binary",
+            description="Idempotency test",
+            timestamp="2025-01-01T00:00:00+00:00",
+            affine=np.eye(4, dtype=np.float64),
+            dimension_order="ZYX",
+            reference_frame="LPS",
+        )
+        r1 = ingest_binary(bin_path, tmp_path / "a", **common)
+        r2 = ingest_binary(bin_path, tmp_path / "b", **common)
+
+        assert r1.exists() and r2.exists()
+        assert r1.suffix == ".h5" and r2.suffix == ".h5"
+        with h5py.File(r1, "r") as f1, h5py.File(r2, "r") as f2:
+            assert f1.attrs["id"] == f2.attrs["id"]
+            assert "content_hash" in f1.attrs
+            assert "content_hash" in f2.attrs
+            np.testing.assert_array_equal(f1["volume"][:], f2["volume"][:])
+
+
 class TestRawLoader:
     """Tests for RawLoader protocol conformance."""
 
@@ -326,3 +378,48 @@ def test_ingest_produces_file(self, tmp_path: Path):
         assert result.exists()
         with h5py.File(result, "r") as f:
             assert f.attrs["product"] == "recon"
+
+
+class TestFd5Validate:
+    """Smoke tests: fd5.schema.validate() on sealed output."""
+
+    def test_ingest_array_passes_validate(self, tmp_path: Path):
+        from fd5.ingest.raw import ingest_array
+        from fd5.schema import validate
+
+        result = ingest_array(
+            _recon_data(),
+            tmp_path,
+            product="recon",
+            name="validate-array",
+            description="Validate smoke test",
+            timestamp="2025-01-01T00:00:00+00:00",
+        )
+        errors = validate(result)
+        assert errors == [], [e.message for e in errors]
+
+    def test_ingest_binary_passes_validate(self, tmp_path: Path):
+        from fd5.ingest.raw import ingest_binary
+        from fd5.schema import validate
+
+        shape = (8, 16, 16)
+        arr = np.random.default_rng(99).random(shape, dtype=np.float32)
+        bin_path = tmp_path / "volume.bin"
+        arr.tofile(bin_path)
+
+        out_dir = tmp_path / "output"
+        result = ingest_binary(
+            bin_path,
+            out_dir,
+            dtype="float32",
+            shape=shape,
+            product="recon",
+            name="validate-binary",
+            description="Validate smoke test",
+            timestamp="2025-01-01T00:00:00+00:00",
+            affine=np.eye(4, dtype=np.float64),
+            dimension_order="ZYX",
+            reference_frame="LPS",
+        )
+        errors = validate(result)
+        assert errors == [], [e.message for e in errors]