Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/fd5/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@
import hashlib
import itertools
import os
import warnings
from collections.abc import Callable
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import h5py
import jsonschema
import numpy as np

from fd5.h5io import dict_to_h5
from fd5.h5io import _read_attr
from fd5.hash import (
ChunkHasher,
_CHUNK_HASHES_SUFFIX,
Expand Down Expand Up @@ -124,6 +128,21 @@ def _iter_chunks(
hasher.update(arr[slices])


def _safe_h5_to_dict(group: h5py.Group) -> dict[str, Any]:
    """Convert an HDF5 group's attributes and subgroups into a nested dict.

    Attribute values are decoded via ``_read_attr``; subgroups are converted
    recursively. Items that cannot be resolved (e.g. dangling external
    links) are silently skipped, as are non-group members such as datasets.
    Keys are visited in sorted order for deterministic output.
    """
    out: dict[str, Any] = {
        name: _read_attr(group.attrs[name]) for name in sorted(group.attrs)
    }
    for name in sorted(group):
        try:
            child = group[name]
        except (KeyError, OSError):
            # Unresolvable member (e.g. broken external link) — skip it.
            continue
        if isinstance(child, h5py.Group):
            out[name] = _safe_h5_to_dict(child)
    return out


class Fd5Builder:
"""Context-managed builder that orchestrates fd5 file creation.

Expand All @@ -139,6 +158,7 @@ def __init__(
name: str,
description: str,
timestamp: str,
pre_seal_hooks: list[Callable[[h5py.File], None]] | None = None,
) -> None:
self._file = file
self._tmp_path = tmp_path
Expand All @@ -150,6 +170,7 @@ def __init__(
self._schema = get_schema(product_type)
self._data_hash_cache: dict[str, str] = {}
self._chunk_digest_cache: dict[str, list[str]] = {}
self._pre_seal_hooks = pre_seal_hooks or []

@property
def file(self) -> h5py.File:
Expand Down Expand Up @@ -254,6 +275,27 @@ def _validate(self) -> None:
f"Required attribute {attr!r} is missing or empty"
)

# Schema compliance check
schema_dict = self._schema.json_schema()
file_dict = _safe_h5_to_dict(self._file)
try:
jsonschema.validate(instance=file_dict, schema=schema_dict)
except jsonschema.ValidationError as exc:
raise Fd5ValidationError(
f"Schema validation failed: {exc.message}"
) from exc

# Description quality warnings (non-blocking)
from fd5.quality import check_descriptions_file

quality_issues = check_descriptions_file(self._file)
for issue in quality_issues:
warnings.warn(issue, stacklevel=2)

# Product-specific validation
if hasattr(self._schema, "validate"):
self._schema.validate(self._file)

def _write_chunk_hashes(self) -> None:
"""Store ``_chunk_hashes`` datasets for every inline-hashed dataset."""
dt = h5py.special_dtype(vlen=str)
Expand All @@ -270,6 +312,9 @@ def _write_chunk_hashes(self) -> None:

def _seal(self) -> Path:
"""Embed schema, compute hashes, write id, rename to final path."""
for hook in self._pre_seal_hooks:
hook(self._file)

self._validate()

self._write_chunk_hashes()
Expand Down Expand Up @@ -319,6 +364,7 @@ def create(
description: str,
timestamp: str,
schema_version: int = 1,
pre_seal_hooks: list[Callable[[h5py.File], None]] | None = None,
):
"""Context manager that creates a sealed fd5 file.

Expand Down Expand Up @@ -348,6 +394,7 @@ def create(
name=name,
description=description,
timestamp=timestamp,
pre_seal_hooks=pre_seal_hooks,
)
yield builder
builder._seal()
Expand Down
14 changes: 13 additions & 1 deletion src/fd5/quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ class Warning:
severity: str # "error" | "warning"


def check_descriptions_file(f: h5py.File) -> list[str]:
    """Run description-quality checks against an already-open HDF5 file.

    Checks the root object first, then walks the whole hierarchy starting
    at ``/``. Returns one human-readable message per issue, formatted as
    ``"<path>: <message>"``.
    """
    issues: list[Warning] = []
    _check_root(f, issues)
    _walk(f, "/", issues, {})
    return [f"{issue.path}: {issue.message}" for issue in issues]


def check_descriptions(path: Path) -> list[Warning]:
"""Validate description attributes in an fd5 HDF5 file.

Expand Down Expand Up @@ -71,7 +80,10 @@ def _walk(
seen: dict[str, str],
) -> None:
for key in sorted(group.keys()):
item = group[key]
try:
item = group[key]
except (KeyError, OSError):
continue # skip external links or broken references
full_path = f"{prefix}{key}" if prefix == "/" else f"{prefix}/{key}"
desc = item.attrs.get("description")

Expand Down
Loading
Loading