diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e3432b..27c1a6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Cross-language conformance test suite** ([#155](https://github.com/vig-os/fd5/issues/155)) + - 6 canonical fixture generators: minimal, sealed, with-provenance, multiscale, tabular, complex-metadata + - 3 invalid fixture generators: missing-id, bad-hash, no-schema + - Expected-result JSON files defining the format contract for any language binding + - 39 pytest conformance tests covering structure, hash verification, provenance, multiscale, tabular, metadata, schema validation, and negative tests + - README documenting how to use the suite and add new cases + - **Preflight feedback and status dashboard for devc-remote** ([#149](https://github.com/vig-os/fd5/issues/149)) - Each preflight check now prints a success/warning/error status line as it completes - New checks: container-already-running, runtime version, compose version, SSH agent forwarding diff --git a/tests/conformance/README.md b/tests/conformance/README.md new file mode 100644 index 0000000..881dd0b --- /dev/null +++ b/tests/conformance/README.md @@ -0,0 +1,70 @@ +# Cross-Language Conformance Test Suite + +Language-agnostic test suite for the fd5 format. Any fd5 implementation +(Python, Rust, Julia, C/C++, TypeScript) must pass these tests to prove +format conformance. 
+
+## Structure
+
+```
+tests/conformance/
+├── README.md # This file
+├── generate_fixtures.py # Regenerates .fd5 fixture files
+├── test_conformance.py # Python conformance runner
+├── fixtures/ # Generated .fd5 files (not checked in)
+├── expected/ # Expected-result JSON (checked in)
+│ ├── minimal.json
+│ ├── with-provenance.json
+│ ├── multiscale.json
+│ ├── tabular.json
+│ ├── complex-metadata.json
+│ └── sealed.json
+└── invalid/ # Invalid .fd5 files + expected errors
+ └── expected-errors.json
+```
+
+## How It Works
+
+1. `generate_fixtures.py` uses the Python reference implementation to create
+ canonical `.fd5` fixture files in `fixtures/` and invalid files in `invalid/`.
+2. Each fixture has a corresponding JSON file in `expected/` that defines the
+ expected root attributes, dataset shapes, dtypes, group hierarchy, etc.
+3. A conformance runner opens each fixture with the language's own reader,
+ extracts values, and asserts equality against the expected JSON.
+
+## Running (Python)
+
+```bash
+uv run pytest tests/conformance/ -v
+```
+
+Fixtures are auto-generated by a pytest session-scoped fixture before tests run.
+
+## Adding a New Conformance Case
+
+1. Add a generator function in `generate_fixtures.py`.
+2. Create a corresponding `expected/<name>.json` with the expected structure.
+3. Add test functions in `test_conformance.py` (or the equivalent in your language).
+4. Run the suite to verify. 
+ +## Test Categories + +| Category | What it tests | +|-----------------------|--------------------------------------------------------| +| Structure | Correct group hierarchy, required attributes present | +| Data round-trip | Write values, read back, compare dtype/shape/values | +| Hash verification | Sealed files verify; tampered files fail | +| Provenance | DAG traversal returns expected source chain | +| Schema validation | Embedded schema validates the file's own structure | +| Negative tests | Invalid files are rejected with appropriate errors | + +## For Other Languages + +To implement the conformance suite in a new language: + +1. Generate fixtures using the Python script (or use pre-generated ones from CI). +2. Load each `.fd5` file with your HDF5 library. +3. Parse the corresponding `expected/*.json`. +4. Assert that the extracted values match the expected JSON. + +This is a black-box test -- it tests the format contract, not internal APIs. diff --git a/tests/conformance/__init__.py b/tests/conformance/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conformance/expected/complex-metadata.json b/tests/conformance/expected/complex-metadata.json new file mode 100644 index 0000000..f4fced2 --- /dev/null +++ b/tests/conformance/expected/complex-metadata.json @@ -0,0 +1,46 @@ +{ + "description": "Deeply nested metadata groups — metadata tree tests", + "root_attrs": { + "product": "test/conformance", + "name": "complex-metadata-conformance", + "description": "Complex metadata conformance fixture", + "timestamp": "2026-01-01T00:00:00Z", + "_schema_version": 1 + }, + "root_attrs_prefixed": { + "id": "sha256:", + "content_hash": "sha256:" + }, + "datasets": [ + { + "path": "/volume", + "shape": [4, 4], + "dtype": "float32" + } + ], + "groups": [ + "/", + "/metadata", + "/metadata/acquisition", + "/metadata/reconstruction", + "/metadata/reconstruction/parameters" + ], + "verify": true, + "metadata_tree": { + "metadata": { + "version": 2, 
+ "acquisition": { + "modality": "PET", + "duration_sec": 300.0, + "isotope": "F-18" + }, + "reconstruction": { + "algorithm": "osem", + "parameters": { + "iterations": 4, + "subsets": 21 + } + } + } + } +} diff --git a/tests/conformance/expected/minimal.json b/tests/conformance/expected/minimal.json new file mode 100644 index 0000000..f94eb6f --- /dev/null +++ b/tests/conformance/expected/minimal.json @@ -0,0 +1,26 @@ +{ + "description": "Smallest valid fd5 file — structure tests", + "root_attrs": { + "product": "test/conformance", + "name": "minimal-conformance", + "description": "Minimal conformance fixture", + "timestamp": "2026-01-01T00:00:00Z", + "_schema_version": 1 + }, + "root_attrs_prefixed": { + "id": "sha256:", + "content_hash": "sha256:" + }, + "datasets": [ + { + "path": "/volume", + "shape": [4, 4], + "dtype": "float32" + } + ], + "groups": [ + "/" + ], + "verify": true, + "schema_valid": true +} diff --git a/tests/conformance/expected/multiscale.json b/tests/conformance/expected/multiscale.json new file mode 100644 index 0000000..0d31c48 --- /dev/null +++ b/tests/conformance/expected/multiscale.json @@ -0,0 +1,45 @@ +{ + "description": "File with pyramid/multiscale datasets — multiscale tests", + "root_attrs": { + "product": "recon", + "name": "multiscale-conformance", + "description": "Multiscale conformance fixture", + "timestamp": "2026-01-01T00:00:00Z", + "_schema_version": 1 + }, + "root_attrs_prefixed": { + "id": "sha256:", + "content_hash": "sha256:" + }, + "groups": [ + "/", + "/pyramid", + "/pyramid/level_1", + "/pyramid/level_2" + ], + "pyramid": { + "n_levels": 2, + "scale_factors": [2, 4], + "level_shapes": { + "level_1": [4, 4, 4], + "level_2": [2, 2, 2] + } + }, + "datasets": [ + { + "path": "/volume", + "shape": [8, 8, 8], + "dtype": "float32" + }, + { + "path": "/mip_coronal", + "dtype": "float32" + }, + { + "path": "/mip_sagittal", + "dtype": "float32" + } + ], + "verify": true, + "schema_valid": true +} diff --git 
a/tests/conformance/expected/sealed.json b/tests/conformance/expected/sealed.json new file mode 100644 index 0000000..258d365 --- /dev/null +++ b/tests/conformance/expected/sealed.json @@ -0,0 +1,28 @@ +{ + "description": "File with verified content hash — hash verification tests", + "root_attrs": { + "product": "test/conformance", + "name": "sealed-conformance", + "description": "Sealed conformance fixture", + "timestamp": "2026-01-01T00:00:00Z", + "_schema_version": 1 + }, + "root_attrs_prefixed": { + "id": "sha256:", + "content_hash": "sha256:" + }, + "datasets": [ + { + "path": "/volume", + "shape": [8, 8], + "dtype": "float32" + } + ], + "verify": true, + "schema_valid": true, + "hash_verification": { + "intact_verifies": true, + "tampered_attr_fails": true, + "tampered_data_fails": true + } +} diff --git a/tests/conformance/expected/tabular.json b/tests/conformance/expected/tabular.json new file mode 100644 index 0000000..6061341 --- /dev/null +++ b/tests/conformance/expected/tabular.json @@ -0,0 +1,36 @@ +{ + "description": "Compound dataset (event table) — tabular data tests", + "root_attrs": { + "product": "test/conformance", + "name": "tabular-conformance", + "description": "Tabular conformance fixture", + "timestamp": "2026-01-01T00:00:00Z", + "_schema_version": 1 + }, + "root_attrs_prefixed": { + "id": "sha256:", + "content_hash": "sha256:" + }, + "datasets": [ + { + "path": "/volume", + "shape": [4, 4], + "dtype": "float32" + }, + { + "path": "/events", + "shape": [5], + "columns": ["time", "energy", "detector_id"] + } + ], + "verify": true, + "tabular": { + "row_count": 5, + "column_names": ["time", "energy", "detector_id"], + "column_dtypes": { + "time": "float64", + "energy": "float32", + "detector_id": "int32" + } + } +} diff --git a/tests/conformance/expected/with-provenance.json b/tests/conformance/expected/with-provenance.json new file mode 100644 index 0000000..514319a --- /dev/null +++ b/tests/conformance/expected/with-provenance.json @@ -0,0 
+1,46 @@ +{ + "description": "File with source links — provenance tests", + "root_attrs": { + "product": "test/conformance", + "name": "provenance-conformance", + "description": "Provenance conformance fixture", + "timestamp": "2026-01-01T00:00:00Z", + "_schema_version": 1 + }, + "root_attrs_prefixed": { + "id": "sha256:", + "content_hash": "sha256:" + }, + "datasets": [ + { + "path": "/volume", + "shape": [4, 4], + "dtype": "float32" + } + ], + "groups": [ + "/", + "/sources", + "/sources/upstream", + "/provenance", + "/provenance/ingest" + ], + "verify": false, + "provenance": { + "sources": [ + { + "name": "upstream", + "id": "sha256:aaa111", + "product": "raw", + "role": "input_data", + "description": "Upstream raw data" + } + ], + "has_original_files": true, + "original_files_count": 1, + "ingest": { + "tool": "conformance_generator", + "tool_version": "1.0.0" + } + } +} diff --git a/tests/conformance/fixtures/.gitignore b/tests/conformance/fixtures/.gitignore new file mode 100644 index 0000000..d8d9d7c --- /dev/null +++ b/tests/conformance/fixtures/.gitignore @@ -0,0 +1,2 @@ +*.fd5 +*.h5 diff --git a/tests/conformance/generate_fixtures.py b/tests/conformance/generate_fixtures.py new file mode 100644 index 0000000..38efab8 --- /dev/null +++ b/tests/conformance/generate_fixtures.py @@ -0,0 +1,328 @@ +"""Generate canonical fd5 fixture files for cross-language conformance testing. 
+ +Run via pytest (session-scoped autouse fixture) or standalone: + uv run python -m tests.conformance.generate_fixtures +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import h5py +import numpy as np + +from fd5.create import create +from fd5.hash import compute_content_hash +from fd5.registry import register_schema +from fd5.schema import embed_schema + +TIMESTAMP = "2026-01-01T00:00:00Z" + + +class _ConformanceSchema: + """Minimal product schema for conformance testing.""" + + product_type: str = "test/conformance" + schema_version: str = "1.0.0" + + def json_schema(self) -> dict[str, Any]: + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "_schema_version": {"type": "integer"}, + "product": {"type": "string", "const": "test/conformance"}, + "name": {"type": "string"}, + "description": {"type": "string"}, + "timestamp": {"type": "string"}, + }, + "required": ["_schema_version", "product", "name"], + } + + def required_root_attrs(self) -> dict[str, Any]: + return {"product": "test/conformance"} + + def write(self, target: Any, data: Any) -> None: + target.create_dataset("volume", data=data) + + def id_inputs(self) -> list[str]: + return ["product", "name", "timestamp"] + + +def _register_schemas() -> None: + import fd5.registry as reg + + reg._ensure_loaded() + register_schema("test/conformance", _ConformanceSchema()) + + +def _unregister_schemas() -> None: + import fd5.registry as reg + + reg._registry.pop("test/conformance", None) + + +def _create_minimal(fixtures_dir: Path) -> Path: + """Smallest valid fd5 file.""" + data = np.zeros((4, 4), dtype=np.float32) + with create( + fixtures_dir, + product="test/conformance", + name="minimal-conformance", + description="Minimal conformance fixture", + timestamp=TIMESTAMP, + ) as builder: + builder.write_product(data) + + return _find_and_rename(fixtures_dir, "minimal.fd5") + + +def 
_create_sealed(fixtures_dir: Path) -> Path: + """File with verified content hash for hash verification tests.""" + data = np.arange(64, dtype=np.float32).reshape(8, 8) + with create( + fixtures_dir, + product="test/conformance", + name="sealed-conformance", + description="Sealed conformance fixture", + timestamp=TIMESTAMP, + ) as builder: + builder.write_product(data) + + return _find_and_rename(fixtures_dir, "sealed.fd5") + + +def _create_with_provenance(fixtures_dir: Path) -> Path: + """File with source links and provenance data.""" + data = np.zeros((4, 4), dtype=np.float32) + with create( + fixtures_dir, + product="test/conformance", + name="provenance-conformance", + description="Provenance conformance fixture", + timestamp=TIMESTAMP, + ) as builder: + builder.write_product(data) + builder.write_sources( + [ + { + "name": "upstream", + "id": "sha256:aaa111", + "product": "raw", + "file": "upstream.h5", + "content_hash": "sha256:bbb222", + "role": "input_data", + "description": "Upstream raw data", + } + ] + ) + builder.write_provenance( + original_files=[ + { + "path": "/data/raw/scan.dcm", + "sha256": "sha256:ccc333", + "size_bytes": 4096, + } + ], + ingest_tool="conformance_generator", + ingest_version="1.0.0", + ingest_timestamp=TIMESTAMP, + ) + + return _find_and_rename(fixtures_dir, "with-provenance.fd5") + + +def _create_multiscale(fixtures_dir: Path) -> Path: + """File with pyramid/multiscale datasets using recon schema.""" + rng = np.random.default_rng(42) + volume = rng.standard_normal((8, 8, 8)).astype(np.float32) + + with create( + fixtures_dir, + product="recon", + name="multiscale-conformance", + description="Multiscale conformance fixture", + timestamp=TIMESTAMP, + ) as builder: + builder.write_product( + { + "volume": volume, + "affine": np.eye(4, dtype=np.float64), + "dimension_order": "ZYX", + "reference_frame": "LPS", + "description": "Test volume for multiscale conformance", + "pyramid": { + "scale_factors": [2, 4], + "method": "stride", + 
}, + } + ) + builder.file.attrs["scanner"] = "test-scanner" + builder.file.attrs["vendor_series_id"] = "test-series-001" + + return _find_and_rename(fixtures_dir, "multiscale.fd5") + + +def _create_tabular(fixtures_dir: Path) -> Path: + """Compound dataset (event table) with typed columns.""" + volume_data = np.zeros((4, 4), dtype=np.float32) + + dt = np.dtype( + [ + ("time", np.float64), + ("energy", np.float32), + ("detector_id", np.int32), + ] + ) + events = np.array( + [ + (0.0, 511.0, 1), + (0.1, 510.5, 2), + (0.2, 511.2, 1), + (0.3, 509.8, 3), + (0.4, 511.0, 2), + ], + dtype=dt, + ) + + with create( + fixtures_dir, + product="test/conformance", + name="tabular-conformance", + description="Tabular conformance fixture", + timestamp=TIMESTAMP, + ) as builder: + builder.write_product(volume_data) + builder.file.create_dataset("events", data=events) + + return _find_and_rename(fixtures_dir, "tabular.fd5") + + +def _create_complex_metadata(fixtures_dir: Path) -> Path: + """Deeply nested metadata groups.""" + volume_data = np.zeros((4, 4), dtype=np.float32) + + with create( + fixtures_dir, + product="test/conformance", + name="complex-metadata-conformance", + description="Complex metadata conformance fixture", + timestamp=TIMESTAMP, + ) as builder: + builder.write_product(volume_data) + builder.write_metadata( + { + "version": 2, + "acquisition": { + "modality": "PET", + "duration_sec": 300.0, + "isotope": "F-18", + }, + "reconstruction": { + "algorithm": "osem", + "parameters": { + "iterations": 4, + "subsets": 21, + }, + }, + } + ) + + return _find_and_rename(fixtures_dir, "complex-metadata.fd5") + + +def _create_invalid_missing_id(invalid_dir: Path) -> None: + """File missing required root 'id' attribute.""" + path = invalid_dir / "missing-id.fd5" + with h5py.File(path, "w") as f: + f.attrs["product"] = "test/conformance" + f.attrs["name"] = "missing-id" + f.attrs["description"] = "Missing id attribute" + f.attrs["timestamp"] = TIMESTAMP + 
f.attrs["_schema_version"] = np.int64(1) + f.create_dataset("volume", data=np.zeros((4, 4), dtype=np.float32)) + schema_dict = _ConformanceSchema().json_schema() + embed_schema(f, schema_dict) + f.attrs["content_hash"] = compute_content_hash(f) + + +def _create_invalid_bad_hash(invalid_dir: Path) -> None: + """File whose content_hash doesn't match actual content.""" + path = invalid_dir / "bad-hash.fd5" + with h5py.File(path, "w") as f: + f.attrs["product"] = "test/conformance" + f.attrs["name"] = "bad-hash" + f.attrs["description"] = "Bad hash fixture" + f.attrs["timestamp"] = TIMESTAMP + f.attrs["_schema_version"] = np.int64(1) + f.attrs["id"] = "sha256:fake_id_not_real" + f.create_dataset("volume", data=np.zeros((4, 4), dtype=np.float32)) + schema_dict = _ConformanceSchema().json_schema() + embed_schema(f, schema_dict) + f.attrs["content_hash"] = ( + "sha256:0000000000000000000000000000000000000000000000000000000000000000" + ) + + +def _create_invalid_no_schema(invalid_dir: Path) -> None: + """File missing the _schema attribute.""" + path = invalid_dir / "no-schema.fd5" + with h5py.File(path, "w") as f: + f.attrs["product"] = "test/conformance" + f.attrs["name"] = "no-schema" + f.attrs["description"] = "No schema fixture" + f.attrs["timestamp"] = TIMESTAMP + f.attrs["_schema_version"] = np.int64(1) + f.attrs["id"] = "sha256:fake_id_not_real" + f.create_dataset("volume", data=np.zeros((4, 4), dtype=np.float32)) + f.attrs["content_hash"] = compute_content_hash(f) + + +def _find_and_rename(directory: Path, target_name: str) -> Path: + """Find the single .h5 file created by fd5.create() and rename it.""" + h5_files = list(directory.glob("*.h5")) + unnamed = [f for f in h5_files if not f.stem.endswith(".fd5")] + if not unnamed: + unnamed = h5_files + newest = max(unnamed, key=lambda f: f.stat().st_mtime) + target = directory / target_name + if target.exists(): + target.unlink() + newest.rename(target) + return target + + +def generate_all(fixtures_dir: Path, 
invalid_dir: Path) -> None: + """Generate all conformance fixture files.""" + _register_schemas() + + fixtures_dir.mkdir(parents=True, exist_ok=True) + invalid_dir.mkdir(parents=True, exist_ok=True) + + for existing in fixtures_dir.glob("*.fd5"): + existing.unlink() + for existing in fixtures_dir.glob("*.h5"): + existing.unlink() + for existing in invalid_dir.glob("*.fd5"): + existing.unlink() + + try: + _create_minimal(fixtures_dir) + _create_sealed(fixtures_dir) + _create_with_provenance(fixtures_dir) + _create_multiscale(fixtures_dir) + _create_tabular(fixtures_dir) + _create_complex_metadata(fixtures_dir) + + _create_invalid_missing_id(invalid_dir) + _create_invalid_bad_hash(invalid_dir) + _create_invalid_no_schema(invalid_dir) + finally: + _unregister_schemas() + + +if __name__ == "__main__": + conformance_dir = Path(__file__).parent + generate_all(conformance_dir / "fixtures", conformance_dir / "invalid") + print("All conformance fixtures generated.") diff --git a/tests/conformance/invalid/.gitignore b/tests/conformance/invalid/.gitignore new file mode 100644 index 0000000..d8d9d7c --- /dev/null +++ b/tests/conformance/invalid/.gitignore @@ -0,0 +1,2 @@ +*.fd5 +*.h5 diff --git a/tests/conformance/invalid/expected-errors.json b/tests/conformance/invalid/expected-errors.json new file mode 100644 index 0000000..efd2835 --- /dev/null +++ b/tests/conformance/invalid/expected-errors.json @@ -0,0 +1,16 @@ +{ + "missing-id.fd5": { + "description": "Missing required root 'id' attribute", + "error_type": "KeyError", + "error_pattern": "id" + }, + "bad-hash.fd5": { + "description": "Content hash does not match actual content", + "verify_returns": false + }, + "no-schema.fd5": { + "description": "Missing _schema attribute", + "error_type": "KeyError", + "error_pattern": "_schema" + } +} diff --git a/tests/conformance/test_conformance.py b/tests/conformance/test_conformance.py new file mode 100644 index 0000000..8ce1bd9 --- /dev/null +++ 
b/tests/conformance/test_conformance.py @@ -0,0 +1,406 @@ +"""Cross-language conformance tests for the fd5 format. + +Validates that the Python reference implementation produces files matching +the canonical expected-result JSON files. Any fd5 implementation must pass +equivalent tests to prove format conformance. + +See tests/conformance/README.md for details. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import h5py +import numpy as np +import pytest + +from fd5.hash import verify +from fd5.schema import validate + +CONFORMANCE_DIR = Path(__file__).parent +FIXTURES_DIR = CONFORMANCE_DIR / "fixtures" +EXPECTED_DIR = CONFORMANCE_DIR / "expected" +INVALID_DIR = CONFORMANCE_DIR / "invalid" + + +def _load_expected(name: str) -> dict: + path = EXPECTED_DIR / f"{name}.json" + return json.loads(path.read_text()) + + +def _fixture_path(name: str) -> Path: + return FIXTURES_DIR / f"{name}.fd5" + + +@pytest.fixture(scope="session", autouse=True) +def _generate_fixtures(): + """Generate all fixture files before any conformance test runs.""" + from tests.conformance.generate_fixtures import generate_all + + generate_all(FIXTURES_DIR, INVALID_DIR) + + from tests.conformance.generate_fixtures import _ConformanceSchema + + from fd5.registry import register_schema + + register_schema("test/conformance", _ConformanceSchema()) + + +# --------------------------------------------------------------------------- +# Structure tests — minimal fixture +# --------------------------------------------------------------------------- + + +class TestStructure: + """Correct group hierarchy and required attributes present.""" + + def test_root_attrs_match(self): + expected = _load_expected("minimal") + path = _fixture_path("minimal") + with h5py.File(path, "r") as f: + for key, value in expected["root_attrs"].items(): + actual = f.attrs[key] + if isinstance(actual, bytes): + actual = actual.decode("utf-8") + if isinstance(actual, np.integer): + actual = 
int(actual) + assert actual == value, f"Attr {key!r}: {actual!r} != {value!r}" + + def test_root_attrs_prefixed(self): + expected = _load_expected("minimal") + path = _fixture_path("minimal") + with h5py.File(path, "r") as f: + for key, prefix in expected["root_attrs_prefixed"].items(): + actual = f.attrs[key] + if isinstance(actual, bytes): + actual = actual.decode("utf-8") + assert actual.startswith(prefix), ( + f"Attr {key!r} should start with {prefix!r}, got {actual!r}" + ) + + def test_datasets_present(self): + expected = _load_expected("minimal") + path = _fixture_path("minimal") + with h5py.File(path, "r") as f: + for ds_spec in expected["datasets"]: + ds = f[ds_spec["path"]] + assert isinstance(ds, h5py.Dataset) + assert list(ds.shape) == ds_spec["shape"] + assert ds.dtype == np.dtype(ds_spec["dtype"]) + + def test_groups_present(self): + expected = _load_expected("minimal") + path = _fixture_path("minimal") + with h5py.File(path, "r") as f: + for grp_path in expected["groups"]: + assert grp_path in f or grp_path == "/" + + def test_verify_true(self): + expected = _load_expected("minimal") + path = _fixture_path("minimal") + assert verify(path) is expected["verify"] + + def test_schema_valid(self): + expected = _load_expected("minimal") + path = _fixture_path("minimal") + if expected.get("schema_valid"): + errors = validate(path) + assert errors == [], [e.message for e in errors] + + +# --------------------------------------------------------------------------- +# Hash verification tests — sealed fixture +# --------------------------------------------------------------------------- + + +class TestHashVerification: + """Sealed files verify correctly, tampered files fail.""" + + def test_intact_verifies(self): + path = _fixture_path("sealed") + assert verify(path) is True + + def test_tampered_attr_fails(self, tmp_path): + import shutil + + src = _fixture_path("sealed") + tampered = tmp_path / "tampered_attr.fd5" + shutil.copy2(src, tampered) + + with 
h5py.File(tampered, "a") as f: + f.attrs["name"] = "tampered-value" + + assert verify(tampered) is False + + def test_tampered_data_fails(self, tmp_path): + import shutil + + src = _fixture_path("sealed") + tampered = tmp_path / "tampered_data.fd5" + shutil.copy2(src, tampered) + + with h5py.File(tampered, "a") as f: + ds = f["volume"] + ds[0, 0] = 999.0 + + assert verify(tampered) is False + + def test_content_hash_format(self): + path = _fixture_path("sealed") + with h5py.File(path, "r") as f: + ch = f.attrs["content_hash"] + if isinstance(ch, bytes): + ch = ch.decode("utf-8") + assert ch.startswith("sha256:") + assert len(ch) == len("sha256:") + 64 + + +# --------------------------------------------------------------------------- +# Provenance tests — with-provenance fixture +# --------------------------------------------------------------------------- + + +class TestProvenance: + """DAG traversal returns expected source chain.""" + + def test_sources_group_exists(self): + path = _fixture_path("with-provenance") + with h5py.File(path, "r") as f: + assert "sources" in f + + def test_source_attrs(self): + expected = _load_expected("with-provenance") + path = _fixture_path("with-provenance") + with h5py.File(path, "r") as f: + for src_spec in expected["provenance"]["sources"]: + name = src_spec["name"] + grp = f[f"sources/{name}"] + assert grp.attrs["id"] == src_spec["id"] + assert grp.attrs["product"] == src_spec["product"] + assert grp.attrs["role"] == src_spec["role"] + assert grp.attrs["description"] == src_spec["description"] + + def test_source_has_external_link(self): + expected = _load_expected("with-provenance") + path = _fixture_path("with-provenance") + with h5py.File(path, "r") as f: + for src_spec in expected["provenance"]["sources"]: + name = src_spec["name"] + link = f[f"sources/{name}"].get("link", getlink=True) + assert isinstance(link, h5py.ExternalLink) + + def test_original_files_exist(self): + expected = _load_expected("with-provenance") + path 
= _fixture_path("with-provenance") + with h5py.File(path, "r") as f: + assert "provenance" in f + if expected["provenance"]["has_original_files"]: + assert "original_files" in f["provenance"] + ds = f["provenance/original_files"] + assert len(ds) == expected["provenance"]["original_files_count"] + + def test_ingest_attrs(self): + expected = _load_expected("with-provenance") + path = _fixture_path("with-provenance") + with h5py.File(path, "r") as f: + ingest = f["provenance/ingest"] + ingest_spec = expected["provenance"]["ingest"] + assert ingest.attrs["tool"] == ingest_spec["tool"] + assert ingest.attrs["tool_version"] == ingest_spec["tool_version"] + + def test_groups_present(self): + expected = _load_expected("with-provenance") + path = _fixture_path("with-provenance") + with h5py.File(path, "r") as f: + for grp_path in expected["groups"]: + if grp_path == "/": + continue + assert grp_path in f, f"Missing group {grp_path!r}" + + def test_verify_matches_expected(self): + expected = _load_expected("with-provenance") + path = _fixture_path("with-provenance") + assert verify(path) is expected["verify"] + + +# --------------------------------------------------------------------------- +# Multiscale tests — multiscale fixture +# --------------------------------------------------------------------------- + + +class TestMultiscale: + """Pyramid levels and shapes match expected.""" + + def test_pyramid_group_exists(self): + path = _fixture_path("multiscale") + with h5py.File(path, "r") as f: + assert "pyramid" in f + + def test_pyramid_attrs(self): + expected = _load_expected("multiscale") + path = _fixture_path("multiscale") + with h5py.File(path, "r") as f: + pyr = f["pyramid"] + assert int(pyr.attrs["n_levels"]) == expected["pyramid"]["n_levels"] + actual_factors = list(pyr.attrs["scale_factors"]) + assert actual_factors == expected["pyramid"]["scale_factors"] + + def test_pyramid_level_shapes(self): + expected = _load_expected("multiscale") + path = 
_fixture_path("multiscale") + with h5py.File(path, "r") as f: + for level_name, expected_shape in expected["pyramid"][ + "level_shapes" + ].items(): + ds = f[f"pyramid/{level_name}/volume"] + assert list(ds.shape) == expected_shape + + def test_groups_present(self): + expected = _load_expected("multiscale") + path = _fixture_path("multiscale") + with h5py.File(path, "r") as f: + for grp_path in expected["groups"]: + if grp_path == "/": + continue + assert grp_path in f, f"Missing group {grp_path!r}" + + def test_mip_datasets_present(self): + expected = _load_expected("multiscale") + path = _fixture_path("multiscale") + with h5py.File(path, "r") as f: + for ds_spec in expected["datasets"]: + ds = f[ds_spec["path"]] + assert isinstance(ds, h5py.Dataset) + assert ds.dtype == np.dtype(ds_spec["dtype"]) + + def test_verify_true(self): + path = _fixture_path("multiscale") + assert verify(path) is True + + +# --------------------------------------------------------------------------- +# Tabular tests — tabular fixture +# --------------------------------------------------------------------------- + + +class TestTabular: + """Compound dataset with expected columns, dtypes, and row count.""" + + def test_events_dataset_exists(self): + path = _fixture_path("tabular") + with h5py.File(path, "r") as f: + assert "events" in f + + def test_row_count(self): + expected = _load_expected("tabular") + path = _fixture_path("tabular") + with h5py.File(path, "r") as f: + ds = f["events"] + assert len(ds) == expected["tabular"]["row_count"] + + def test_column_names(self): + expected = _load_expected("tabular") + path = _fixture_path("tabular") + with h5py.File(path, "r") as f: + ds = f["events"] + actual_names = list(ds.dtype.names) + assert actual_names == expected["tabular"]["column_names"] + + def test_column_dtypes(self): + expected = _load_expected("tabular") + path = _fixture_path("tabular") + with h5py.File(path, "r") as f: + ds = f["events"] + for col, expected_dtype in 
expected["tabular"]["column_dtypes"].items(): + actual = ds.dtype[col] + assert actual == np.dtype(expected_dtype), ( + f"Column {col!r}: {actual} != {expected_dtype}" + ) + + def test_verify_true(self): + path = _fixture_path("tabular") + assert verify(path) is True + + +# --------------------------------------------------------------------------- +# Complex metadata tests — complex-metadata fixture +# --------------------------------------------------------------------------- + + +class TestComplexMetadata: + """Deeply nested metadata groups match expected tree.""" + + def test_groups_present(self): + expected = _load_expected("complex-metadata") + path = _fixture_path("complex-metadata") + with h5py.File(path, "r") as f: + for grp_path in expected["groups"]: + if grp_path == "/": + continue + assert grp_path in f, f"Missing group {grp_path!r}" + + def test_metadata_tree(self): + expected = _load_expected("complex-metadata") + path = _fixture_path("complex-metadata") + with h5py.File(path, "r") as f: + from fd5.h5io import h5_to_dict + + actual = h5_to_dict(f["metadata"]) + expected_tree = expected["metadata_tree"]["metadata"] + assert actual == expected_tree + + def test_verify_true(self): + path = _fixture_path("complex-metadata") + assert verify(path) is True + + +# --------------------------------------------------------------------------- +# Schema validation tests — across all valid fixtures +# --------------------------------------------------------------------------- + + +class TestSchemaValidation: + """Embedded schema validates the file's own structure.""" + + @pytest.mark.parametrize( + "fixture_name", + ["minimal", "sealed", "tabular", "complex-metadata"], + ) + def test_schema_validates(self, fixture_name): + expected = _load_expected(fixture_name) + if not expected.get("schema_valid", True): + pytest.skip("Fixture not expected to pass schema validation") + path = _fixture_path(fixture_name) + errors = validate(path) + assert errors == [], [e.message 
for e in errors]
+
+
+# ---------------------------------------------------------------------------
+# Negative tests — invalid fixtures
+# ---------------------------------------------------------------------------
+
+
+class TestInvalid:
+    """Invalid files are rejected with appropriate errors."""
+
+    def test_missing_id_raises(self):
+        path = INVALID_DIR / "missing-id.fd5"
+        with h5py.File(path, "r") as f:
+            assert "id" not in f.attrs
+
+    def test_bad_hash_fails_verify(self):
+        path = INVALID_DIR / "bad-hash.fd5"
+        assert verify(path) is False
+
+    def test_no_schema_raises_on_validate(self):
+        path = INVALID_DIR / "no-schema.fd5"
+        with pytest.raises(KeyError, match="_schema"):
+            validate(path)
+
+    def test_expected_errors_json_matches(self):
+        """Ensure expected-errors.json covers all invalid fixtures."""
+        errors_json = json.loads((INVALID_DIR / "expected-errors.json").read_text())
+        for filename in ["missing-id.fd5", "bad-hash.fd5", "no-schema.fd5"]:
+            assert filename in errors_json, f"Missing entry for {filename}"