Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ calibration = "fd5.imaging.calibration:CalibrationSchema"
spectrum = "fd5.imaging.spectrum:SpectrumSchema"
roi = "fd5.imaging.roi:RoiSchema"
device_data = "fd5.imaging.device_data:DeviceDataSchema"
timeseries = "fd5.generic.timeseries:TimeseriesSchema"
tabular = "fd5.generic.tabular:TabularSchema"
ndarray = "fd5.generic.ndarray:NdarraySchema"

[tool.coverage.run]
source = ["src/fd5"]
Expand Down
5 changes: 5 additions & 0 deletions src/fd5/generic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from fd5.generic.ndarray import NdarraySchema
from fd5.generic.tabular import TabularSchema
from fd5.generic.timeseries import TimeseriesSchema

__all__ = ["NdarraySchema", "TabularSchema", "TimeseriesSchema"]
101 changes: 101 additions & 0 deletions src/fd5/generic/ndarray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""fd5.generic.ndarray — N-dimensional array product schema.

Implements the ``ndarray`` product schema for arbitrary N-dimensional arrays:
sensor grids, image stacks, simulation output, and similar dense numeric data.
"""

from __future__ import annotations

from typing import Any

import h5py
import numpy as np

_SCHEMA_VERSION = "1.0.0"

_GZIP_LEVEL = 4

_ID_INPUTS = ["product", "name", "timestamp"]


class NdarraySchema:
    """Product schema for arbitrary N-dimensional arrays (``ndarray``)."""

    product_type: str = "ndarray"
    schema_version: str = _SCHEMA_VERSION

    def json_schema(self) -> dict[str, Any]:
        """Return the JSON Schema describing this product's root attributes."""
        properties = {
            "_schema_version": {"type": "integer"},
            "product": {"type": "string", "const": "ndarray"},
            "name": {"type": "string"},
            "description": {"type": "string"},
            "dimension_order": {"type": "string"},
            "reference_frame": {"type": "string"},
        }
        return {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "type": "object",
            "properties": properties,
            "required": ["_schema_version", "product", "name", "description"],
        }

    def required_root_attrs(self) -> dict[str, Any]:
        """Return the root attributes every ``ndarray`` product must carry."""
        return {"product": "ndarray"}

    def id_inputs(self) -> list[str]:
        """Return a copy of the attribute names that feed the product identifier."""
        return list(_ID_INPUTS)

    def write(self, target: h5py.File | h5py.Group, data: dict[str, Any]) -> None:
        """Write N-dimensional array data to *target*.

        *data* must contain:
        - ``array``: numpy ndarray (any shape, any numeric dtype)
        - ``dimension_order``: str (e.g. "TZYX", "XY", "ChannelHeightWidth")
        - ``description``: str

        Optional keys:
        - ``affine``: (4,4) float64 array
        - ``reference_frame``: str (e.g. "LPS", "RAS")
        - ``dimensions``: dict mapping axis_name to
          ``{"label": str, "units": str, "spacing": float}``
        """
        # The main payload: one gzip-compressed dataset named "array".
        target.create_dataset(
            "array",
            data=np.asarray(data["array"]),
            compression="gzip",
            compression_opts=_GZIP_LEVEL,
        )
        target.attrs["dimension_order"] = data["dimension_order"]

        if "reference_frame" in data:
            target.attrs["reference_frame"] = data["reference_frame"]

        if "affine" in data:
            # Affine is stored as a dataset (not an attr) in float64.
            affine = np.asarray(data["affine"], dtype=np.float64)
            target.create_dataset("affine", data=affine)

        if "dimensions" in data:
            self._write_dimensions(target, data["dimensions"])

    # ------------------------------------------------------------------
    # Dimensions
    # ------------------------------------------------------------------

    def _write_dimensions(
        self,
        target: h5py.File | h5py.Group,
        dimensions: dict[str, dict[str, Any]],
    ) -> None:
        """Create a ``dimensions`` group with one sub-group per axis."""
        group = target.create_group("dimensions")
        for axis, info in dimensions.items():
            axis_grp = group.create_group(axis)
            # String-valued attrs are copied verbatim when present.
            for key in ("label", "units"):
                if key in info:
                    axis_grp.attrs[key] = info[key]
            if "spacing" in info:
                axis_grp.attrs["spacing"] = np.float64(info["spacing"])
109 changes: 109 additions & 0 deletions src/fd5/generic/tabular.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""fd5.generic.tabular — Tabular product schema for spreadsheet-like data.

Implements the ``tabular`` product schema for lab measurements, clinical
records, parameter tables, and similar column-oriented data.
"""

from __future__ import annotations

from typing import Any

import h5py
import numpy as np

_SCHEMA_VERSION = "1.0.0"

_GZIP_LEVEL = 4

_ID_INPUTS = ["product", "name", "timestamp"]


class TabularSchema:
    """Product schema for column-oriented tabular data (``tabular``)."""

    product_type: str = "tabular"
    schema_version: str = _SCHEMA_VERSION

    def json_schema(self) -> dict[str, Any]:
        """Return the JSON Schema describing this product's root attributes."""
        return {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "type": "object",
            "properties": {
                "_schema_version": {"type": "integer"},
                "product": {"type": "string", "const": "tabular"},
                "name": {"type": "string"},
                "description": {"type": "string"},
                "column_count": {"type": "integer"},
                "row_count": {"type": "integer"},
            },
            "required": ["_schema_version", "product", "name", "description"],
        }

    def required_root_attrs(self) -> dict[str, Any]:
        """Return the root attributes every ``tabular`` product must carry."""
        return {"product": "tabular"}

    def id_inputs(self) -> list[str]:
        """Return a copy of the attribute names that feed the product identifier."""
        return list(_ID_INPUTS)

    def write(self, target: h5py.File | h5py.Group, data: dict[str, Any]) -> None:
        """Write tabular data to *target*.

        *data* must contain:
        - ``columns``: dict mapping column_name to numpy array (all same length)
        - ``description``: str

        Optional keys:
        - ``column_metadata``: dict mapping column_name to
          ``{"units": str, "description": str}``
        - ``row_labels``: numpy array or list of str (row identifiers)
        """
        columns = data["columns"]
        column_metadata = data.get("column_metadata", {})

        # Row count is taken from the first column; all columns are expected
        # to have the same length.
        row_count = len(next(iter(columns.values())))
        target.attrs["column_count"] = np.int64(len(columns))
        target.attrs["row_count"] = np.int64(row_count)

        self._write_table(target, columns, column_metadata)

        if "row_labels" in data:
            self._write_row_labels(target, data["row_labels"])

    # ------------------------------------------------------------------
    # Table
    # ------------------------------------------------------------------

    def _write_table(
        self,
        target: h5py.File | h5py.Group,
        columns: dict[str, Any],
        column_metadata: dict[str, dict[str, str]],
    ) -> None:
        """Write one gzip-compressed dataset per column under ``table/``."""
        table_grp = target.create_group("table")
        for col_name, arr in columns.items():
            arr = np.asarray(arr)
            ds = table_grp.create_dataset(
                col_name,
                data=arr,
                compression="gzip",
                compression_opts=_GZIP_LEVEL,
            )
            # Per-column metadata is optional and attached as dataset attrs.
            meta = column_metadata.get(col_name, {})
            if "units" in meta:
                ds.attrs["units"] = meta["units"]
            if "description" in meta:
                ds.attrs["description"] = meta["description"]

    # ------------------------------------------------------------------
    # Row labels
    # ------------------------------------------------------------------

    def _write_row_labels(
        self,
        target: h5py.File | h5py.Group,
        row_labels: Any,
    ) -> None:
        """Write *row_labels* as a fixed-width byte-string dataset."""
        encoded = [s.encode("utf-8") if isinstance(s, str) else s for s in row_labels]
        # Size the fixed-width "S<n>" dtype to the longest encoded label;
        # fall back to 1 so an empty label list still yields a valid dtype.
        # (Bug fix: the dtype previously used a garbled literal instead of
        # the computed max_len, which left max_len unused and produced an
        # invalid dtype string.)
        max_len = max(len(b) for b in encoded) if encoded else 1
        labels_arr = np.array(encoded, dtype=f"S{max_len}")
        target.create_dataset("row_labels", data=labels_arr)
150 changes: 150 additions & 0 deletions src/fd5/generic/timeseries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""fd5.generic.timeseries — Timeseries product schema for continuous sensor data.

Implements the ``timeseries`` product schema for continuous sensor data,
physiological monitoring, IoT streams, and similar time-domain signals.
"""

from __future__ import annotations

from typing import Any

import h5py
import numpy as np

_SCHEMA_VERSION = "1.0.0"

_GZIP_LEVEL = 4

_ID_INPUTS = ["product", "name", "timestamp"]


class TimeseriesSchema:
    """Product schema for continuous time-series data (``timeseries``)."""

    product_type: str = "timeseries"
    schema_version: str = _SCHEMA_VERSION

    def json_schema(self) -> dict[str, Any]:
        """Return the JSON Schema describing this product's root attributes."""
        return {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "type": "object",
            "properties": {
                "_schema_version": {"type": "integer"},
                "product": {"type": "string", "const": "timeseries"},
                "name": {"type": "string"},
                "description": {"type": "string"},
                "sampling_rate": {"type": "number"},
                "sampling_rate_units": {"type": "string", "const": "Hz"},
            },
            "required": ["_schema_version", "product", "name", "description"],
        }

    def required_root_attrs(self) -> dict[str, Any]:
        """Return the root attributes every ``timeseries`` product must carry."""
        return {"product": "timeseries"}

    def id_inputs(self) -> list[str]:
        """Return a copy of the attribute names that feed the product identifier."""
        return list(_ID_INPUTS)

    def write(self, target: h5py.File | h5py.Group, data: dict[str, Any]) -> None:
        """Write timeseries data to *target*.

        *data* must contain:
        - ``signals``: dict mapping channel_name to numpy 1D float array
        - ``time``: numpy 1D float array (timestamps in seconds)
        - ``description``: str

        Optional keys:
        - ``sampling_rate``: float (Hz)
        - ``events``: dict with ``timestamps`` (array) and ``labels`` (list[str])
        - ``metadata``: dict of additional attrs
        """
        self._write_signals(target, data["signals"], data.get("description", ""))
        self._write_time(target, data["time"])

        if "sampling_rate" in data:
            target.attrs["sampling_rate"] = np.float64(data["sampling_rate"])
            target.attrs["sampling_rate_units"] = "Hz"

        if "events" in data:
            self._write_events(target, data["events"])

        if "metadata" in data:
            self._write_metadata(target, data["metadata"])

    # ------------------------------------------------------------------
    # Signals
    # ------------------------------------------------------------------

    def _write_signals(
        self,
        target: h5py.File | h5py.Group,
        signals: dict[str, Any],
        description: str,
    ) -> None:
        """Write one gzip-compressed float64 dataset per channel under ``signals/``."""
        signals_grp = target.create_group("signals")
        for name, arr in signals.items():
            ds = signals_grp.create_dataset(
                name,
                data=np.asarray(arr, dtype=np.float64),
                compression="gzip",
                compression_opts=_GZIP_LEVEL,
            )
            # Units default to arbitrary units; the product-level description
            # is replicated on every channel dataset.
            ds.attrs["units"] = "a.u."
            ds.attrs["description"] = description

    # ------------------------------------------------------------------
    # Time
    # ------------------------------------------------------------------

    def _write_time(
        self,
        target: h5py.File | h5py.Group,
        time: Any,
    ) -> None:
        """Write the shared time axis as a float64 dataset in seconds."""
        ds = target.create_dataset(
            "time",
            data=np.asarray(time, dtype=np.float64),
            compression="gzip",
            compression_opts=_GZIP_LEVEL,
        )
        ds.attrs["units"] = "s"

    # ------------------------------------------------------------------
    # Events
    # ------------------------------------------------------------------

    def _write_events(
        self,
        target: h5py.File | h5py.Group,
        events: dict[str, Any],
    ) -> None:
        """Write event ``timestamps`` and fixed-width byte-string ``labels``."""
        events_grp = target.create_group("events")
        events_grp.create_dataset(
            "timestamps",
            data=np.asarray(events["timestamps"], dtype=np.float64),
            compression="gzip",
            compression_opts=_GZIP_LEVEL,
        )
        encoded = [
            s.encode("utf-8") if isinstance(s, str) else s for s in events["labels"]
        ]
        # Size the fixed-width "S<n>" dtype to the longest encoded label;
        # fall back to 1 so an empty label list still yields a valid dtype.
        # (Bug fix: the dtype previously used a garbled literal instead of
        # the computed max_len, which left max_len unused and produced an
        # invalid dtype string.)
        max_len = max(len(b) for b in encoded) if encoded else 1
        labels_arr = np.array(encoded, dtype=f"S{max_len}")
        events_grp.create_dataset("labels", data=labels_arr)

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------

    def _write_metadata(
        self,
        target: h5py.File | h5py.Group,
        metadata: dict[str, Any],
    ) -> None:
        """Copy scalar metadata onto *target*'s attrs, normalizing numeric types.

        Values that are not float, int, or str are silently skipped — this
        mirrors the original behavior; flagging unsupported types would be a
        behavior change for callers.
        """
        for key, value in metadata.items():
            if isinstance(value, float):
                target.attrs[key] = np.float64(value)
            elif isinstance(value, int):
                target.attrs[key] = np.int64(value)
            elif isinstance(value, str):
                target.attrs[key] = value
Loading
Loading