diff --git a/src/fd5/create.py b/src/fd5/create.py index 6fb6ed2..8ba24c9 100644 --- a/src/fd5/create.py +++ b/src/fd5/create.py @@ -11,15 +11,19 @@ import hashlib import itertools import os +import warnings +from collections.abc import Callable from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any import h5py +import jsonschema import numpy as np from fd5.h5io import dict_to_h5 +from fd5.h5io import _read_attr from fd5.hash import ( ChunkHasher, _CHUNK_HASHES_SUFFIX, @@ -124,6 +128,21 @@ def _iter_chunks( hasher.update(arr[slices]) +def _safe_h5_to_dict(group: h5py.Group) -> dict[str, Any]: + """Read HDF5 attrs and groups to a dict, skipping unresolvable items (e.g. external links).""" + result: dict[str, Any] = {} + for key in sorted(group.attrs.keys()): + result[key] = _read_attr(group.attrs[key]) + for key in sorted(group.keys()): + try: + item = group[key] + except (KeyError, OSError): + continue + if isinstance(item, h5py.Group): + result[key] = _safe_h5_to_dict(item) + return result + + class Fd5Builder: """Context-managed builder that orchestrates fd5 file creation. @@ -139,6 +158,7 @@ def __init__( name: str, description: str, timestamp: str, + pre_seal_hooks: list[Callable[[h5py.File], None]] | None = None, ) -> None: self._file = file self._tmp_path = tmp_path @@ -150,6 +170,7 @@ def __init__( self._schema = get_schema(product_type) self._data_hash_cache: dict[str, str] = {} self._chunk_digest_cache: dict[str, list[str]] = {} + self._pre_seal_hooks = pre_seal_hooks or [] @property def file(self) -> h5py.File: @@ -254,6 +275,27 @@ def _validate(self) -> None: f"Required attribute {attr!r} is missing or empty" ) + # Schema compliance check + schema_dict = self._schema.json_schema() + file_dict = _safe_h5_to_dict(self._file) + try: + jsonschema.validate(instance=file_dict, schema=schema_dict) + except jsonschema.ValidationError as exc: + raise Fd5ValidationError( + f"Schema validation failed: {exc.message}" + ) from exc + + # Description quality warnings (non-blocking) + from fd5.quality import check_descriptions_file + + quality_issues = check_descriptions_file(self._file) + for issue in quality_issues: + warnings.warn(issue, stacklevel=2) + + # Product-specific validation + if hasattr(self._schema, "validate"): + self._schema.validate(self._file) + def _write_chunk_hashes(self) -> None: """Store ``_chunk_hashes`` datasets for every inline-hashed dataset.""" dt = h5py.special_dtype(vlen=str) @@ -270,6 +312,9 @@ def _write_chunk_hashes(self) -> None: def _seal(self) -> Path: """Embed schema, compute hashes, write id, rename to final path.""" + for hook in self._pre_seal_hooks: + hook(self._file) + self._validate() self._write_chunk_hashes() @@ -319,6 +364,7 @@ def create( description: str, timestamp: str, schema_version: int = 1, + pre_seal_hooks: list[Callable[[h5py.File], None]] | None = None, ): """Context manager that creates a sealed fd5 file. @@ -348,6 +394,7 @@ def create( name=name, description=description, timestamp=timestamp, + pre_seal_hooks=pre_seal_hooks, ) yield builder builder._seal() diff --git a/src/fd5/quality.py b/src/fd5/quality.py index 9d3f4ee..e40fdc7 100644 --- a/src/fd5/quality.py +++ b/src/fd5/quality.py @@ -33,6 +33,15 @@ class Warning: severity: str # "error" | "warning" +def check_descriptions_file(f: h5py.File) -> list[str]: + """Check description quality on an open HDF5 file. Returns list of warning messages.""" + warns: list[Warning] = [] + _check_root(f, warns) + seen: dict[str, str] = {} + _walk(f, "/", warns, seen) + return [f"{w.path}: {w.message}" for w in warns] + + def check_descriptions(path: Path) -> list[Warning]: """Validate description attributes in an fd5 HDF5 file. @@ -71,7 +80,10 @@ def _walk( seen: dict[str, str], ) -> None: for key in sorted(group.keys()): - item = group[key] + try: + item = group[key] + except (KeyError, OSError): + continue # skip external links or broken references full_path = f"{prefix}{key}" if prefix == "/" else f"{prefix}/{key}" desc = item.attrs.get("description") diff --git a/tests/test_preseal_validation.py b/tests/test_preseal_validation.py new file mode 100644 index 0000000..eb1cd6b --- /dev/null +++ b/tests/test_preseal_validation.py @@ -0,0 +1,298 @@ +"""Tests for pre-seal validation in fd5.create.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import h5py +import numpy as np +import pytest + +from fd5.create import Fd5ValidationError, create +from fd5.registry import register_schema + + +# --------------------------------------------------------------------------- +# Stub schemas +# --------------------------------------------------------------------------- + + +class _ValidatingStubSchema: + """Schema that requires a 'values' dataset in its JSON schema.""" + + product_type: str = "test/validating" + schema_version: str = "1.0.0" + + def json_schema(self) -> dict[str, Any]: + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "_schema_version": {"type": "integer"}, + "product": {"type": "string", "const": "test/validating"}, + "name": {"type": "string"}, + "description": {"type": "string"}, + "timestamp": {"type": "string"}, + "values": {"type": "object"}, + }, + "required": ["_schema_version", "product", "name", "values"], + } + + def required_root_attrs(self) -> dict[str, Any]: + return {"product": "test/validating"} + + def write(self, target: Any, data: Any) -> None: + target.create_dataset("volume", data=data) + + def id_inputs(self) -> list[str]: + return ["product", "name", "timestamp"] + + +class _SimpleStubSchema: + """Minimal schema without custom validate -- for baseline tests.""" + + product_type: str = "test/simple" + schema_version: str = "1.0.0" + + def json_schema(self) -> dict[str, Any]: + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "_schema_version": {"type": "integer"}, + "product": {"type": "string", "const": "test/simple"}, + "name": {"type": "string"}, + "description": {"type": "string"}, + "timestamp": {"type": "string"}, + }, + "required": ["_schema_version", "product", "name"], + } + + def required_root_attrs(self) -> dict[str, Any]: + return {"product": "test/simple"} + + def write(self, target: Any, data: Any) -> None: + if data is not None: + target.create_dataset("volume", data=data) + + def id_inputs(self) -> list[str]: + return ["product", "name", "timestamp"] + + +class _CustomValidateSchema(_SimpleStubSchema): + """Schema with a custom validate() method.""" + + product_type: str = "test/custom-validate" + call_count: int = 0 + + def json_schema(self) -> dict[str, Any]: + schema = super().json_schema() + schema["properties"]["product"]["const"] = "test/custom-validate" + return schema + + def required_root_attrs(self) -> dict[str, Any]: + return {"product": "test/custom-validate"} + + def validate(self, target: Any) -> None: + self.call_count += 1 + + +class _RejectingValidateSchema(_SimpleStubSchema): + """Schema whose validate() always raises.""" + + product_type: str = "test/rejecting" + + def json_schema(self) -> dict[str, Any]: + schema = super().json_schema() + schema["properties"]["product"]["const"] = "test/rejecting" + return schema + + def required_root_attrs(self) -> dict[str, Any]: + return {"product": "test/rejecting"} + + def validate(self, target: Any) -> None: + raise Fd5ValidationError("Product-specific validation failed") + + +@pytest.fixture(autouse=True) +def _register_stubs(): + import fd5.registry as reg + + register_schema("test/validating", _ValidatingStubSchema()) + register_schema("test/simple", _SimpleStubSchema()) + _custom = _CustomValidateSchema() + register_schema("test/custom-validate", _custom) + register_schema("test/rejecting", _RejectingValidateSchema()) + reg._ep_loaded = True + yield _custom + + +@pytest.fixture() +def out_dir(tmp_path: Path) -> Path: + return tmp_path + + +# --------------------------------------------------------------------------- +# Schema validation +# --------------------------------------------------------------------------- + + +class TestSchemaValidation: + def test_schema_validation_catches_missing_dataset(self, out_dir: Path): + """Schema requires 'values' group but we don't write it.""" + with pytest.raises(Fd5ValidationError, match="Schema validation failed"): + with create( + out_dir, + product="test/validating", + name="sample", + description="A test file for validation", + timestamp="2026-02-25T12:00:00Z", + ): + pass # don't write the required 'values' group + + def test_schema_validation_passes_valid_file(self, out_dir: Path): + """Normal create + seal works without errors.""" + with create( + out_dir, + product="test/simple", + name="sample", + description="A valid test file with enough description", + timestamp="2026-02-25T12:00:00Z", + ) as builder: + builder.write_product(np.zeros((4, 4), dtype=np.float32)) + + finals = list(out_dir.glob("*.h5")) + assert len(finals) == 1 + + +# --------------------------------------------------------------------------- +# Description quality warnings +# --------------------------------------------------------------------------- + + +class TestDescriptionQuality: + def test_description_quality_warns_short(self, out_dir: Path): + """File with a group whose description is short triggers a warning.""" + with pytest.warns(UserWarning, match="Short description"): + with create( + out_dir, + product="test/simple", + name="sample", + description="A sufficiently long description for the root", + timestamp="2026-02-25T12:00:00Z", + ) as builder: + grp = builder.file.create_group("mygroup") + grp.attrs["description"] = "tiny" + + def test_description_quality_does_not_block(self, out_dir: Path): + """File with quality warnings still seals successfully.""" + with pytest.warns(UserWarning, match="Short description"): + with create( + out_dir, + product="test/simple", + name="sample", + description="A sufficiently long description for the root", + timestamp="2026-02-25T12:00:00Z", + ) as builder: + grp = builder.file.create_group("mygroup") + grp.attrs["description"] = "tiny" + + finals = list(out_dir.glob("*.h5")) + assert len(finals) == 1 + + +# --------------------------------------------------------------------------- +# Product-specific validation +# --------------------------------------------------------------------------- + + +class TestProductValidate: + def test_product_validate_method_called(self, out_dir: Path, _register_stubs): + """Schema with custom validate() method is called during seal.""" + schema = _register_stubs + schema.call_count = 0 + with create( + out_dir, + product="test/custom-validate", + name="sample", + description="A test file with custom validation", + timestamp="2026-02-25T12:00:00Z", + ): + pass + assert schema.call_count == 1 + + def test_product_validate_can_reject(self, out_dir: Path): + """Schema validate() raises -> seal aborts.""" + with pytest.raises(Fd5ValidationError, match="Product-specific validation"): + with create( + out_dir, + product="test/rejecting", + name="sample", + description="A test file that will be rejected", + timestamp="2026-02-25T12:00:00Z", + ): + pass + + +# --------------------------------------------------------------------------- +# Pre-seal hooks +# --------------------------------------------------------------------------- + + +class TestPreSealHooks: + def test_pre_seal_hooks_called(self, out_dir: Path): + """Hook is called with the open h5py.File.""" + called_with = [] + + def hook(f: h5py.File) -> None: + called_with.append(f.filename) + + with create( + out_dir, + product="test/simple", + name="sample", + description="A test file with hooks enabled", + timestamp="2026-02-25T12:00:00Z", + pre_seal_hooks=[hook], + ): + pass + + assert len(called_with) == 1 + + def test_pre_seal_hook_can_abort(self, out_dir: Path): + """Hook raises exception -> seal aborts, temp file cleaned up.""" + + def bad_hook(f: h5py.File) -> None: + raise RuntimeError("hook abort") + + with pytest.raises(RuntimeError, match="hook abort"): + with create( + out_dir, + product="test/simple", + name="sample", + description="A test file that hook will abort", + timestamp="2026-02-25T12:00:00Z", + pre_seal_hooks=[bad_hook], + ): + pass + + # Temp file should be cleaned up + h5_files = list(out_dir.glob("*.h5")) + tmp_files = list(out_dir.glob("*.h5.tmp")) + assert len(h5_files) == 0 + assert len(tmp_files) == 0 + + def test_no_hooks_default(self, out_dir: Path): + """Default behavior unchanged -- no hooks, no warnings for good files.""" + with create( + out_dir, + product="test/simple", + name="sample", + description="A sufficiently long description for quality checks", + timestamp="2026-02-25T12:00:00Z", + ) as builder: + builder.write_product(np.zeros((4, 4), dtype=np.float32)) + + finals = list(out_dir.glob("*.h5")) + assert len(finals) == 1