From de37086534aedbd882de579c38e228e52a8ffedf Mon Sep 17 00:00:00 2001 From: gerchowl Date: Mon, 30 Mar 2026 11:11:31 +0200 Subject: [PATCH] feat(generic): add timeseries, tabular, and ndarray product schemas Three domain-agnostic product schemas for non-medical-imaging users: - timeseries: signal arrays + time + sampling_rate + channels + events - tabular: column-per-dataset with column_metadata + row_labels - ndarray: arbitrary N-D array + dimension_order + affine + per-axis metadata Registered via pyproject.toml entry points. Co-Authored-By: Claude Opus 4.6 (1M context) --- pyproject.toml | 3 + src/fd5/generic/__init__.py | 5 + src/fd5/generic/ndarray.py | 101 ++++++++++ src/fd5/generic/tabular.py | 109 +++++++++++ src/fd5/generic/timeseries.py | 150 +++++++++++++++ tests/test_ndarray.py | 345 ++++++++++++++++++++++++++++++++++ tests/test_tabular.py | 319 +++++++++++++++++++++++++++++++ tests/test_timeseries.py | 333 ++++++++++++++++++++++++++++++++ 8 files changed, 1365 insertions(+) create mode 100644 src/fd5/generic/__init__.py create mode 100644 src/fd5/generic/ndarray.py create mode 100644 src/fd5/generic/tabular.py create mode 100644 src/fd5/generic/timeseries.py create mode 100644 tests/test_ndarray.py create mode 100644 tests/test_tabular.py create mode 100644 tests/test_timeseries.py diff --git a/pyproject.toml b/pyproject.toml index c86d026..21f1311 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,9 @@ calibration = "fd5.imaging.calibration:CalibrationSchema" spectrum = "fd5.imaging.spectrum:SpectrumSchema" roi = "fd5.imaging.roi:RoiSchema" device_data = "fd5.imaging.device_data:DeviceDataSchema" +timeseries = "fd5.generic.timeseries:TimeseriesSchema" +tabular = "fd5.generic.tabular:TabularSchema" +ndarray = "fd5.generic.ndarray:NdarraySchema" [tool.coverage.run] source = ["src/fd5"] diff --git a/src/fd5/generic/__init__.py b/src/fd5/generic/__init__.py new file mode 100644 index 0000000..3a6e08d --- /dev/null +++ 
b/src/fd5/generic/__init__.py @@ -0,0 +1,5 @@ +from fd5.generic.ndarray import NdarraySchema +from fd5.generic.tabular import TabularSchema +from fd5.generic.timeseries import TimeseriesSchema + +__all__ = ["NdarraySchema", "TabularSchema", "TimeseriesSchema"] diff --git a/src/fd5/generic/ndarray.py b/src/fd5/generic/ndarray.py new file mode 100644 index 0000000..0136695 --- /dev/null +++ b/src/fd5/generic/ndarray.py @@ -0,0 +1,101 @@ +"""fd5.generic.ndarray — N-dimensional array product schema. + +Implements the ``ndarray`` product schema for arbitrary N-dimensional arrays: +sensor grids, image stacks, simulation output, and similar dense numeric data. +""" + +from __future__ import annotations + +from typing import Any + +import h5py +import numpy as np + +_SCHEMA_VERSION = "1.0.0" + +_GZIP_LEVEL = 4 + +_ID_INPUTS = ["product", "name", "timestamp"] + + +class NdarraySchema: + """Product schema for arbitrary N-dimensional arrays (``ndarray``).""" + + product_type: str = "ndarray" + schema_version: str = _SCHEMA_VERSION + + def json_schema(self) -> dict[str, Any]: + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "_schema_version": {"type": "integer"}, + "product": {"type": "string", "const": "ndarray"}, + "name": {"type": "string"}, + "description": {"type": "string"}, + "dimension_order": {"type": "string"}, + "reference_frame": {"type": "string"}, + }, + "required": ["_schema_version", "product", "name", "description"], + } + + def required_root_attrs(self) -> dict[str, Any]: + return {"product": "ndarray"} + + def id_inputs(self) -> list[str]: + return list(_ID_INPUTS) + + def write(self, target: h5py.File | h5py.Group, data: dict[str, Any]) -> None: + """Write N-dimensional array data to *target*. + + *data* must contain: + - ``array``: numpy ndarray (any shape, any numeric dtype) + - ``dimension_order``: str (e.g. 
"TZYX", "XY", "ChannelHeightWidth") + - ``description``: str + + Optional keys: + - ``affine``: (4,4) float64 array + - ``reference_frame``: str (e.g. "LPS", "RAS") + - ``dimensions``: dict mapping axis_name to + ``{"label": str, "units": str, "spacing": float}`` + """ + arr = np.asarray(data["array"]) + target.create_dataset( + "array", + data=arr, + compression="gzip", + compression_opts=_GZIP_LEVEL, + ) + + target.attrs["dimension_order"] = data["dimension_order"] + + if "reference_frame" in data: + target.attrs["reference_frame"] = data["reference_frame"] + + if "affine" in data: + target.create_dataset( + "affine", + data=np.asarray(data["affine"], dtype=np.float64), + ) + + if "dimensions" in data: + self._write_dimensions(target, data["dimensions"]) + + # ------------------------------------------------------------------ + # Dimensions + # ------------------------------------------------------------------ + + def _write_dimensions( + self, + target: h5py.File | h5py.Group, + dimensions: dict[str, dict[str, Any]], + ) -> None: + dims_grp = target.create_group("dimensions") + for axis_name, dim_info in dimensions.items(): + ax_grp = dims_grp.create_group(axis_name) + if "label" in dim_info: + ax_grp.attrs["label"] = dim_info["label"] + if "units" in dim_info: + ax_grp.attrs["units"] = dim_info["units"] + if "spacing" in dim_info: + ax_grp.attrs["spacing"] = np.float64(dim_info["spacing"]) diff --git a/src/fd5/generic/tabular.py b/src/fd5/generic/tabular.py new file mode 100644 index 0000000..5c2b973 --- /dev/null +++ b/src/fd5/generic/tabular.py @@ -0,0 +1,109 @@ +"""fd5.generic.tabular — Tabular product schema for spreadsheet-like data. + +Implements the ``tabular`` product schema for lab measurements, clinical +records, parameter tables, and similar column-oriented data. 
+""" + +from __future__ import annotations + +from typing import Any + +import h5py +import numpy as np + +_SCHEMA_VERSION = "1.0.0" + +_GZIP_LEVEL = 4 + +_ID_INPUTS = ["product", "name", "timestamp"] + + +class TabularSchema: + """Product schema for column-oriented tabular data (``tabular``).""" + + product_type: str = "tabular" + schema_version: str = _SCHEMA_VERSION + + def json_schema(self) -> dict[str, Any]: + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "_schema_version": {"type": "integer"}, + "product": {"type": "string", "const": "tabular"}, + "name": {"type": "string"}, + "description": {"type": "string"}, + "column_count": {"type": "integer"}, + "row_count": {"type": "integer"}, + }, + "required": ["_schema_version", "product", "name", "description"], + } + + def required_root_attrs(self) -> dict[str, Any]: + return {"product": "tabular"} + + def id_inputs(self) -> list[str]: + return list(_ID_INPUTS) + + def write(self, target: h5py.File | h5py.Group, data: dict[str, Any]) -> None: + """Write tabular data to *target*. 
+ + *data* must contain: + - ``columns``: dict mapping column_name to numpy array (all same length) + - ``description``: str + + Optional keys: + - ``column_metadata``: dict mapping column_name to + ``{"units": str, "description": str}`` + - ``row_labels``: numpy array or list of str (row identifiers) + """ + columns = data["columns"] + column_metadata = data.get("column_metadata", {}) + + row_count = len(next(iter(columns.values()))) + target.attrs["column_count"] = np.int64(len(columns)) + target.attrs["row_count"] = np.int64(row_count) + + self._write_table(target, columns, column_metadata) + + if "row_labels" in data: + self._write_row_labels(target, data["row_labels"]) + + # ------------------------------------------------------------------ + # Table + # ------------------------------------------------------------------ + + def _write_table( + self, + target: h5py.File | h5py.Group, + columns: dict[str, Any], + column_metadata: dict[str, dict[str, str]], + ) -> None: + table_grp = target.create_group("table") + for col_name, arr in columns.items(): + arr = np.asarray(arr) + ds = table_grp.create_dataset( + col_name, + data=arr, + compression="gzip", + compression_opts=_GZIP_LEVEL, + ) + meta = column_metadata.get(col_name, {}) + if "units" in meta: + ds.attrs["units"] = meta["units"] + if "description" in meta: + ds.attrs["description"] = meta["description"] + + # ------------------------------------------------------------------ + # Row labels + # ------------------------------------------------------------------ + + def _write_row_labels( + self, + target: h5py.File | h5py.Group, + row_labels: Any, + ) -> None: + encoded = [s.encode("utf-8") if isinstance(s, str) else s for s in row_labels] + max_len = max(len(b) for b in encoded) if encoded else 1 + labels_arr = np.array(encoded, dtype=f"S{max_len}") + target.create_dataset("row_labels", data=labels_arr) diff --git a/src/fd5/generic/timeseries.py b/src/fd5/generic/timeseries.py new file mode 100644 index 
0000000..88c6eec --- /dev/null +++ b/src/fd5/generic/timeseries.py @@ -0,0 +1,150 @@ +"""fd5.generic.timeseries — Timeseries product schema for continuous sensor data. + +Implements the ``timeseries`` product schema for continuous sensor data, +physiological monitoring, IoT streams, and similar time-domain signals. +""" + +from __future__ import annotations + +from typing import Any + +import h5py +import numpy as np + +_SCHEMA_VERSION = "1.0.0" + +_GZIP_LEVEL = 4 + +_ID_INPUTS = ["product", "name", "timestamp"] + + +class TimeseriesSchema: + """Product schema for continuous time-series data (``timeseries``).""" + + product_type: str = "timeseries" + schema_version: str = _SCHEMA_VERSION + + def json_schema(self) -> dict[str, Any]: + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "_schema_version": {"type": "integer"}, + "product": {"type": "string", "const": "timeseries"}, + "name": {"type": "string"}, + "description": {"type": "string"}, + "sampling_rate": {"type": "number"}, + "sampling_rate_units": {"type": "string", "const": "Hz"}, + }, + "required": ["_schema_version", "product", "name", "description"], + } + + def required_root_attrs(self) -> dict[str, Any]: + return {"product": "timeseries"} + + def id_inputs(self) -> list[str]: + return list(_ID_INPUTS) + + def write(self, target: h5py.File | h5py.Group, data: dict[str, Any]) -> None: + """Write timeseries data to *target*. 
+ + *data* must contain: + - ``signals``: dict mapping channel_name to numpy 1D float array + - ``time``: numpy 1D float array (timestamps in seconds) + - ``description``: str + + Optional keys: + - ``sampling_rate``: float (Hz) + - ``events``: dict with ``timestamps`` (array) and ``labels`` (list[str]) + - ``metadata``: dict of additional attrs + """ + self._write_signals(target, data["signals"], data.get("description", "")) + self._write_time(target, data["time"]) + + if "sampling_rate" in data: + target.attrs["sampling_rate"] = np.float64(data["sampling_rate"]) + target.attrs["sampling_rate_units"] = "Hz" + + if "events" in data: + self._write_events(target, data["events"]) + + if "metadata" in data: + self._write_metadata(target, data["metadata"]) + + # ------------------------------------------------------------------ + # Signals + # ------------------------------------------------------------------ + + def _write_signals( + self, + target: h5py.File | h5py.Group, + signals: dict[str, Any], + description: str, + ) -> None: + signals_grp = target.create_group("signals") + for name, arr in signals.items(): + ds = signals_grp.create_dataset( + name, + data=np.asarray(arr, dtype=np.float64), + compression="gzip", + compression_opts=_GZIP_LEVEL, + ) + ds.attrs["units"] = "a.u." 
+ ds.attrs["description"] = description + + # ------------------------------------------------------------------ + # Time + # ------------------------------------------------------------------ + + def _write_time( + self, + target: h5py.File | h5py.Group, + time: Any, + ) -> None: + ds = target.create_dataset( + "time", + data=np.asarray(time, dtype=np.float64), + compression="gzip", + compression_opts=_GZIP_LEVEL, + ) + ds.attrs["units"] = "s" + + # ------------------------------------------------------------------ + # Events + # ------------------------------------------------------------------ + + def _write_events( + self, + target: h5py.File | h5py.Group, + events: dict[str, Any], + ) -> None: + events_grp = target.create_group("events") + events_grp.create_dataset( + "timestamps", + data=np.asarray(events["timestamps"], dtype=np.float64), + compression="gzip", + compression_opts=_GZIP_LEVEL, + ) + encoded = [ + s.encode("utf-8") if isinstance(s, str) else s for s in events["labels"] + ] + max_len = max(len(b) for b in encoded) if encoded else 1 + labels_arr = np.array(encoded, dtype=f"S{max_len}") + events_grp.create_dataset("labels", data=labels_arr) + + # ------------------------------------------------------------------ + # Metadata + # ------------------------------------------------------------------ + + def _write_metadata( + self, + target: h5py.File | h5py.Group, + metadata: dict[str, Any], + ) -> None: + for key, value in metadata.items(): + if isinstance(value, float): + target.attrs[key] = np.float64(value) + elif isinstance(value, int): + target.attrs[key] = np.int64(value) + elif isinstance(value, str): + target.attrs[key] = value diff --git a/tests/test_ndarray.py b/tests/test_ndarray.py new file mode 100644 index 0000000..7a86fc6 --- /dev/null +++ b/tests/test_ndarray.py @@ -0,0 +1,345 @@ +"""Tests for fd5.generic.ndarray — NdarraySchema product schema.""" + +from __future__ import annotations + +import h5py +import numpy as np +import pytest + 
+from fd5.registry import ProductSchema, register_schema + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def schema(): + from fd5.generic.ndarray import NdarraySchema + + return NdarraySchema() + + +@pytest.fixture() +def h5file(tmp_path): + path = tmp_path / "ndarray.h5" + with h5py.File(path, "w") as f: + yield f + + +@pytest.fixture() +def h5path(tmp_path): + return tmp_path / "ndarray.h5" + + +def _minimal_data(): + rng = np.random.default_rng(42) + return { + "array": rng.standard_normal((16, 32, 32)).astype(np.float32), + "dimension_order": "ZYX", + "description": "Test 3D array", + } + + +def _data_with_affine(): + data = _minimal_data() + data["affine"] = np.eye(4, dtype=np.float64) + data["affine"][0, 3] = 10.0 # translation + data["reference_frame"] = "LPS" + return data + + +def _data_with_dimensions(): + data = _minimal_data() + data["dimensions"] = { + "Z": {"label": "slice", "units": "mm", "spacing": 2.0}, + "Y": {"label": "row", "units": "mm", "spacing": 1.0}, + "X": {"label": "column", "units": "mm", "spacing": 1.0}, + } + return data + + +def _2d_data(): + rng = np.random.default_rng(99) + return { + "array": rng.standard_normal((64, 64)).astype(np.float64), + "dimension_order": "YX", + "description": "Test 2D array", + } + + +def _integer_data(): + return { + "array": np.arange(24, dtype=np.int32).reshape(2, 3, 4), + "dimension_order": "CYX", + "description": "Integer 3D array", + } + + +# --------------------------------------------------------------------------- +# Protocol conformance +# --------------------------------------------------------------------------- + + +class TestProtocolConformance: + def test_satisfies_product_schema_protocol(self, schema): + assert isinstance(schema, ProductSchema) + + def test_product_type_is_ndarray(self, schema): + assert schema.product_type == "ndarray" + + 
def test_schema_version_is_string(self, schema): + assert isinstance(schema.schema_version, str) + + def test_has_required_methods(self, schema): + assert callable(schema.json_schema) + assert callable(schema.required_root_attrs) + assert callable(schema.write) + assert callable(schema.id_inputs) + + +# --------------------------------------------------------------------------- +# json_schema() +# --------------------------------------------------------------------------- + + +class TestJsonSchema: + def test_returns_dict(self, schema): + result = schema.json_schema() + assert isinstance(result, dict) + + def test_has_draft_2020_12_meta(self, schema): + result = schema.json_schema() + assert result["$schema"] == "https://json-schema.org/draft/2020-12/schema" + + def test_product_const_is_ndarray(self, schema): + result = schema.json_schema() + assert result["properties"]["product"]["const"] == "ndarray" + + def test_valid_json_schema(self, schema): + import jsonschema + + result = schema.json_schema() + jsonschema.Draft202012Validator.check_schema(result) + + +# --------------------------------------------------------------------------- +# required_root_attrs() +# --------------------------------------------------------------------------- + + +class TestRequiredRootAttrs: + def test_returns_dict(self, schema): + result = schema.required_root_attrs() + assert isinstance(result, dict) + + def test_contains_product_ndarray(self, schema): + result = schema.required_root_attrs() + assert result["product"] == "ndarray" + + +# --------------------------------------------------------------------------- +# id_inputs() +# --------------------------------------------------------------------------- + + +class TestIdInputs: + def test_returns_list_of_strings(self, schema): + result = schema.id_inputs() + assert isinstance(result, list) + assert all(isinstance(s, str) for s in result) + + def test_contains_expected_keys(self, schema): + result = schema.id_inputs() + assert 
"product" in result + assert "name" in result + assert "timestamp" in result + + def test_returns_fresh_list(self, schema): + a = schema.id_inputs() + b = schema.id_inputs() + assert a is not b + + def test_deterministic(self, schema): + assert schema.id_inputs() == schema.id_inputs() + + +# --------------------------------------------------------------------------- +# write() — minimal data +# --------------------------------------------------------------------------- + + +class TestWriteMinimal: + def test_writes_array_dataset(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "array" in h5file + + def test_array_shape(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["array"].shape == (16, 32, 32) + + def test_array_dtype(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["array"].dtype == np.float32 + + def test_array_gzip_compressed(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["array"].compression == "gzip" + assert h5file["array"].compression_opts == 4 + + def test_dimension_order_attr(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file.attrs["dimension_order"] == "ZYX" + + def test_no_reference_frame_when_absent(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "reference_frame" not in h5file.attrs + + def test_no_affine_when_absent(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "affine" not in h5file + + def test_no_dimensions_when_absent(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "dimensions" not in h5file + + def test_roundtrip_array_data(self, schema, h5file): + data = _minimal_data() + schema.write(h5file, data) + np.testing.assert_array_almost_equal(h5file["array"][:], data["array"]) + + +# --------------------------------------------------------------------------- +# write() — with affine and reference_frame +# 
--------------------------------------------------------------------------- + + +class TestWriteAffine: + def test_affine_dataset_created(self, schema, h5file): + schema.write(h5file, _data_with_affine()) + assert "affine" in h5file + + def test_affine_shape(self, schema, h5file): + schema.write(h5file, _data_with_affine()) + assert h5file["affine"].shape == (4, 4) + + def test_affine_dtype(self, schema, h5file): + schema.write(h5file, _data_with_affine()) + assert h5file["affine"].dtype == np.float64 + + def test_affine_translation(self, schema, h5file): + schema.write(h5file, _data_with_affine()) + assert h5file["affine"][0, 3] == pytest.approx(10.0) + + def test_reference_frame_attr(self, schema, h5file): + schema.write(h5file, _data_with_affine()) + assert h5file.attrs["reference_frame"] == "LPS" + + +# --------------------------------------------------------------------------- +# write() — with dimensions +# --------------------------------------------------------------------------- + + +class TestWriteDimensions: + def test_dimensions_group_created(self, schema, h5file): + schema.write(h5file, _data_with_dimensions()) + assert "dimensions" in h5file + + def test_dimension_axis_groups(self, schema, h5file): + schema.write(h5file, _data_with_dimensions()) + assert "dimensions/Z" in h5file + assert "dimensions/Y" in h5file + assert "dimensions/X" in h5file + + def test_dimension_attrs(self, schema, h5file): + schema.write(h5file, _data_with_dimensions()) + z = h5file["dimensions/Z"] + assert z.attrs["label"] == "slice" + assert z.attrs["units"] == "mm" + assert z.attrs["spacing"] == pytest.approx(2.0) + + def test_all_dims_have_attrs(self, schema, h5file): + schema.write(h5file, _data_with_dimensions()) + for axis in ("Z", "Y", "X"): + grp = h5file[f"dimensions/{axis}"] + assert "label" in grp.attrs + assert "units" in grp.attrs + assert "spacing" in grp.attrs + + +# --------------------------------------------------------------------------- +# write() — 2D and 
integer data +# --------------------------------------------------------------------------- + + +class TestWriteVariousShapes: + def test_2d_array(self, schema, h5file): + schema.write(h5file, _2d_data()) + assert h5file["array"].shape == (64, 64) + assert h5file.attrs["dimension_order"] == "YX" + + def test_integer_array(self, schema, h5file): + schema.write(h5file, _integer_data()) + assert h5file["array"].dtype == np.int32 + assert h5file["array"].shape == (2, 3, 4) + + +# --------------------------------------------------------------------------- +# Integration — round-trip +# --------------------------------------------------------------------------- + + +class TestIntegration: + def test_create_verify_roundtrip(self, schema, tmp_path): + import fd5 + + register_schema("ndarray", schema) + data = _minimal_data() + with fd5.create( + tmp_path, + product="ndarray", + name="roundtrip-nda", + description="Ndarray round-trip test", + timestamp="2026-01-01T00:00:00Z", + ) as b: + b.write_product(data) + + files = list(tmp_path.glob("*.h5")) + assert len(files) == 1 + assert fd5.verify(str(files[0])) + + def test_create_validate_roundtrip(self, schema, h5path): + from fd5.schema import embed_schema, validate + + register_schema("ndarray", schema) + data = _data_with_affine() + with h5py.File(h5path, "w") as f: + for k, v in schema.required_root_attrs().items(): + f.attrs[k] = v + f.attrs["name"] = "integration-test-ndarray" + f.attrs["description"] = "Integration test ndarray" + embed_schema(f, schema.json_schema()) + schema.write(f, data) + + errors = validate(h5path) + assert errors == [], [e.message for e in errors] + + def test_roundtrip_with_dimensions(self, schema, tmp_path): + import fd5 + + register_schema("ndarray", schema) + data = _data_with_dimensions() + with fd5.create( + tmp_path, + product="ndarray", + name="dims-nda", + description="Ndarray with dimensions", + timestamp="2026-01-01T00:00:00Z", + ) as b: + b.write_product(data) + + files = 
list(tmp_path.glob("*.h5")) + assert len(files) == 1 + assert fd5.verify(str(files[0])) diff --git a/tests/test_tabular.py b/tests/test_tabular.py new file mode 100644 index 0000000..e972202 --- /dev/null +++ b/tests/test_tabular.py @@ -0,0 +1,319 @@ +"""Tests for fd5.generic.tabular — TabularSchema product schema.""" + +from __future__ import annotations + +import h5py +import numpy as np +import pytest + +from fd5.registry import ProductSchema, register_schema + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def schema(): + from fd5.generic.tabular import TabularSchema + + return TabularSchema() + + +@pytest.fixture() +def h5file(tmp_path): + path = tmp_path / "tabular.h5" + with h5py.File(path, "w") as f: + yield f + + +@pytest.fixture() +def h5path(tmp_path): + return tmp_path / "tabular.h5" + + +def _minimal_data(): + rng = np.random.default_rng(42) + return { + "columns": { + "temperature": rng.uniform(20, 30, size=50).astype(np.float64), + "pressure": rng.uniform(990, 1020, size=50).astype(np.float64), + "humidity": rng.uniform(30, 80, size=50).astype(np.float64), + }, + "description": "Test tabular data", + } + + +def _data_with_column_metadata(): + data = _minimal_data() + data["column_metadata"] = { + "temperature": {"units": "C", "description": "Ambient temperature"}, + "pressure": {"units": "hPa", "description": "Atmospheric pressure"}, + "humidity": {"units": "%", "description": "Relative humidity"}, + } + return data + + +def _data_with_row_labels(): + data = _minimal_data() + data["row_labels"] = [f"sample_{i}" for i in range(50)] + return data + + +def _integer_data(): + return { + "columns": { + "count": np.arange(20, dtype=np.int64), + "flag": np.ones(20, dtype=np.int32), + }, + "description": "Integer tabular data", + } + + +# 
--------------------------------------------------------------------------- +# Protocol conformance +# --------------------------------------------------------------------------- + + +class TestProtocolConformance: + def test_satisfies_product_schema_protocol(self, schema): + assert isinstance(schema, ProductSchema) + + def test_product_type_is_tabular(self, schema): + assert schema.product_type == "tabular" + + def test_schema_version_is_string(self, schema): + assert isinstance(schema.schema_version, str) + + def test_has_required_methods(self, schema): + assert callable(schema.json_schema) + assert callable(schema.required_root_attrs) + assert callable(schema.write) + assert callable(schema.id_inputs) + + +# --------------------------------------------------------------------------- +# json_schema() +# --------------------------------------------------------------------------- + + +class TestJsonSchema: + def test_returns_dict(self, schema): + result = schema.json_schema() + assert isinstance(result, dict) + + def test_has_draft_2020_12_meta(self, schema): + result = schema.json_schema() + assert result["$schema"] == "https://json-schema.org/draft/2020-12/schema" + + def test_product_const_is_tabular(self, schema): + result = schema.json_schema() + assert result["properties"]["product"]["const"] == "tabular" + + def test_valid_json_schema(self, schema): + import jsonschema + + result = schema.json_schema() + jsonschema.Draft202012Validator.check_schema(result) + + +# --------------------------------------------------------------------------- +# required_root_attrs() +# --------------------------------------------------------------------------- + + +class TestRequiredRootAttrs: + def test_returns_dict(self, schema): + result = schema.required_root_attrs() + assert isinstance(result, dict) + + def test_contains_product_tabular(self, schema): + result = schema.required_root_attrs() + assert result["product"] == "tabular" + + +# 
--------------------------------------------------------------------------- +# id_inputs() +# --------------------------------------------------------------------------- + + +class TestIdInputs: + def test_returns_list_of_strings(self, schema): + result = schema.id_inputs() + assert isinstance(result, list) + assert all(isinstance(s, str) for s in result) + + def test_contains_expected_keys(self, schema): + result = schema.id_inputs() + assert "product" in result + assert "name" in result + assert "timestamp" in result + + def test_returns_fresh_list(self, schema): + a = schema.id_inputs() + b = schema.id_inputs() + assert a is not b + + def test_deterministic(self, schema): + assert schema.id_inputs() == schema.id_inputs() + + +# --------------------------------------------------------------------------- +# write() — minimal data +# --------------------------------------------------------------------------- + + +class TestWriteMinimal: + def test_writes_table_group(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "table" in h5file + + def test_writes_columns(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "table/temperature" in h5file + assert "table/pressure" in h5file + assert "table/humidity" in h5file + + def test_column_shape(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["table/temperature"].shape == (50,) + + def test_column_gzip_compressed(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["table/temperature"].compression == "gzip" + assert h5file["table/temperature"].compression_opts == 4 + + def test_column_count_attr(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert int(h5file.attrs["column_count"]) == 3 + + def test_row_count_attr(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert int(h5file.attrs["row_count"]) == 50 + + def test_no_row_labels_when_absent(self, schema, h5file): + schema.write(h5file, 
_minimal_data()) + assert "row_labels" not in h5file + + def test_roundtrip_column_data(self, schema, h5file): + data = _minimal_data() + schema.write(h5file, data) + np.testing.assert_array_almost_equal( + h5file["table/temperature"][:], data["columns"]["temperature"] + ) + + +# --------------------------------------------------------------------------- +# write() — with column metadata +# --------------------------------------------------------------------------- + + +class TestWriteColumnMetadata: + def test_column_units_attr(self, schema, h5file): + schema.write(h5file, _data_with_column_metadata()) + assert h5file["table/temperature"].attrs["units"] == "C" + + def test_column_description_attr(self, schema, h5file): + schema.write(h5file, _data_with_column_metadata()) + assert h5file["table/temperature"].attrs["description"] == "Ambient temperature" + + def test_all_columns_have_metadata(self, schema, h5file): + schema.write(h5file, _data_with_column_metadata()) + for col in ("temperature", "pressure", "humidity"): + assert "units" in h5file[f"table/{col}"].attrs + assert "description" in h5file[f"table/{col}"].attrs + + +# --------------------------------------------------------------------------- +# write() — with row labels +# --------------------------------------------------------------------------- + + +class TestWriteRowLabels: + def test_row_labels_created(self, schema, h5file): + schema.write(h5file, _data_with_row_labels()) + assert "row_labels" in h5file + + def test_row_labels_values(self, schema, h5file): + schema.write(h5file, _data_with_row_labels()) + raw = h5file["row_labels"][:] + labels = [v.decode("utf-8") if isinstance(v, bytes) else str(v) for v in raw] + assert labels[0] == "sample_0" + assert labels[-1] == "sample_49" + assert len(labels) == 50 + + +# --------------------------------------------------------------------------- +# write() — integer data +# --------------------------------------------------------------------------- + + 
+class TestWriteIntegerData: + def test_integer_columns(self, schema, h5file): + schema.write(h5file, _integer_data()) + assert h5file["table/count"].dtype == np.int64 + assert h5file["table/flag"].dtype == np.int32 + + def test_integer_row_count(self, schema, h5file): + schema.write(h5file, _integer_data()) + assert int(h5file.attrs["row_count"]) == 20 + + +# --------------------------------------------------------------------------- +# Integration — round-trip +# --------------------------------------------------------------------------- + + +class TestIntegration: + def test_create_verify_roundtrip(self, schema, tmp_path): + import fd5 + + register_schema("tabular", schema) + data = _minimal_data() + with fd5.create( + tmp_path, + product="tabular", + name="roundtrip-tab", + description="Tabular round-trip test", + timestamp="2026-01-01T00:00:00Z", + ) as b: + b.write_product(data) + + files = list(tmp_path.glob("*.h5")) + assert len(files) == 1 + assert fd5.verify(str(files[0])) + + def test_create_validate_roundtrip(self, schema, h5path): + from fd5.schema import embed_schema, validate + + register_schema("tabular", schema) + data = _data_with_column_metadata() + with h5py.File(h5path, "w") as f: + for k, v in schema.required_root_attrs().items(): + f.attrs[k] = v + f.attrs["name"] = "integration-test-tabular" + f.attrs["description"] = "Integration test tabular" + embed_schema(f, schema.json_schema()) + schema.write(f, data) + + errors = validate(h5path) + assert errors == [], [e.message for e in errors] + + def test_roundtrip_with_row_labels(self, schema, tmp_path): + import fd5 + + register_schema("tabular", schema) + data = _data_with_row_labels() + with fd5.create( + tmp_path, + product="tabular", + name="labels-tab", + description="Tabular with row labels", + timestamp="2026-01-01T00:00:00Z", + ) as b: + b.write_product(data) + + files = list(tmp_path.glob("*.h5")) + assert len(files) == 1 + assert fd5.verify(str(files[0])) diff --git 
a/tests/test_timeseries.py b/tests/test_timeseries.py new file mode 100644 index 0000000..dac3d3f --- /dev/null +++ b/tests/test_timeseries.py @@ -0,0 +1,333 @@ +"""Tests for fd5.generic.timeseries — TimeseriesSchema product schema.""" + +from __future__ import annotations + +import h5py +import numpy as np +import pytest + +from fd5.registry import ProductSchema, register_schema + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def schema(): + from fd5.generic.timeseries import TimeseriesSchema + + return TimeseriesSchema() + + +@pytest.fixture() +def h5file(tmp_path): + path = tmp_path / "timeseries.h5" + with h5py.File(path, "w") as f: + yield f + + +@pytest.fixture() +def h5path(tmp_path): + return tmp_path / "timeseries.h5" + + +def _minimal_data(): + rng = np.random.default_rng(42) + return { + "signals": { + "ecg": rng.standard_normal(1000).astype(np.float64), + "resp": rng.standard_normal(1000).astype(np.float64), + }, + "time": np.linspace(0, 10, 1000), + "sampling_rate": 100.0, + "description": "Test timeseries", + } + + +def _data_with_events(): + data = _minimal_data() + data["events"] = { + "timestamps": np.array([1.0, 3.5, 7.2]), + "labels": ["start", "peak", "end"], + } + return data + + +def _data_with_metadata(): + data = _minimal_data() + data["metadata"] = { + "sensor_type": "optical", + "gain": 2.5, + "channel_count": 2, + } + return data + + +# --------------------------------------------------------------------------- +# Protocol conformance +# --------------------------------------------------------------------------- + + +class TestProtocolConformance: + def test_satisfies_product_schema_protocol(self, schema): + assert isinstance(schema, ProductSchema) + + def test_product_type_is_timeseries(self, schema): + assert schema.product_type == "timeseries" + + def test_schema_version_is_string(self, 
schema): + assert isinstance(schema.schema_version, str) + + def test_has_required_methods(self, schema): + assert callable(schema.json_schema) + assert callable(schema.required_root_attrs) + assert callable(schema.write) + assert callable(schema.id_inputs) + + +# --------------------------------------------------------------------------- +# json_schema() +# --------------------------------------------------------------------------- + + +class TestJsonSchema: + def test_returns_dict(self, schema): + result = schema.json_schema() + assert isinstance(result, dict) + + def test_has_draft_2020_12_meta(self, schema): + result = schema.json_schema() + assert result["$schema"] == "https://json-schema.org/draft/2020-12/schema" + + def test_product_const_is_timeseries(self, schema): + result = schema.json_schema() + assert result["properties"]["product"]["const"] == "timeseries" + + def test_valid_json_schema(self, schema): + import jsonschema + + result = schema.json_schema() + jsonschema.Draft202012Validator.check_schema(result) + + +# --------------------------------------------------------------------------- +# required_root_attrs() +# --------------------------------------------------------------------------- + + +class TestRequiredRootAttrs: + def test_returns_dict(self, schema): + result = schema.required_root_attrs() + assert isinstance(result, dict) + + def test_contains_product_timeseries(self, schema): + result = schema.required_root_attrs() + assert result["product"] == "timeseries" + + +# --------------------------------------------------------------------------- +# id_inputs() +# --------------------------------------------------------------------------- + + +class TestIdInputs: + def test_returns_list_of_strings(self, schema): + result = schema.id_inputs() + assert isinstance(result, list) + assert all(isinstance(s, str) for s in result) + + def test_contains_expected_keys(self, schema): + result = schema.id_inputs() + assert "product" in result + assert 
"name" in result + assert "timestamp" in result + + def test_returns_fresh_list(self, schema): + a = schema.id_inputs() + b = schema.id_inputs() + assert a is not b + + def test_deterministic(self, schema): + assert schema.id_inputs() == schema.id_inputs() + + +# --------------------------------------------------------------------------- +# write() — minimal data +# --------------------------------------------------------------------------- + + +class TestWriteMinimal: + def test_writes_signals_group(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "signals" in h5file + + def test_writes_signal_channels(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "signals/ecg" in h5file + assert "signals/resp" in h5file + + def test_signal_dtype(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["signals/ecg"].dtype == np.float64 + + def test_signal_shape(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["signals/ecg"].shape == (1000,) + + def test_signal_gzip_compressed(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["signals/ecg"].compression == "gzip" + assert h5file["signals/ecg"].compression_opts == 4 + + def test_signal_has_units_attr(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "units" in h5file["signals/ecg"].attrs + + def test_signal_has_description_attr(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "description" in h5file["signals/ecg"].attrs + + def test_writes_time_dataset(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "time" in h5file + assert h5file["time"].dtype == np.float64 + + def test_time_units_attr(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["time"].attrs["units"] == "s" + + def test_time_shape(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file["time"].shape == (1000,) + + def 
test_sampling_rate_attr(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file.attrs["sampling_rate"] == pytest.approx(100.0) + + def test_sampling_rate_units_attr(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert h5file.attrs["sampling_rate_units"] == "Hz" + + def test_roundtrip_time_data(self, schema, h5file): + data = _minimal_data() + schema.write(h5file, data) + np.testing.assert_array_almost_equal(h5file["time"][:], data["time"]) + + def test_no_events_when_absent(self, schema, h5file): + schema.write(h5file, _minimal_data()) + assert "events" not in h5file + + +# --------------------------------------------------------------------------- +# write() — with events +# --------------------------------------------------------------------------- + + +class TestWriteEvents: + def test_events_group_created(self, schema, h5file): + schema.write(h5file, _data_with_events()) + assert "events" in h5file + + def test_events_timestamps(self, schema, h5file): + data = _data_with_events() + schema.write(h5file, data) + np.testing.assert_array_almost_equal( + h5file["events/timestamps"][:], [1.0, 3.5, 7.2] + ) + + def test_events_labels(self, schema, h5file): + schema.write(h5file, _data_with_events()) + raw = h5file["events/labels"][:] + labels = [v.decode("utf-8") if isinstance(v, bytes) else str(v) for v in raw] + assert labels == ["start", "peak", "end"] + + +# --------------------------------------------------------------------------- +# write() — with metadata +# --------------------------------------------------------------------------- + + +class TestWriteMetadata: + def test_metadata_attrs_written(self, schema, h5file): + schema.write(h5file, _data_with_metadata()) + assert h5file.attrs["sensor_type"] == "optical" + assert float(h5file.attrs["gain"]) == pytest.approx(2.5) + assert int(h5file.attrs["channel_count"]) == 2 + + +# --------------------------------------------------------------------------- +# write() — single 
channel +# --------------------------------------------------------------------------- + + +class TestWriteSingleChannel: + def test_single_channel(self, schema, h5file): + data = { + "signals": {"temperature": np.ones(100)}, + "time": np.arange(100, dtype=np.float64), + "description": "Single channel", + } + schema.write(h5file, data) + assert "signals/temperature" in h5file + assert h5file["signals/temperature"].shape == (100,) + + +# --------------------------------------------------------------------------- +# Integration — round-trip +# --------------------------------------------------------------------------- + + +class TestIntegration: + def test_create_verify_roundtrip(self, schema, tmp_path): + import fd5 + + register_schema("timeseries", schema) + data = _minimal_data() + with fd5.create( + tmp_path, + product="timeseries", + name="roundtrip-ts", + description="Timeseries round-trip test", + timestamp="2026-01-01T00:00:00Z", + ) as b: + b.write_product(data) + + files = list(tmp_path.glob("*.h5")) + assert len(files) == 1 + assert fd5.verify(str(files[0])) + + def test_create_validate_roundtrip(self, schema, h5path): + from fd5.schema import embed_schema, validate + + register_schema("timeseries", schema) + data = _minimal_data() + with h5py.File(h5path, "w") as f: + for k, v in schema.required_root_attrs().items(): + f.attrs[k] = v + f.attrs["name"] = "integration-test-timeseries" + f.attrs["description"] = "Integration test timeseries" + embed_schema(f, schema.json_schema()) + schema.write(f, data) + + errors = validate(h5path) + assert errors == [], [e.message for e in errors] + + def test_roundtrip_with_events(self, schema, tmp_path): + import fd5 + + register_schema("timeseries", schema) + data = _data_with_events() + with fd5.create( + tmp_path, + product="timeseries", + name="events-ts", + description="Timeseries with events", + timestamp="2026-01-01T00:00:00Z", + ) as b: + b.write_product(data) + + files = list(tmp_path.glob("*.h5")) + assert 
len(files) == 1 + assert fd5.verify(str(files[0]))