From 810045a4ddd8f2e6b2298254074b9416b9bb8552 Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Thu, 10 Jul 2025 16:39:36 -0600 Subject: [PATCH 01/20] check for array of arrays and convert to ndarray --- tiled/adapters/array.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tiled/adapters/array.py b/tiled/adapters/array.py index 36628c1e5..eee583024 100644 --- a/tiled/adapters/array.py +++ b/tiled/adapters/array.py @@ -82,6 +82,15 @@ def from_array( if not hasattr(array, "__array__"): array = numpy.asanyarray(array) + # Convert array of arrays to ND array to expose the underlying dtype + is_array_of_arrays = ( + array.dtype == "object" + and array.shape[0] + and isinstance(array[0], numpy.ndarray) + ) + if is_array_of_arrays: + array = numpy.vstack(array) + # Convert (experimental) pandas.StringDtype to numpy's unicode string dtype is_likely_string_dtype = isinstance(array.dtype, pandas.StringDtype) or ( array.dtype == "object" and array.dtype.fields is None From 7b48ee9ddbfec74e374d9c6062be3a89ae170a19 Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Tue, 5 Aug 2025 10:53:52 -0600 Subject: [PATCH 02/20] Add ragged dependency --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 4a996548d..bb1b152aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,6 +92,7 @@ all = [ "python-dateutil", "python-jose[cryptography]", "python-multipart", + "ragged", "rich", "sparse", "sqlalchemy[asyncio] >=2", @@ -121,6 +122,7 @@ client = [ "numpy", "pandas", "pyarrow", + "ragged", "rich", "sparse", "stamina", @@ -242,6 +244,7 @@ server = [ "python-dateutil", "python-jose[cryptography]", "python-multipart", + "ragged", "sparse", "sqlalchemy[asyncio] >=2", "starlette >=0.38.0", From 4543b7329a85d68026256984ed5fefe3e3e9ae7a Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Thu, 14 Aug 2025 18:59:56 -0600 Subject: [PATCH 03/20] From SQLAdapter, test Array-, Ragged-, then AwkwardAdapter --- tiled/adapters/array.py | 4 +-- tiled/adapters/ragged.py | 72 ++++++++++++++++++++++++++++++++++++++ tiled/adapters/sql.py | 65 +++++++++++++++++++++++++++++++--- tiled/structures/core.py | 4 +++ tiled/structures/ragged.py | 42 ++++++++++++++++++++++ 5 files changed, 180 insertions(+), 7 deletions(-) create mode 100644 tiled/adapters/ragged.py create mode 100644 tiled/structures/ragged.py diff --git a/tiled/adapters/array.py b/tiled/adapters/array.py index 7c8b21a1e..e86ce67fe 100644 --- a/tiled/adapters/array.py +++ b/tiled/adapters/array.py @@ -1,5 +1,5 @@ import contextlib -from typing import Any, List, Optional, Set, Tuple +from typing import Any, List, Optional, Sequence, Set, Tuple, cast import dask.array import numpy @@ -92,7 +92,7 @@ def from_array( if is_array_of_arrays: with contextlib.suppress(ValueError): # only uniform arrays (with same dimensions) are stackable - array = numpy.vstack(array) + array = numpy.vstack(cast("Sequence[numpy.ndarray]", array)) # Convert (experimental) pandas.StringDtype to numpy's unicode string dtype is_likely_string_dtype = isinstance(array.dtype, pandas.StringDtype) or ( diff --git a/tiled/adapters/ragged.py b/tiled/adapters/ragged.py new file mode 100644 index 000000000..f501f1da8 --- /dev/null +++ b/tiled/adapters/ragged.py @@ -0,0 +1,72 @@ +from typing import Any, List, Optional, Set + +import ragged +from numpy.typing import NDArray + +from tiled.ndslice import NDSlice +from tiled.storage import Storage +from tiled.structures.core import Spec, StructureFamily +from tiled.structures.ragged import RaggedStructure +from tiled.type_aliases import JSON + + +class RaggedAdapter: + structure_family = StructureFamily.ragged + supported_storage: Set[type[Storage]] = set() + + def __init__( + self, + array: ragged.array, + structure: RaggedStructure, + metadata: Optional[JSON] = None, + specs: Optional[List[Spec]] = None, + ) -> None: + """ + + Parameters + ---------- + array : + structure : + metadata : + specs : + """ + self._array = array + self._structure = structure + self._metadata = metadata or {} + self.specs = list(specs or []) + + @classmethod + def from_array( + cls, + array: NDArray[Any], + metadata: Optional[JSON] = None, + specs: Optional[List[Spec]] = None, + ) -> "RaggedAdapter": + """ + + Parameters + ---------- + array : + metadata : + specs : + + Returns + ------- + + """ + structure = RaggedStructure.from_array(array) + return cls( + array, + structure, + metadata=metadata, + specs=specs, + ) + + def metadata(self) -> JSON: + return self._metadata + + def __repr__(self) -> str: + return f"{type(self).__name__}({self._structure})" + + def read(self, slice: NDSlice = NDSlice(...)) -> ragged.array: + return self._array[tuple(slice)] if slice else self._array diff --git a/tiled/adapters/sql.py b/tiled/adapters/sql.py index 96b7d4daf..03aeb34dc 100644 --- a/tiled/adapters/sql.py +++ b/tiled/adapters/sql.py @@ -1,8 +1,20 @@ import copy import hashlib +import logging import re from contextlib import closing -from typing import Any, Callable, Iterator, List, Literal, Optional, Tuple, Union, cast +from typing import ( + Any, + Callable, + Iterator, + List, + Literal, + Optional, + Sequence, + Tuple, + Union, + cast, +) import numpy import pandas @@ -16,6 +28,8 @@ from ..structures.table import TableStructure from ..type_aliases import JSON from .array import ArrayAdapter +from .awkward import AwkwardAdapter +from .ragged import RaggedAdapter from .utils import init_adapter_from_catalog DIALECTS = Literal["postgresql", "sqlite", "duckdb"] @@ -47,6 +61,8 @@ # e.g. "A" and "a" will raise an error. # Furthermore, user-specified table names can only be in lower case. +logger = logging.getLogger(__name__) + class SQLAdapter: """SQLAdapter Class @@ -202,7 +218,7 @@ def structure(self) -> TableStructure: """ return self._structure - def get(self, key: str) -> Union[ArrayAdapter, None]: + def get(self, key: str) -> Union[ArrayAdapter, AwkwardAdapter, RaggedAdapter, None]: """Get the data for a specific key Parameters @@ -217,7 +233,9 @@ def get(self, key: str) -> Union[ArrayAdapter, None]: return None return self[key] - def __getitem__(self, key: str) -> ArrayAdapter: + def __getitem__( + self, key: str + ) -> Union[ArrayAdapter, AwkwardAdapter, RaggedAdapter]: """Get the data for a specific key. Parameters @@ -230,9 +248,46 @@ def __getitem__(self, key: str) -> ArrayAdapter: """ # Must compute to determine shape. - return ArrayAdapter.from_array(self.read([key])[key].values) + array = self.read([key])[key].to_numpy() + if array.dtype != "object": + return ArrayAdapter.from_array(array) + + if ( + array.dtype == "object" + and len(array) + and isinstance(array[0], numpy.ndarray) + ): + # accumulate errors until an attempt succeeds + errors: List[Exception] = [] + + try: + array = numpy.vstack(cast("Sequence[numpy.ndarray]", array)) + return ArrayAdapter.from_array(array) + except ValueError as err: + errors.append(err) + + try: + return RaggedAdapter.from_array(array) + except Exception as err: + errors.append(err) + + try: + return AwkwardAdapter.from_array(array) + except Exception as err: + errors.append(err) + + logger.error( + "No adapter found that accepts object-array at key %s. (%s)", + key, + errors, + ) + # fallback to string representation conversion in ArrayAdapter - def items(self) -> Iterator[Tuple[str, ArrayAdapter]]: + return ArrayAdapter.from_array(array) + + def items( + self, + ) -> Iterator[Tuple[str, Union[ArrayAdapter, AwkwardAdapter, RaggedAdapter]]]: """Iterate over the SQLAdapter data. Returns diff --git a/tiled/structures/core.py b/tiled/structures/core.py index e812a5e17..8cae558ac 100644 --- a/tiled/structures/core.py +++ b/tiled/structures/core.py @@ -17,6 +17,7 @@ class StructureFamily(str, enum.Enum): awkward = "awkward" composite = "composite" container = "container" + ragged = "ragged" sparse = "sparse" table = "table" @@ -61,5 +62,8 @@ def dict(self) -> Dict[str, Optional[str]]: StructureFamily.sparse: lambda: importlib.import_module( "...structures.sparse", StructureFamily.__module__ ).SparseStructure, + StructureFamily.ragged: lambda: importlib.import_module( + "...structures.ragged", StructureFamily.__module__ + ).RaggedStructure, } ) diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py new file mode 100644 index 000000000..a7ff32c89 --- /dev/null +++ b/tiled/structures/ragged.py @@ -0,0 +1,42 @@ +from dataclasses import dataclass +from typing import Tuple, Union + +import ragged + +from tiled.structures.array import ArrayStructure, BuiltinDtype, StructDtype + + +@dataclass() +class RaggedStructure(ArrayStructure): + shape: Tuple[Union[int, None], ...] # type: ignore[reportIncompatibleVariableOverride] + + @classmethod + def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructure": + from dask.array.core import normalize_chunks + + # TODO: test, or implement conversion from, AwkwardArrays + array = ragged.asarray(array) + + if shape is None: + shape = array.shape + if chunks is None: + chunks = ("auto",) * len(shape) + + # TODO test chunking: I think this should default to the largest superset of "shapes" + normalized_chunks = normalize_chunks( + chunks, + shape=shape, + dtype=array.dtype, + ) + if array.dtype.fields is not None: + data_type = StructDtype.from_numpy_dtype(array.dtype) + else: + data_type = BuiltinDtype.from_numpy_dtype(array.dtype) + + return cls( + data_type=data_type, + chunks=normalized_chunks, + shape=shape, + dims=dims, + resizable=False, + ) From 62ff00e1db350456ea6f40cd7b5eba25db96a70b Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Tue, 19 Aug 2025 13:14:12 -0600 Subject: [PATCH 04/20] Test returned adapters, without nullable data types --- tiled/_tests/adapters/test_sql.py | 51 +++++++++++++++++-- tiled/_tests/adapters/test_sql_arrays.py | 64 +++++++++++++++++++++--- 2 files changed, 104 insertions(+), 11 deletions(-) diff --git a/tiled/_tests/adapters/test_sql.py b/tiled/_tests/adapters/test_sql.py index 9b23f1b51..3ccbc72d8 100644 --- a/tiled/_tests/adapters/test_sql.py +++ b/tiled/_tests/adapters/test_sql.py @@ -5,6 +5,7 @@ import pyarrow as pa import pytest +from tiled.adapters.array import ArrayAdapter from tiled.adapters.sql import ( COLUMN_NAME_PATTERN, TABLE_NAME_PATTERN, @@ -21,20 +22,25 @@ data0 = [ pa.array([1, 2, 3, 4, 5]), pa.array([1.0, 2.0, 3.0, 4.0, 5.0]), - pa.array(["foo0", "bar0", "baz0", None, "goo0"]), - pa.array([True, None, False, True, None]), + # pa.array(["foo0", "bar0", "baz0", None, "goo0"]), + # pa.array([True, None, False, True, None]), + pa.array(["foo0", "bar0", "baz0", "None", "goo0"]), + pa.array([True, bool(None), False, True, bool(None)]), ] data1 = [ pa.array([6, 7, 8, 9, 10, 11, 12]), pa.array([6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0]), - pa.array(["foo1", "bar1", None, "baz1", "biz", None, "goo"]), - pa.array([None, True, True, False, False, None, True]), + # pa.array(["foo1", "bar1", None, "baz1", "biz", None, "goo"]), + # pa.array([None, True, True, False, False, None, True]), + pa.array(["foo1", "bar1", "None", "baz1", "biz", "None", "goo"]), + pa.array([bool(None), True, True, False, False, bool(None), True]), ] data2 = [ pa.array([13, 14]), pa.array([13.0, 14.0]), pa.array(["foo2", "baz2"]), - pa.array([False, None]), + # pa.array([False, None]), + pa.array([False, bool(None)]), ] batch0 = pa.record_batch(data0, names=names) @@ -797,3 +803,38 @@ def deep_array_equal(a1: Any, a2: Any) -> bool: assert deep_array_equal(result_part, result_full) storage.dispose() # Close all connections + + +@pytest.mark.parametrize( + "sql_adapter_name", + [ + "adapter_duckdb_many_partitions", + "adapter_psql_many_partitions", + "adapter_sqlite_many_partitions", + ], +) +@pytest.mark.parametrize("field", names) +def test_compare_field_data_from_array_adapter( + sql_adapter_name: str, + field: str, + request: pytest.FixtureRequest, +) -> None: + # get adapter from fixture + sql_adapter: SQLAdapter = request.getfixturevalue(sql_adapter_name) + + table = pa.Table.from_batches([batch0, batch1, batch2]) + sql_adapter.append_partition(table, 0) + + array_adapter = sql_adapter[field] + assert isinstance(array_adapter, ArrayAdapter) + + result_read = array_adapter.read() + field_index = names.index(field) + assert np.array_equal( + [ + *data0[field_index].tolist(), + *data1[field_index].tolist(), + *data2[field_index].tolist(), + ], + result_read.tolist(), + ) diff --git a/tiled/_tests/adapters/test_sql_arrays.py b/tiled/_tests/adapters/test_sql_arrays.py index efaedb3d9..218c426f5 100644 --- a/tiled/_tests/adapters/test_sql_arrays.py +++ b/tiled/_tests/adapters/test_sql_arrays.py @@ -1,5 +1,6 @@ -from typing import Callable, cast +from typing import Callable, Dict, Type, Union, cast +import awkward as ak import numpy as np import pyarrow as pa import pytest @@ -9,6 +10,9 @@ from tiled._tests.adapters.test_sql import adapter_psql_many_partitions # noqa: F401 from tiled._tests.adapters.test_sql import adapter_psql_one_partition # noqa: F401 from tiled._tests.adapters.test_sql import assert_same_rows +from tiled.adapters.array import ArrayAdapter +from tiled.adapters.awkward import AwkwardAdapter +from tiled.adapters.ragged import RaggedAdapter from tiled.adapters.sql import SQLAdapter from tiled.storage import SQLStorage, parse_storage, register_storage from tiled.structures.core import StructureFamily @@ -17,7 +21,16 @@ rng = np.random.default_rng(42) -names = ["i0", "i1", "i2", "i3", "f4", "f5"] +names_adapters: Dict[str, Type[Union[ArrayAdapter, AwkwardAdapter, RaggedAdapter]]] = { + "i0": ArrayAdapter, + "i1": ArrayAdapter, + "i2": ArrayAdapter, + "i3": ArrayAdapter, + "f4": ArrayAdapter, + "f5": ArrayAdapter, + "ragged": RaggedAdapter, +} +names = list(names_adapters.keys()) batch_size = 5 data0 = [ pa.array( @@ -34,6 +47,7 @@ ), pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]), pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]), + pa.array([rng.random(size=rng.integers(1, 10)) for _ in range(batch_size)]), ] batch_size = 8 data1 = [ @@ -51,6 +65,7 @@ ), pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]), pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]), + pa.array([rng.random(size=rng.integers(1, 10)) for _ in range(batch_size)]), ] batch_size = 3 data2 = [ @@ -68,6 +83,7 @@ ), pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]), pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]), + pa.array([rng.random(size=rng.integers(1, 10)) for _ in range(batch_size)]), ] batch0 = pa.record_batch(data0, names=names) @@ -90,7 +106,7 @@ def _data_source_from_init_storage( assets=[], ) - storage = cast(SQLStorage, parse_storage(data_uri)) + storage = cast("SQLStorage", parse_storage(data_uri)) register_storage(storage) return SQLAdapter.init_storage(data_source=data_source, storage=storage) @@ -240,17 +256,53 @@ def test_write_read_one_batch_many_part( # read a specific field result_read = adapter.read_partition(0, fields=[field]) field_index = names.index(field) - assert np.array_equal( + assert ak.array_equal( [*data0[field_index].tolist(), *data2[field_index].tolist()], result_read[field].tolist(), ) result_read = adapter.read_partition(1, fields=[field]) - assert np.array_equal( + assert ak.array_equal( [*data1[field_index].tolist(), *data0[field_index].tolist()], result_read[field].tolist(), ) result_read = adapter.read_partition(2, fields=[field]) - assert np.array_equal( + assert ak.array_equal( [*data2[field_index].tolist(), *data1[field_index].tolist()], result_read[field].tolist(), ) + + +@pytest.mark.parametrize( + "sql_adapter_name", + [("adapter_duckdb_many_partitions"), ("adapter_psql_many_partitions")], +) +@pytest.mark.parametrize(("field", "array_adapter_type"), [*names_adapters.items()]) +def test_compare_field_data_from_array_adapter( + sql_adapter_name: str, + field: str, + array_adapter_type: type, + request: pytest.FixtureRequest, +) -> None: + # get adapter from fixture + sql_adapter: SQLAdapter = request.getfixturevalue(sql_adapter_name) + + table = pa.Table.from_batches([batch0, batch1, batch2]) + sql_adapter.append_partition(table, 0) + + array_adapter = sql_adapter[field] + assert isinstance(array_adapter, array_adapter_type) + + field_index = names.index(field) + if isinstance(array_adapter, AwkwardAdapter): + result_read = array_adapter.read() # smoke test + raise NotImplementedError + else: + result_read = array_adapter.read() + assert ak.array_equal( + [ + *data0[field_index].tolist(), + *data1[field_index].tolist(), + *data2[field_index].tolist(), + ], + result_read.tolist(), # type: ignore[attr-defined] + ) From 4235a68f1de263a71a4542b687237f8d831787e2 Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Tue, 19 Aug 2025 14:19:12 -0600 Subject: [PATCH 05/20] remove normalize_chunks from ragged adapter --- tiled/structures/ragged.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index a7ff32c89..c2f174815 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -12,22 +12,17 @@ class RaggedStructure(ArrayStructure): @classmethod def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructure": - from dask.array.core import normalize_chunks - - # TODO: test, or implement conversion from, AwkwardArrays - array = ragged.asarray(array) + array = ( + ragged.asarray(array.tolist()) + if hasattr(array, "tolist") + else ragged.array(list(array)) + ) if shape is None: shape = array.shape if chunks is None: chunks = ("auto",) * len(shape) - # TODO test chunking: I think this should default to the largest superset of "shapes" - normalized_chunks = normalize_chunks( - chunks, - shape=shape, - dtype=array.dtype, - ) if array.dtype.fields is not None: data_type = StructDtype.from_numpy_dtype(array.dtype) else: @@ -35,7 +30,7 @@ def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructu return cls( data_type=data_type, - chunks=normalized_chunks, + chunks=chunks, shape=shape, dims=dims, resizable=False, From fb4d71fc4f1614f94729ede33e77ac6b8655b4ff Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Wed, 20 Aug 2025 17:40:45 -0600 Subject: [PATCH 06/20] Add schema tests for irregular arrays --- tiled/_tests/adapters/test_sql_types.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tiled/_tests/adapters/test_sql_types.py b/tiled/_tests/adapters/test_sql_types.py index e9bc0923e..18e435147 100644 --- a/tiled/_tests/adapters/test_sql_types.py +++ b/tiled/_tests/adapters/test_sql_types.py @@ -206,6 +206,25 @@ def duckdb_uri(tmp_path: Path) -> Generator[str, None, None]: "duckdb": (["DECIMAL(5, 2) NULL"], pa.schema([("x", pa.decimal128(5, 2))])), }, ), + "ragged_lists": ( + pa.Table.from_arrays( + [ + pa.array([[1], [2, 3], [4, 5, 6]], pa.list_(pa.int32())), + pa.array([[1.1, 2.2, 3.3], [4.4, 5.5], [6.6]], pa.list_(pa.float32())), + ], + names=["x", "y"], + ), + { + "duckdb": ( + ["INTEGER[] NULL", "REAL[] NULL"], + pa.schema([("x", pa.list_(pa.int32())), ("y", pa.list_(pa.float32()))]), + ), + "postgresql": ( + ["INTEGER ARRAY NULL", "REAL ARRAY NULL"], + pa.schema([("x", pa.list_(pa.int32())), ("y", pa.list_(pa.float32()))]), + ), + }, + ), } From fc242e1a97efbc1a9b3b4cae472d52cc28b13baa Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Wed, 20 Aug 2025 17:42:29 -0600 Subject: [PATCH 07/20] No need to test every datatype, already done elsewhere --- tiled/_tests/adapters/test_sql_arrays.py | 58 ++++-------------------- 1 file changed, 9 insertions(+), 49 deletions(-) diff --git a/tiled/_tests/adapters/test_sql_arrays.py b/tiled/_tests/adapters/test_sql_arrays.py index 218c426f5..afd7b3181 100644 --- a/tiled/_tests/adapters/test_sql_arrays.py +++ b/tiled/_tests/adapters/test_sql_arrays.py @@ -22,67 +22,27 @@ rng = np.random.default_rng(42) names_adapters: Dict[str, Type[Union[ArrayAdapter, AwkwardAdapter, RaggedAdapter]]] = { - "i0": ArrayAdapter, - "i1": ArrayAdapter, - "i2": ArrayAdapter, - "i3": ArrayAdapter, - "f4": ArrayAdapter, - "f5": ArrayAdapter, - "ragged": RaggedAdapter, + "integers": ArrayAdapter, + "floats": ArrayAdapter, + "ragged_floats": RaggedAdapter, } names = list(names_adapters.keys()) batch_size = 5 data0 = [ - pa.array( - [rng.integers(-100, 100, size=10, dtype=np.int8) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=11, dtype=np.int16) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=12, dtype=np.int32) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=13, dtype=np.int64) for _ in range(batch_size)] - ), - pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]), - pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]), + pa.array([rng.integers(-100, 100, size=10) for _ in range(batch_size)]), + pa.array([rng.random(size=15) for _ in range(batch_size)]), pa.array([rng.random(size=rng.integers(1, 10)) for _ in range(batch_size)]), ] batch_size = 8 data1 = [ - pa.array( - [rng.integers(-100, 100, size=10, dtype=np.int8) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=11, dtype=np.int16) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=12, dtype=np.int32) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=13, dtype=np.int64) for _ in range(batch_size)] - ), - pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]), - pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]), + pa.array([rng.integers(-100, 100, size=10) for _ in range(batch_size)]), + pa.array([rng.random(size=15) for _ in range(batch_size)]), pa.array([rng.random(size=rng.integers(1, 10)) for _ in range(batch_size)]), ] batch_size = 3 data2 = [ - pa.array( - [rng.integers(-100, 100, size=10, dtype=np.int8) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=11, dtype=np.int16) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=12, dtype=np.int32) for _ in range(batch_size)] - ), - pa.array( - [rng.integers(-100, 100, size=13, dtype=np.int64) for _ in range(batch_size)] - ), - pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]), - pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]), + pa.array([rng.integers(-100, 100, size=10) for _ in range(batch_size)]), + pa.array([rng.random(size=15) for _ in range(batch_size)]), pa.array([rng.random(size=rng.integers(1, 10)) for _ in range(batch_size)]), ] From 548b0f312c91a61e7e9eed3245414d67bed4eb7d Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Fri, 22 Aug 2025 17:19:24 -0600 Subject: [PATCH 08/20] write + read full ragged arrays --- tiled/_tests/test_ragged.py | 123 ++++++++++++++++++++++++++++ tiled/adapters/protocols.py | 26 +++++- tiled/adapters/ragged.py | 24 ++++-- tiled/adapters/sql.py | 6 +- tiled/catalog/adapter.py | 6 ++ tiled/client/container.py | 64 +++++++++++++++ tiled/client/ragged.py | 78 ++++++++++++++++++ tiled/serialization/__init__.py | 4 + tiled/serialization/ragged.py | 51 ++++++++++++ tiled/server/core.py | 1 + tiled/server/links.py | 11 ++- tiled/server/router.py | 138 +++++++++++++++++++++++++++++++- tiled/server/schemas.py | 10 +++ tiled/structures/ragged.py | 39 +++++++-- 14 files changed, 559 insertions(+), 22 deletions(-) create mode 100644 tiled/_tests/test_ragged.py create mode 100644 tiled/client/ragged.py create mode 100644 tiled/serialization/ragged.py diff --git a/tiled/_tests/test_ragged.py b/tiled/_tests/test_ragged.py new file mode 100644 index 000000000..60e097754 --- /dev/null +++ b/tiled/_tests/test_ragged.py @@ -0,0 +1,123 @@ +import awkward as ak +import numpy as np +import pyarrow.feather +import pyarrow.parquet +import pytest +import ragged + +from tiled.catalog import in_memory +from tiled.client import Context, from_context, record_history +from tiled.server.app import build_app +from tiled.utils import APACHE_ARROW_FILE_MIME_TYPE + + +@pytest.fixture +def catalog(tmpdir): + catalog = in_memory(writable_storage=str(tmpdir)) + yield catalog + + +@pytest.fixture +def app(catalog): + app = build_app(catalog) + yield app + + +@pytest.fixture +def context(app): + with Context.from_app(app) as context: + yield context + + +@pytest.fixture +def client(context): + client = from_context(context) + yield client + + +RNG = np.random.default_rng(42) + + +def test_slicing(client): + # Write data into catalog. + array = ragged.array( + [ + list(RNG.random(10).tolist()), + list(RNG.random(3).tolist()), + list(RNG.random(8).tolist()), + ] + ) + returned = client.write_ragged(array, key="test") + # Test with client returned, and with client from lookup. + for rac in [returned, client["test"]]: + # Read the data back out from the RaggedClient, progressively sliced. + result = rac.read() + # ragged does not have an array_equal(a, b) equivalent. Use awkward. + assert ak.array_equal(result._impl, array._impl) + + # When sliced, the server sends less data. + with record_history() as h: + full_result = rac[:] + assert ak.array_equal(full_result._impl, array._impl) # noqa: SLF001 + assert len(h.responses) == 1 # sanity check + full_response_size = len(h.responses[0].content) + with record_history() as h: + sliced_result = rac[1] + assert ak.array_equal(sliced_result._impl, array[1]._impl) # noqa: SLF001 + assert len(h.responses) == 1 # sanity check + sliced_response_size = len(h.responses[0].content) + assert sliced_response_size < full_response_size + + +def test_export_json(client, buffer): + array = ragged.array( + [ + RNG.random(10).tolist(), + RNG.random(3).tolist(), + RNG.random(8).tolist(), + ] + ) + rac = client.write_ragged(array, key="test") + + file = buffer + rac.export(file, format="application/json") + actual = bytes(file.getbuffer()).decode() + assert actual == ak.to_json(array._impl) # noqa: SLF001 + + +def test_export_arrow(tmpdir, client): + # Write data into catalog. It will be stored as directory of buffers + # named like 'node0-offsets' and 'node2-data'. + array = ragged.array( + [ + RNG.random(10).tolist(), + RNG.random(3).tolist(), + RNG.random(8).tolist(), + ] + ) + rac = client.write_ragged(array, key="test") + + filepath = tmpdir / "actual.arrow" + rac.export(str(filepath), format=APACHE_ARROW_FILE_MIME_TYPE) + actual = pyarrow.feather.read_table(filepath) + expected = ak.to_arrow_table(array._impl) # noqa: SLF001 + assert actual == expected + + +def test_export_parquet(tmpdir, client): + # Write data into catalog. It will be stored as directory of buffers + # named like 'node0-offsets' and 'node2-data'. + array = ragged.array( + [ + RNG.random(10).tolist(), + RNG.random(3).tolist(), + RNG.random(8).tolist(), + ] + ) + rac = client.write_ragged(array, key="test") + + filepath = tmpdir / "actual.parquet" + rac.export(str(filepath), format="application/x-parquet") + actual = pyarrow.parquet.read_table(filepath) + expected = ak.to_arrow_table(array._impl) # noqa: SLF001 + assert actual == expected diff --git a/tiled/adapters/protocols.py b/tiled/adapters/protocols.py index 2fbd522a1..83301ba8a 100644 --- a/tiled/adapters/protocols.py +++ b/tiled/adapters/protocols.py @@ -4,9 +4,12 @@ import dask.dataframe import pandas +import ragged import sparse from numpy.typing import NDArray +from tiled.structures.ragged import RaggedStructure + from ..ndslice import NDSlice from ..server.schemas import Principal from ..storage import Storage @@ -83,6 +86,22 @@ def write(self, container: DirectoryContainer) -> None: pass +class RaggedAdapter(BaseAdapter, Protocol): + structure_family: Literal[StructureFamily.ragged] + + @abstractmethod + def structure(self) -> RaggedStructure: + pass + + @abstractmethod + def read(self, slice: NDSlice) -> ragged.array: + pass + + @abstractmethod + def read_block(self, block: Tuple[int, ...]) -> ragged.array: + pass + + class SparseAdapter(BaseAdapter, Protocol): structure_family: Literal[StructureFamily.sparse] = StructureFamily.sparse @@ -128,7 +147,12 @@ def __getitem__(self, key: str) -> ArrayAdapter: AnyAdapter = Union[ - ArrayAdapter, AwkwardAdapter, ContainerAdapter, SparseAdapter, TableAdapter + ArrayAdapter, + AwkwardAdapter, + ContainerAdapter, + RaggedAdapter, + SparseAdapter, + TableAdapter, ] diff --git a/tiled/adapters/ragged.py b/tiled/adapters/ragged.py index f501f1da8..d6c606ecc 100644 --- a/tiled/adapters/ragged.py +++ b/tiled/adapters/ragged.py @@ -1,10 +1,12 @@ -from typing import Any, List, Optional, Set +from collections.abc import Iterable +from typing import Any, ClassVar, List, Optional, Set, Union +import awkward +import numpy as np import ragged from numpy.typing import NDArray -from tiled.ndslice import NDSlice -from tiled.storage import Storage +from tiled.storage import FileStorage, Storage from tiled.structures.core import Spec, StructureFamily from tiled.structures.ragged import RaggedStructure from tiled.type_aliases import JSON @@ -12,7 +14,7 @@ class RaggedAdapter: structure_family = StructureFamily.ragged - supported_storage: Set[type[Storage]] = set() + supported_storage: ClassVar[Set[type[Storage]]] = {FileStorage} def __init__( self, @@ -38,7 +40,9 @@ def __init__( @classmethod def from_array( cls, - array: NDArray[Any], + array: Union[ + ragged.array, awkward.Array, NDArray[Any], Iterable[Iterable[Any]] + ], metadata: Optional[JSON] = None, specs: Optional[List[Spec]] = None, ) -> "RaggedAdapter": @@ -54,6 +58,12 @@ def from_array( ------- """ + array = ( + ragged.array(list(array)) + if isinstance(array, np.ndarray) + else ragged.asarray(array) + ) + structure = RaggedStructure.from_array(array) return cls( array, @@ -68,5 +78,5 @@ def metadata(self) -> JSON: def __repr__(self) -> str: return f"{type(self).__name__}({self._structure})" - def read(self, slice: NDSlice = NDSlice(...)) -> ragged.array: - return self._array[tuple(slice)] if slice else self._array + def structure(self) -> RaggedStructure: + return self._structure diff --git a/tiled/adapters/sql.py b/tiled/adapters/sql.py index 03aeb34dc..4208b9290 100644 --- a/tiled/adapters/sql.py +++ b/tiled/adapters/sql.py @@ -248,12 +248,12 @@ def __getitem__( """ # Must compute to determine shape. - array = self.read([key])[key].to_numpy() - if array.dtype != "object": + array = self.read([key])[key].infer_objects().to_numpy() + if array.dtype.name != "object": return ArrayAdapter.from_array(array) if ( - array.dtype == "object" + array.dtype.name == "object" and len(array) and isinstance(array[0], numpy.ndarray) ): diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index ed7ff07af..d9e68a0b2 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -106,6 +106,7 @@ StructureFamily.array: ZARR_MIMETYPE, StructureFamily.awkward: AWKWARD_BUFFERS_MIMETYPE, StructureFamily.table: PARQUET_MIMETYPE, + StructureFamily.ragged: AWKWARD_BUFFERS_MIMETYPE, StructureFamily.sparse: SPARSE_BLOCKS_PARQUET_MIMETYPE, } STORAGE_ADAPTERS_BY_MIMETYPE = OneShotCachedMap( @@ -1133,6 +1134,10 @@ async def write(self, *args, **kwargs): return await ensure_awaitable((await self.get_adapter()).write, *args, **kwargs) +class CatalogRaggedAdapter(CatalogArrayAdapter): + pass + + class CatalogSparseAdapter(CatalogArrayAdapter): pass @@ -1653,6 +1658,7 @@ def node_from_segments(segments, root_id=0): StructureFamily.awkward: CatalogAwkwardAdapter, StructureFamily.composite: CatalogCompositeAdapter, StructureFamily.container: CatalogContainerAdapter, + StructureFamily.ragged: CatalogRaggedAdapter, StructureFamily.sparse: CatalogSparseAdapter, StructureFamily.table: CatalogTableAdapter, } diff --git a/tiled/client/container.py b/tiled/client/container.py index 96151aa7b..e35cb13b8 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -1011,6 +1011,69 @@ def write_awkward( client.write(container) return client + def write_ragged( + self, + array, + *, + key=None, + metadata=None, + dims=None, + specs=None, + access_tags=None, + ): + import awkward + import ragged + + from tiled.structures.ragged import BuiltinDtype, RaggedStructure + + if not (hasattr(array, "shape") and hasattr(array, "dtype")): + # This does not implement enough of the array-like interface. + # Coerce to numpy-like ragged array. + array = ragged.asarray(array) + + # TODO + from dask.array.core import normalize_chunks + + if hasattr(array, "chunks"): + chunks = normalize_chunks( + array.chunks, + limit=self._SUGGESTED_MAX_UPLOAD_SIZE, + dtype=array.dtype, + shape=None, + ) + else: + chunks = normalize_chunks( + tuple("auto" for _ in array.shape), + limit=self._SUGGESTED_MAX_UPLOAD_SIZE, + dtype=array.dtype, + shape=tuple(d if d is not None else array.size for d in array.shape), + ) + + form, length, container = awkward.to_buffers(array._impl) # noqa: SLF001 + + structure = RaggedStructure( + shape=array.shape, + chunks=chunks, + dims=dims, + data_type=BuiltinDtype.from_numpy_dtype(array.dtype), + form=form.to_dict(), + length=length, + ) + client = self.new( + StructureFamily.ragged, + [ + DataSource( + structure=structure, structure_family=StructureFamily.ragged + ), + ], + key=key, + metadata=metadata, + specs=specs, + access_tags=access_tags, + ) + client.write(array) + return client + def write_sparse( self, coords, @@ -1296,6 +1359,7 @@ def _write_partition(x, partition_info, client): "dataframe": _LazyLoad( ("..dataframe", Container.__module__), "DataFrameClient" ), + "ragged": _LazyLoad(("..ragged", Container.__module__), "RaggedClient"), "sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"), "table": _LazyLoad( ("..dataframe", Container.__module__), "DataFrameClient" diff --git a/tiled/client/ragged.py b/tiled/client/ragged.py new file mode 100644 index 000000000..e7b8c939d --- /dev/null +++ b/tiled/client/ragged.py @@ -0,0 +1,78 @@ +from typing import TYPE_CHECKING, Any, Dict, Union, cast +from urllib.parse import parse_qs, urlparse + +import awkward as ak +import ragged + +from tiled.client.base import BaseClient +from tiled.client.utils import export_util, handle_error, retry_context +from tiled.ndslice import NDSlice +from tiled.serialization.awkward import from_zipped_buffers, to_zipped_buffers +from tiled.structures.awkward import project_form + +if TYPE_CHECKING: + from tiled.structures.ragged import RaggedStructure + + +class RaggedClient(BaseClient): + def write(self, array: Union[ragged.array, ak.Array]): + structure = cast("RaggedStructure", self.structure()) + if isinstance(array, ragged.array): + packed = ak.to_packed(array._impl) # noqa: SLF001 + _, _, container = ak.to_buffers(packed) + components = (structure.form, structure.length, container) + for attempt in retry_context(): + with attempt: + handle_error( + self.context.http_client.put( + self.item["links"]["full"], + content=bytes( + to_zipped_buffers("application/zip", components, {}) + ), + headers={"Content-Type": "application/zip"}, + ) + ) + + def read(self, slice: NDSlice = ...): + structure = cast("RaggedStructure", self.structure()) + form = ak.forms.from_dict(structure.form) + typetracer, report = ak.typetracer.typetracer_with_report(form) + proxy_array = ak.Array(typetracer) + ak.typetracer.touch_data(proxy_array[slice]) + form_keys_touched = set(report.data_touched) + projected_form = project_form(form, form_keys_touched) + url_path = self.item["links"]["full"] + url_params: Dict[str, Any] = {**parse_qs(urlparse(url_path).query)} + if isinstance(slice, NDSlice): + url_params["slice"] = slice.to_numpy_str() + for attempt in retry_context(): + with attempt: + content = handle_error( + self.context.http_client.get( + url_path, + headers={"Accept": "application/zip"}, + params=url_params, + ) + ).read() + container = from_zipped_buffers(content, projected_form, structure.length) + projected_array = ragged.array( + ak.from_buffers( + projected_form, + structure.length, + container, + allow_noncanonical_form=True, + ) + ) + return projected_array[slice] + + def __getitem__(self, slice): + return self.read(slice=slice) + + def export(self, filepath, *, format=None): + return export_util( + filepath, + format, + self.context.http_client.get, + self.item["links"]["full"], + params={}, + ) diff --git a/tiled/serialization/__init__.py b/tiled/serialization/__init__.py index 368e09a14..19a9b17a7 100644 --- a/tiled/serialization/__init__.py +++ b/tiled/serialization/__init__.py @@ -22,6 +22,10 @@ def register_builtin_serializers(): from ..serialization import table as _table # noqa: F401 del _table + if modules_available("ragged"): + from ..serialization import ragged as _ragged # noqa: F401 + + del _ragged if modules_available("sparse"): from ..serialization import sparse as _sparse # noqa: F401 diff --git a/tiled/serialization/ragged.py b/tiled/serialization/ragged.py new file mode 100644 index 000000000..defe711ed --- /dev/null +++ b/tiled/serialization/ragged.py @@ -0,0 +1,51 @@ +from typing import Union + +import awkward +import orjson +import ragged + +from tiled.media_type_registration import ( + default_deserialization_registry, + default_serialization_registry, +) +from tiled.mimetypes import APACHE_ARROW_FILE_MIME_TYPE, PARQUET_MIMETYPE +from tiled.serialization import awkward as awkward_serialization +from tiled.structures.core import StructureFamily +from tiled.utils import modules_available, safe_json_dump + + +@default_serialization_registry.register(StructureFamily.ragged, "application/json") +def to_json(mimetype: str, array: ragged.array, metadata: dict): # noqa: ARG001 + return safe_json_dump(array.tolist()) + + +@default_deserialization_registry.register(StructureFamily.ragged, "application/json") +def from_json(contents: Union[str, bytes]): + return ragged.array(orjson.loads(contents)) + + +@default_serialization_registry.register(StructureFamily.ragged, "application/zip") +def to_zipped_buffers(mimetype: str, array: ragged.array, metadata: dict): + components = awkward.to_buffers(array._impl) # noqa: SLF001 + return awkward_serialization.to_zipped_buffers(mimetype, components, metadata) + + +@default_deserialization_registry.register(StructureFamily.ragged, "application/zip") +def from_zipped_buffers(buffer: bytes, form: dict, length: int): + # this should return the container dict immediately, to be used by `AwkwardBuffersAdapter`. + return awkward_serialization.from_zipped_buffers(buffer, form, length) + + +if modules_available("pyarrow"): + + @default_serialization_registry.register( + StructureFamily.ragged, APACHE_ARROW_FILE_MIME_TYPE + ) + def to_arrow(mimetype: str, array: ragged.array, metadata: dict): + components = awkward.to_buffers(array._impl) # noqa: SLF001 + return awkward_serialization.to_arrow(mimetype, components, metadata) + + @default_serialization_registry.register(StructureFamily.ragged, PARQUET_MIMETYPE) + def to_parquet(mimetype: str, array: ragged.array, metadata: dict): + components = awkward.to_buffers(array._impl) # noqa: SLF001 + return awkward_serialization.to_parquet(mimetype, components, metadata) diff --git a/tiled/server/core.py b/tiled/server/core.py index dc36ecd3c..4eaaa9075 100644 --- a/tiled/server/core.py +++ b/tiled/server/core.py @@ -261,6 +261,7 @@ async def construct_entries_response( DEFAULT_MEDIA_TYPES = { StructureFamily.array: {"*/*": "application/octet-stream", "image/*": "image/png"}, StructureFamily.awkward: {"*/*": "application/zip"}, + StructureFamily.ragged: {"*/*": "application/zip"}, StructureFamily.table: {"*/*": APACHE_ARROW_FILE_MIME_TYPE}, StructureFamily.container: {"*/*": "application/x-hdf5"}, StructureFamily.composite: {"*/*": "application/x-hdf5"}, diff --git a/tiled/server/links.py b/tiled/server/links.py index 2120c3741..9389f97ef 100644 --- a/tiled/server/links.py +++ b/tiled/server/links.py @@ -37,6 +37,14 @@ def links_for_container(structure_family, structure, base_url, path_str): return links +def links_for_ragged(structure_family, structure, base_url, path_str): + links = {} + block_template = ",".join(f"{{{index}}}" for index in range(len(structure.shape))) + links["full"] = f"{base_url}/ragged/full/{path_str}" + links["block"] = f"{base_url}/ragged/block/{path_str}?block={block_template}" + return links + + def links_for_table(structure_family, structure, base_url, path_str): links = {} links["partition"] = f"{base_url}/table/partition/{path_str}?partition={{index}}" @@ -49,6 +57,7 @@ def links_for_table(structure_family, structure, base_url, path_str): StructureFamily.awkward: links_for_awkward, StructureFamily.container: links_for_container, StructureFamily.composite: links_for_container, - StructureFamily.sparse: links_for_array, # spare and array are the same + StructureFamily.ragged: links_for_ragged, + StructureFamily.sparse: links_for_array, # sparse and array are the same StructureFamily.table: links_for_table, } diff --git a/tiled/server/router.py b/tiled/server/router.py index 0ef7314fb..9066f4d73 100644 --- a/tiled/server/router.py +++ b/tiled/server/router.py @@ -7,7 +7,7 @@ from datetime import datetime, timedelta, timezone from functools import partial from pathlib import Path -from typing import Callable, List, Optional, TypeVar, Union +from typing import Callable, List, Optional, TypeVar, Union, cast import anyio import packaging @@ -34,6 +34,7 @@ from tiled.schemas import About from tiled.server.protocols import ExternalAuthenticator, InternalAuthenticator from tiled.server.schemas import Principal +from tiled.structures.ragged import RaggedStructure from .. import __version__ from ..ndslice import NDSlice @@ -494,7 +495,7 @@ async def array_block( root_tree, session_state, request.state.metrics, - {StructureFamily.array, StructureFamily.sparse}, + {StructureFamily.array, StructureFamily.ragged, StructureFamily.sparse}, getattr(request.app.state, "access_policy", None), ) shape = entry.structure().shape @@ -583,7 +584,7 @@ async def array_full( root_tree, session_state, request.state.metrics, - {StructureFamily.array, StructureFamily.sparse}, + {StructureFamily.array, StructureFamily.ragged, StructureFamily.sparse}, getattr(request.app.state, "access_policy", None), ) structure_family = entry.structure_family @@ -629,6 +630,129 @@ async def array_full( except UnsupportedMediaTypes as err: raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail=err.args[0]) + @router.get( + "/ragged/full/{path:path}", response_model=schemas.Response, name="ragged full" + ) + async def ragged_full( + request: Request, + path: str, + slice=Depends(NDSlice.from_query), + expected_shape=Depends(expected_shape), + format: Optional[str] = None, + filename: Optional[str] = None, + settings: Settings = Depends(get_settings), + principal: Union[Principal, SpecialUsers] = Depends(get_current_principal), + root_tree=Depends(get_root_tree), + session_state: dict = Depends(get_session_state), + authn_scopes: Scopes = Depends(get_current_scopes), + _=Security(check_scopes, scopes=["read:data"]), + ): + entry = await get_entry( + path, + ["read:data"], + principal, + authn_scopes, + root_tree, + session_state, + request.state.metrics, + {StructureFamily.ragged}, + getattr(request.app.state, "access_policy", None), + ) + structure_family = entry.structure_family + + import awkward + import ragged + + with record_timing(request.state.metrics, "read"): + container = await ensure_awaitable( + entry.read # this is AwkwardAdapter.read() + ) + structure = cast("RaggedStructure", entry.structure()) + components = (structure.form, structure.length, container) + awk_array = awkward.from_buffers(*components) + sliced_ragged_array = ragged.array(awk_array[slice], copy=True)[slice] + if sliced_ragged_array._impl.nbytes > settings.response_bytesize_limit: + raise HTTPException( + status_code=HTTP_400_BAD_REQUEST, + detail=( + f"Response would exceed {settings.response_bytesize_limit}. " + "Use slicing ('?slice=...') to request smaller chunks." + ), + ) + try: + with record_timing(request.state.metrics, "pack"): + return await construct_data_response( + structure_family, + serialization_registry, + sliced_ragged_array, + entry.metadata(), + request, + format, + specs=getattr(entry, "specs", []), + expires=getattr(entry, "content_stale_at", None), + filename=filename, + ) + except UnsupportedMediaTypes as err: + raise HTTPException( + status_code=HTTP_406_NOT_ACCEPTABLE, detail=err.args[0] + ) from err + + @router.get( + "/ragged/block/{path:path}", + response_model=schemas.Response, + name="ragged block", + ) + async def ragged_block( + request: Request, + path: str, + block=Depends(block), + slice=Depends(NDSlice.from_query), + expected_shape=Depends(expected_shape), + format: Optional[str] = None, + filename: Optional[str] = None, + settings: Settings = Depends(get_settings), + principal: Union[Principal, SpecialUsers] = Depends(get_current_principal), + root_tree=Depends(get_root_tree), + session_state: dict = Depends(get_session_state), + authn_scopes: Scopes = Depends(get_current_scopes), + _=Security(check_scopes, scopes=["read:data"]), + ): + return await array_block( + request, + path, + block, + slice, + expected_shape, + format, + filename, + settings, + principal, + root_tree, + session_state, + authn_scopes, + _, + ) + + @router.put("/ragged/full/{path:path}") + async def put_ragged_full( + request: Request, + path: str, + principal: Union[Principal, SpecialUsers] = Depends(get_current_principal), + root_tree=Depends(get_root_tree), + session_state: dict = Depends(get_session_state), + authn_scopes: Scopes = Depends(get_current_scopes), + _=Security(check_scopes, scopes=["write:data"]), + ): + return await put_array_full( + request, + path, + principal, + root_tree, + session_state, + authn_scopes, + _, + ) + @router.get( "/table/partition/{path:path}", response_model=schemas.Response, @@ -1564,7 +1688,7 @@ async def put_array_full( root_tree, session_state, request.state.metrics, - {StructureFamily.array, StructureFamily.sparse}, + {StructureFamily.array, StructureFamily.ragged, StructureFamily.sparse}, getattr(request.app.state, "access_policy", None), ) body = await request.body() @@ -1582,6 +1706,12 @@ async def put_array_full( elif entry.structure_family == "sparse": deserializer = deserialization_registry.dispatch("sparse", media_type) data = await ensure_awaitable(deserializer, body) + elif entry.structure_family == "ragged": + structure = cast("RaggedStructure", entry.structure()) + deserializer = deserialization_registry.dispatch("ragged", media_type) + data = await ensure_awaitable( + deserializer, body, structure.form, structure.length + ) else: raise NotImplementedError(entry.structure_family) await ensure_awaitable(entry.write, data) diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index 0c4c82f3c..469bd3373 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -10,6 +10,8 @@ from pydantic_core import PydanticCustomError from typing_extensions import Annotated, TypedDict +from tiled.structures.ragged import RaggedStructure + from ..structures.array import ArrayStructure from ..structures.awkward import AwkwardStructure from ..structures.core import STRUCTURE_TYPES, StructureFamily @@ -181,6 +183,7 @@ class NodeAttributes(pydantic.BaseModel): Union[ ArrayStructure, AwkwardStructure, + RaggedStructure, SparseStructure, NodeStructure, TableStructure, @@ -227,6 +230,12 @@ class DataFrameLinks(pydantic.BaseModel): partition: str +class RaggedLinks(pydantic.BaseModel): + self: str + full: str + block: str + + class SparseLinks(pydantic.BaseModel): self: str full: str @@ -238,6 +247,7 @@ class SparseLinks(pydantic.BaseModel): StructureFamily.awkward: AwkwardLinks, StructureFamily.composite: ContainerLinks, StructureFamily.container: ContainerLinks, + StructureFamily.ragged: RaggedLinks, StructureFamily.sparse: SparseLinks, StructureFamily.table: DataFrameLinks, } diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index c2f174815..2c3a4fd38 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -6,17 +6,20 @@ from tiled.structures.array import ArrayStructure, BuiltinDtype, StructDtype -@dataclass() +@dataclass(kw_only=True) class RaggedStructure(ArrayStructure): shape: Tuple[Union[int, None], ...] # type: ignore[reportIncompatibleVariableOverride] + length: int + form: dict @classmethod def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructure": - array = ( - ragged.asarray(array.tolist()) - if hasattr(array, "tolist") - else ragged.array(list(array)) - ) + if not isinstance(array, ragged.array): + array = ( + ragged.asarray(array.tolist()) + if hasattr(array, "tolist") + else ragged.array(list(array)) + ) if shape is None: shape = array.shape @@ -28,10 +31,34 @@ def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructu else: data_type = BuiltinDtype.from_numpy_dtype(array.dtype) + length = array._impl.layout.length + form = array._impl.layout.form.to_dict() + return cls( data_type=data_type, chunks=chunks, shape=shape, dims=dims, resizable=False, + length=length, + form=form, + ) + + @classmethod + def from_json(cls, structure): + if "fields" in structure["data_type"]: + data_type = StructDtype.from_json(structure["data_type"]) + else: + data_type = BuiltinDtype.from_json(structure["data_type"]) + dims = structure["dims"] + if dims is not None: + dims = tuple(dims) + return cls( + data_type=data_type, + chunks=tuple(map(tuple, structure["chunks"])), + shape=tuple(structure["shape"]), + dims=dims, + resizable=structure.get("resizable", False), + length=structure["length"], + form=dict(structure["form"]), ) From 18cfd67be1e54b629d44eefea3a34617cebe5d09 Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 25 Aug 2025 09:18:18 -0600 Subject: [PATCH 09/20] fix lack of `read()` --- tiled/adapters/ragged.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tiled/adapters/ragged.py b/tiled/adapters/ragged.py index d6c606ecc..54bd49f68 100644 --- a/tiled/adapters/ragged.py +++ b/tiled/adapters/ragged.py @@ -6,6 +6,7 @@ import ragged from numpy.typing import NDArray +from tiled.ndslice import NDSlice from tiled.storage import FileStorage, Storage from tiled.structures.core import Spec, StructureFamily from tiled.structures.ragged import RaggedStructure @@ -72,6 +73,23 @@ def from_array( specs=specs, ) + def read( + self, + slice: NDSlice = NDSlice(...), + ) -> ragged.array: + """ + + Parameters + ---------- + slice : + + Returns + ------- + + """ + # _array[...] requires an actual tuple, not just a subclass of tuple + return self._array[tuple(slice)] if slice else self._array + def metadata(self) -> JSON: return self._metadata From bebdc680be0037e1974c1c5803edc4c439b34a54 Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 25 Aug 2025 09:36:24 -0600 Subject: [PATCH 10/20] add more complexity to tests --- tiled/_tests/test_ragged.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/tiled/_tests/test_ragged.py b/tiled/_tests/test_ragged.py index 60e097754..8eda512b2 100644 --- a/tiled/_tests/test_ragged.py +++ b/tiled/_tests/test_ragged.py @@ -42,9 +42,10 @@ def test_slicing(client): # Write data into catalog. array = ragged.array( [ - list(RNG.random(10).tolist()), - list(RNG.random(3).tolist()), - list(RNG.random(8).tolist()), + [RNG.random(10).tolist()], + [RNG.random(8).tolist(), []], + [RNG.random(5).tolist(), RNG.random(2).tolist()], + [[], RNG.random(7).tolist()], ] ) returned = client.write_ragged(array, key="test") @@ -53,7 +54,7 @@ def test_slicing(client): # Read the data back out from the RaggedClient, progressively sliced. result = rac.read() # ragged does not have an array_equal(a, b) equivalent. Use awkward. - assert ak.array_equal(result._impl, array._impl) + assert ak.array_equal(result._impl, array._impl) # noqa: SLF001 # When sliced, the server sends less data. with record_history() as h: @@ -72,9 +73,10 @@ def test_slicing(client): def test_export_json(client, buffer): array = ragged.array( [ - RNG.random(10).tolist(), - RNG.random(3).tolist(), - RNG.random(8).tolist(), + [RNG.random(10).tolist()], + [RNG.random(8).tolist(), []], + [RNG.random(5).tolist(), RNG.random(2).tolist()], + [[], RNG.random(7).tolist()], ] ) rac = client.write_ragged(array, key="test") @@ -90,9 +92,10 @@ def test_export_arrow(tmpdir, client): # named like 'node0-offsets' and 'node2-data'. array = ragged.array( [ - RNG.random(10).tolist(), - RNG.random(3).tolist(), - RNG.random(8).tolist(), + [RNG.random(10).tolist()], + [RNG.random(8).tolist(), []], + [RNG.random(5).tolist(), RNG.random(2).tolist()], + [[], RNG.random(7).tolist()], ] ) rac = client.write_ragged(array, key="test") @@ -109,9 +112,10 @@ def test_export_parquet(tmpdir, client): # named like 'node0-offsets' and 'node2-data'. array = ragged.array( [ - RNG.random(10).tolist(), - RNG.random(3).tolist(), - RNG.random(8).tolist(), + [RNG.random(10).tolist()], + [RNG.random(8).tolist(), []], + [RNG.random(5).tolist(), RNG.random(2).tolist()], + [[], RNG.random(7).tolist()], ] ) rac = client.write_ragged(array, key="test") From f3ba2ea7b99ad868c5f26108de164a57a6053d2c Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Wed, 27 Aug 2025 17:55:05 -0600 Subject: [PATCH 11/20] test simple to complex arrays --- tiled/_tests/test_ragged.py | 68 ++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/tiled/_tests/test_ragged.py b/tiled/_tests/test_ragged.py index 8eda512b2..f6d12b458 100644 --- a/tiled/_tests/test_ragged.py +++ b/tiled/_tests/test_ragged.py @@ -37,17 +37,39 @@ def client(context): RNG = np.random.default_rng(42) - -def test_slicing(client): - # Write data into catalog. - array = ragged.array( +arrays = { + "regular_1d": ragged.array(RNG.random(10)), + "regular_nd": ragged.array(RNG.random((2, 3, 4))), + "ragged_simple": ragged.array( + [ + RNG.random(3).tolist(), + RNG.random(5).tolist(), + RNG.random(8).tolist(), + ] + ), + "ragged_complex": ragged.array( [ [RNG.random(10).tolist()], [RNG.random(8).tolist(), []], [RNG.random(5).tolist(), RNG.random(2).tolist()], [[], RNG.random(7).tolist()], ] - ) + ), + "ragged_complex_nd": ragged.array( + [ + [RNG.random((4, 3)).tolist()], + [RNG.random((2, 8)).tolist(), []], + [RNG.random((5, 2)).tolist(), RNG.random((3, 3)).tolist()], + [[], RNG.random((7, 1)).tolist()], + ] + ), +} + + +@pytest.mark.parametrize("name", arrays.keys()) +def test_slicing(client, name): + # Write data into catalog. + array = arrays[name] returned = client.write_ragged(array, key="test") # Test with client returned, and with client from lookup. for rac in [returned, client["test"]]: @@ -70,15 +92,9 @@ def test_slicing(client): assert sliced_response_size < full_response_size -def test_export_json(client, buffer): - array = ragged.array( - [ - [RNG.random(10).tolist()], - [RNG.random(8).tolist(), []], - [RNG.random(5).tolist(), RNG.random(2).tolist()], - [[], RNG.random(7).tolist()], - ] - ) +@pytest.mark.parametrize("name", arrays.keys()) +def test_export_json(client, buffer, name): + array = arrays[name] rac = client.write_ragged(array, key="test") file = buffer @@ -87,17 +103,11 @@ def test_export_json(client, buffer): assert actual == ak.to_json(array._impl) # noqa: SLF001 -def test_export_arrow(tmpdir, client): +@pytest.mark.parametrize("name", arrays.keys()) +def test_export_arrow(tmpdir, client, name): # Write data into catalog. It will be stored as directory of buffers # named like 'node0-offsets' and 'node2-data'. - array = ragged.array( - [ - [RNG.random(10).tolist()], - [RNG.random(8).tolist(), []], - [RNG.random(5).tolist(), RNG.random(2).tolist()], - [[], RNG.random(7).tolist()], - ] - ) + array = arrays[name] rac = client.write_ragged(array, key="test") filepath = tmpdir / "actual.arrow" @@ -107,17 +117,11 @@ def test_export_arrow(tmpdir, client): assert actual == expected -def test_export_parquet(tmpdir, client): +@pytest.mark.parametrize("name", arrays.keys()) +def test_export_parquet(tmpdir, client, name): # Write data into catalog. It will be stored as directory of buffers # named like 'node0-offsets' and 'node2-data'. - array = ragged.array( - [ - [RNG.random(10).tolist()], - [RNG.random(8).tolist(), []], - [RNG.random(5).tolist(), RNG.random(2).tolist()], - [[], RNG.random(7).tolist()], - ] - ) + array = arrays[name] rac = client.write_ragged(array, key="test") filepath = tmpdir / "actual.parquet" From 596a10432201e7e0723b7921cdecde89c8da087c Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Wed, 12 Nov 2025 11:43:56 -0600 Subject: [PATCH 12/20] Update structure to store offsets --- tiled/adapters/ragged.py | 6 +++ tiled/client/container.py | 10 +---- tiled/client/ragged.py | 81 ++++++++++++++++++----------------- tiled/serialization/ragged.py | 47 ++++++++++++++++---- tiled/structures/ragged.py | 44 ++++++++++++++----- 5 files changed, 121 insertions(+), 67 deletions(-) diff --git a/tiled/adapters/ragged.py b/tiled/adapters/ragged.py index 54bd49f68..838fa56c5 100644 --- a/tiled/adapters/ragged.py +++ b/tiled/adapters/ragged.py @@ -90,6 +90,12 @@ def read( # _array[...] requires an actual tuple, not just a subclass of tuple return self._array[tuple(slice)] if slice else self._array + def write( + self, + array: ragged.array, + ) -> None: + raise Exception + def metadata(self) -> JSON: return self._metadata diff --git a/tiled/client/container.py b/tiled/client/container.py index 753cd388d..f694c98b6 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -1011,16 +1011,8 @@ def write_ragged( shape=tuple(d if d is not None else array.size for d in array.shape), ) - form, length, container = awkward.to_buffers(array._impl) # noqa: SLF001 + structure = RaggedStructure.from_array(array, chunks=chunks, dims=dims) - structure = RaggedStructure( - shape=array.shape, - chunks=chunks, - dims=dims, - data_type=BuiltinDtype.from_numpy_dtype(array.dtype), - form=form.to_dict(), - length=length, - ) client = self.new( StructureFamily.ragged, [ diff --git a/tiled/client/ragged.py b/tiled/client/ragged.py index e7b8c939d..fa3d80749 100644 --- a/tiled/client/ragged.py +++ b/tiled/client/ragged.py @@ -7,7 +7,7 @@ from tiled.client.base import BaseClient from tiled.client.utils import export_util, handle_error, retry_context from tiled.ndslice import NDSlice -from tiled.serialization.awkward import from_zipped_buffers, to_zipped_buffers +from tiled.serialization.ragged import to_flattened_octet_stream, to_json from tiled.structures.awkward import project_form if TYPE_CHECKING: @@ -16,56 +16,59 @@ class RaggedClient(BaseClient): def write(self, array: Union[ragged.array, ak.Array]): - structure = cast("RaggedStructure", self.structure()) - if isinstance(array, ragged.array): - packed = ak.to_packed(array._impl) # noqa: SLF001 - _, _, container = ak.to_buffers(packed) - components = (structure.form, structure.length, container) for attempt in retry_context(): with attempt: handle_error( self.context.http_client.put( self.item["links"]["full"], content=bytes( - to_zipped_buffers("application/zip", components, {}) + # to_zipped_buffers("application/zip", ragged.asarray(array), {}) + # to_json("application/json", ragged.asarray(array), {}) + to_flattened_octet_stream( + "application/octet-stream", ragged.asarray(array), {} + ) ), - headers={"Content-Type": "application/zip"}, + headers={"Content-Type": "application/octet-stream"}, ) ) - def read(self, slice: NDSlice = ...): + def read(self, slice: NDSlice = ...) -> ragged.array: structure = cast("RaggedStructure", self.structure()) - form = ak.forms.from_dict(structure.form) - typetracer, report = ak.typetracer.typetracer_with_report(form) - proxy_array = ak.Array(typetracer) - ak.typetracer.touch_data(proxy_array[slice]) - form_keys_touched = set(report.data_touched) - projected_form = project_form(form, form_keys_touched) - url_path = self.item["links"]["full"] - url_params: Dict[str, Any] = {**parse_qs(urlparse(url_path).query)} - if isinstance(slice, NDSlice): - url_params["slice"] = slice.to_numpy_str() - for attempt in retry_context(): - with attempt: - content = handle_error( - self.context.http_client.get( - url_path, - headers={"Accept": "application/zip"}, - params=url_params, - ) - ).read() - container = from_zipped_buffers(content, projected_form, structure.length) - projected_array = ragged.array( - ak.from_buffers( - projected_form, - structure.length, - container, - allow_noncanonical_form=True, - ) - ) - return projected_array[slice] - def __getitem__(self, slice): + # form = ak.forms.from_dict(structure.form) + # typetracer, report = ak.typetracer.typetracer_with_report(form) + # proxy_array = ak.Array(typetracer) + # ak.typetracer.touch_data(proxy_array[slice]) + # form_keys_touched = set(report.data_touched) + # projected_form = project_form(form, form_keys_touched) + # url_path = self.item["links"]["full"] + # url_params: Dict[str, Any] = {**parse_qs(urlparse(url_path).query)} + # if isinstance(slice, NDSlice): + # url_params["slice"] = slice.to_numpy_str() + # for attempt in retry_context(): + # with attempt: + # content = handle_error( + # self.context.http_client.get( + # url_path, + # headers={"Accept": "application/zip"}, + # params=url_params, + # ) + # ).read() + # container = from_zipped_buffers(content, projected_form, structure.length) + # projected_array = ragged.array( + # ak.from_buffers( + # projected_form, + # structure.length, + # container, + # allow_noncanonical_form=True, + # ) + # ) + # return projected_array[slice] + + def __getitem__( + self, slice: NDSlice + ) -> ragged.array: # this is true even when slicing to return a single item + # TODO: should we be smarter, and return the scalar rather a singular array return self.read(slice=slice) def export(self, filepath, *, format=None): diff --git a/tiled/serialization/ragged.py b/tiled/serialization/ragged.py index defe711ed..3116c2461 100644 --- a/tiled/serialization/ragged.py +++ b/tiled/serialization/ragged.py @@ -1,6 +1,7 @@ -from typing import Union +from typing import List, Union import awkward +import numpy as np import orjson import ragged @@ -24,16 +25,44 @@ def from_json(contents: Union[str, bytes]): return ragged.array(orjson.loads(contents)) -@default_serialization_registry.register(StructureFamily.ragged, "application/zip") -def to_zipped_buffers(mimetype: str, array: ragged.array, metadata: dict): - components = awkward.to_buffers(array._impl) # noqa: SLF001 - return awkward_serialization.to_zipped_buffers(mimetype, components, metadata) +@default_serialization_registry.register( + StructureFamily.ragged, "application/octet-stream" +) +def to_flattened_octet_stream(mimetype: str, array: ragged.array, metadata: dict): + content = array._impl.layout # noqa: SLF001 + while isinstance(content, awkward.contents.ListOffsetArray): + content = content.content + return np.asarray(awkward.to_numpy(content)).tobytes() + + +@default_deserialization_registry.register( + StructureFamily.ragged, "application/octet-stream" +) +def from_flattened_octet_stream(buffer, dtype: type, offsets: List[List[int]]): + # return np.frombuffer(buffer, dtype=dtype) + def rebuild(offsets: List[List[int]], data: np.ndarray) -> awkward.contents.Content: + if not offsets: + return awkward.contents.NumpyArray(data) + return awkward.contents.ListOffsetArray( + offsets=awkward.index.Index(offsets[0]), + content=rebuild(offsets[1:], data), + ) + + data = np.frombuffer(buffer, dtype=dtype) + return ragged.array(rebuild(offsets, data), dtype=dtype) + + +# @default_serialization_registry.register(StructureFamily.ragged, "application/zip") +# def to_zipped_buffers(mimetype: str, array: ragged.array, metadata: dict): +# packed = awkward.to_packed(array._impl) # noqa: SLF001 +# components = awkward.to_buffers(packed) +# return awkward_serialization.to_zipped_buffers(mimetype, components, metadata) -@default_deserialization_registry.register(StructureFamily.ragged, "application/zip") -def from_zipped_buffers(buffer: bytes, form: dict, length: int): - # this should return the container dict immediately, to be used by `AwkwardBuffersAdapter`. - return awkward_serialization.from_zipped_buffers(buffer, form, length) +# @default_deserialization_registry.register(StructureFamily.ragged, "application/zip") +# def from_zipped_buffers(buffer: bytes, form: dict, length: int): +# # this should return the container dict immediately, to be used by `AwkwardBuffersAdapter`. +# return awkward_serialization.from_zipped_buffers(buffer, form, length) if modules_available("pyarrow"): diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index 2c3a4fd38..2b927e0d7 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -1,6 +1,9 @@ +from collections.abc import Iterable from dataclasses import dataclass -from typing import Tuple, Union +from typing import Any, Dict, List, SupportsInt, Tuple, Union +import awkward +import numpy as np import ragged from tiled.structures.array import ArrayStructure, BuiltinDtype, StructDtype @@ -9,8 +12,7 @@ @dataclass(kw_only=True) class RaggedStructure(ArrayStructure): shape: Tuple[Union[int, None], ...] # type: ignore[reportIncompatibleVariableOverride] - length: int - form: dict + offsets: List[List[int]] @classmethod def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructure": @@ -31,8 +33,12 @@ def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructu else: data_type = BuiltinDtype.from_numpy_dtype(array.dtype) - length = array._impl.layout.length - form = array._impl.layout.form.to_dict() + content = array._impl.layout # noqa: SLF001 + offsets = [] + + while isinstance(content, awkward.contents.ListOffsetArray): + offsets.append(np.array(content.offsets).tolist()) + content = content.content return cls( data_type=data_type, @@ -40,12 +46,11 @@ def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructu shape=shape, dims=dims, resizable=False, - length=length, - form=form, + offsets=offsets, ) @classmethod - def from_json(cls, structure): + def from_json(cls, structure: dict) -> "RaggedStructure": if "fields" in structure["data_type"]: data_type = StructDtype.from_json(structure["data_type"]) else: @@ -59,6 +64,25 @@ def from_json(cls, structure): shape=tuple(structure["shape"]), dims=dims, resizable=structure.get("resizable", False), - length=structure["length"], - form=dict(structure["form"]), + offsets=structure.get("offsets", []), ) + + @property + def npartitions(self) -> int: + return 1 + + @property + def form(self) -> Dict[str, Any]: + def build(depth: int): + if depth: + return { + "class": "NumpyArray", + "primitive": self.data_type.to_numpy_dtype().name, + } + return { + "class": "ListOffsetArray", + "offsets": "i64", + "content": build(depth - 1), + } + + return build(len(self.offsets)) From b477814bac45e2c6d6b9d06e4938eeaa56115737 Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Wed, 17 Dec 2025 17:23:26 -0600 Subject: [PATCH 13/20] fix exit clause logic --- tiled/_tests/test_ragged.py | 2 ++ tiled/structures/ragged.py | 9 +++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/tiled/_tests/test_ragged.py b/tiled/_tests/test_ragged.py index f6d12b458..6f669fb93 100644 --- a/tiled/_tests/test_ragged.py +++ b/tiled/_tests/test_ragged.py @@ -38,6 +38,8 @@ def client(context): RNG = np.random.default_rng(42) arrays = { + # "empty_1": ragged.array([]), + # "empty_2": ragged.array([[], [], []]), "regular_1d": ragged.array(RNG.random(10)), "regular_nd": ragged.array(RNG.random((2, 3, 4))), "ragged_simple": ragged.array( diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index 2b927e0d7..4b756ea93 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -1,6 +1,6 @@ -from collections.abc import Iterable +from collections.abc import Mapping from dataclasses import dataclass -from typing import Any, Dict, List, SupportsInt, Tuple, Union +from typing import Any, Dict, List, Tuple, Union import awkward import numpy as np @@ -50,7 +50,7 @@ def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructu ) @classmethod - def from_json(cls, structure: dict) -> "RaggedStructure": + def from_json(cls, structure: Mapping[str, Any]) -> "RaggedStructure": if "fields" in structure["data_type"]: data_type = StructDtype.from_json(structure["data_type"]) else: @@ -74,7 +74,8 @@ def npartitions(self) -> int: @property def form(self) -> Dict[str, Any]: def build(depth: int): - if depth: + if depth == 1: + # TODO: Handle EmptyArray, e.g. ragged.array([[], []]) return { "class": "NumpyArray", "primitive": self.data_type.to_numpy_dtype().name, From 7ddbe04e33eb9361e681eff2406c4d1fb8e17acc Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 22 Dec 2025 14:24:34 -0600 Subject: [PATCH 14/20] fix parameter order --- tiled/_tests/adapters/test_sql.py | 2 +- tiled/_tests/adapters/test_sql_arrays.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tiled/_tests/adapters/test_sql.py b/tiled/_tests/adapters/test_sql.py index dde45631f..91b98c9cf 100644 --- a/tiled/_tests/adapters/test_sql.py +++ b/tiled/_tests/adapters/test_sql.py @@ -823,7 +823,7 @@ def test_compare_field_data_from_array_adapter( sql_adapter: SQLAdapter = request.getfixturevalue(sql_adapter_name) table = pa.Table.from_batches([batch0, batch1, batch2]) - sql_adapter.append_partition(table, 0) + sql_adapter.append_partition(0, table) array_adapter = sql_adapter[field] assert isinstance(array_adapter, ArrayAdapter) diff --git a/tiled/_tests/adapters/test_sql_arrays.py b/tiled/_tests/adapters/test_sql_arrays.py index cca87b2b8..c474f5d6d 100644 --- a/tiled/_tests/adapters/test_sql_arrays.py +++ b/tiled/_tests/adapters/test_sql_arrays.py @@ -247,7 +247,7 @@ def test_compare_field_data_from_array_adapter( sql_adapter: SQLAdapter = request.getfixturevalue(sql_adapter_name) table = pa.Table.from_batches([batch0, batch1, batch2]) - sql_adapter.append_partition(table, 0) + sql_adapter.append_partition(0, table) array_adapter = sql_adapter[field] assert isinstance(array_adapter, array_adapter_type) From 94aed41cb817d8ca300c0104b3fb2f08fb85464a Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 22 Dec 2025 15:54:49 -0600 Subject: [PATCH 15/20] test ragged structure and utilities --- tiled/_tests/test_ragged.py | 87 +++++++++++++++++++++++++++++------ tiled/serialization/ragged.py | 83 ++++++++++++++++++++------------- tiled/structures/ragged.py | 26 ++++++++--- 3 files changed, 145 insertions(+), 51 deletions(-) diff --git a/tiled/_tests/test_ragged.py b/tiled/_tests/test_ragged.py index 6f669fb93..a620a1dca 100644 --- a/tiled/_tests/test_ragged.py +++ b/tiled/_tests/test_ragged.py @@ -7,7 +7,16 @@ from tiled.catalog import in_memory from tiled.client import Context, from_context, record_history +from tiled.serialization.ragged import ( + from_flattened_array, + from_flattened_octet_stream, + from_json, + to_flattened_array, + to_flattened_octet_stream, + to_json, +) from tiled.server.app import build_app +from tiled.structures.ragged import RaggedStructure from tiled.utils import APACHE_ARROW_FILE_MIME_TYPE @@ -38,16 +47,15 @@ def client(context): RNG = np.random.default_rng(42) arrays = { - # "empty_1": ragged.array([]), - # "empty_2": ragged.array([[], [], []]), - "regular_1d": ragged.array(RNG.random(10)), - "regular_nd": ragged.array(RNG.random((2, 3, 4))), + # "empty_1d": ragged.array([]), + # "empty_nd": ragged.array([[], [], []]), + "numpy_1d": ragged.array(RNG.random(10)), + "numpy_nd": ragged.array(RNG.random((2, 3, 4))), "ragged_simple": ragged.array( - [ - RNG.random(3).tolist(), - RNG.random(5).tolist(), - RNG.random(8).tolist(), - ] + [RNG.random(3).tolist(), RNG.random(5).tolist(), RNG.random(8).tolist()], + ), + "ragged_simple_nd": ragged.array( + [RNG.random((2, 3, 4)).tolist(), RNG.random((3, 4, 5)).tolist()], ), "ragged_complex": ragged.array( [ @@ -55,19 +63,72 @@ def client(context): [RNG.random(8).tolist(), []], [RNG.random(5).tolist(), RNG.random(2).tolist()], [[], RNG.random(7).tolist()], - ] + ], ), "ragged_complex_nd": ragged.array( [ [RNG.random((4, 3)).tolist()], - [RNG.random((2, 8)).tolist(), []], + [RNG.random((2, 8)).tolist(), [[]]], [RNG.random((5, 2)).tolist(), RNG.random((3, 3)).tolist()], - [[], RNG.random((7, 1)).tolist()], - ] + [[[]], RNG.random((7, 1)).tolist()], + ], ), } +@pytest.mark.parametrize("name", arrays.keys()) +def test_structure(name): + array = arrays[name] + expected_form, expected_len, expected_nodes = ak.to_buffers( + array._impl, # noqa: SLF001 + ) + + structure = RaggedStructure.from_array(array) + form = ak.forms.from_dict(structure.form) + + assert expected_form == form + assert expected_len == structure.shape[0] + assert len(expected_nodes) == len(structure.offsets) + 1 + + +@pytest.mark.parametrize("name", arrays.keys()) +def test_serialization_roundtrip(name): + array = arrays[name] + structure = RaggedStructure.from_array(array) + + # Test JSON serialization. + json_contents = to_json("application/json", array, metadata={}) + array_from_json = from_json( + json_contents, + dtype=array.dtype.type, + offsets=structure.offsets, + shape=structure.shape, + ) + assert ak.array_equal(array._impl, array_from_json._impl) # noqa: SLF001 + + # Test flattened numpy array. + flattened_array = to_flattened_array(array) + array_from_flattened = from_flattened_array( + flattened_array, + dtype=array.dtype.type, + offsets=structure.offsets, + shape=structure.shape, + ) + assert ak.array_equal(array._impl, array_from_flattened._impl) # noqa: SLF001 + + # Test flattened octet-stream serialization. + octet_stream_contents = to_flattened_octet_stream( + "application/octet-stream", array, metadata={} + ) + array_from_octet_stream = from_flattened_octet_stream( + octet_stream_contents, + dtype=array.dtype.type, + offsets=structure.offsets, + shape=structure.shape, + ) + assert ak.array_equal(array._impl, array_from_octet_stream._impl) # noqa: SLF001 + + @pytest.mark.parametrize("name", arrays.keys()) def test_slicing(client, name): # Write data into catalog. diff --git a/tiled/serialization/ragged.py b/tiled/serialization/ragged.py index 3116c2461..d8c383f92 100644 --- a/tiled/serialization/ragged.py +++ b/tiled/serialization/ragged.py @@ -1,4 +1,4 @@ -from typing import List, Union +from __future__ import annotations import awkward import numpy as np @@ -16,53 +16,74 @@ @default_serialization_registry.register(StructureFamily.ragged, "application/json") -def to_json(mimetype: str, array: ragged.array, metadata: dict): # noqa: ARG001 +def to_json( + mimetype: str, array: ragged.array, metadata: dict # noqa: ARG001 +) -> bytes: return safe_json_dump(array.tolist()) @default_deserialization_registry.register(StructureFamily.ragged, "application/json") -def from_json(contents: Union[str, bytes]): - return ragged.array(orjson.loads(contents)) - - -@default_serialization_registry.register( - StructureFamily.ragged, "application/octet-stream" -) -def to_flattened_octet_stream(mimetype: str, array: ragged.array, metadata: dict): +def from_json( + contents: str | bytes, + dtype: type, + offsets: list[list[int]], + shape: tuple[int | None, ...], +) -> ragged.array: + lists_of_lists = orjson.loads(contents) + if all(shape) and not any(offsets): + # No raggedness, but array is not strictly N-D. Map to numpy array first. + # Otherwise, it will infer an offset array of type='x0 * Any * ... * Any * dtype' + # rather than a simple numpy array of type='x0 * x1 * ... * xN * dtype'. + return ragged.array(np.array(lists_of_lists, dtype=dtype)) + return ragged.array(lists_of_lists, dtype=dtype) + + +def to_flattened_array(array: ragged.array) -> np.ndarray: content = array._impl.layout # noqa: SLF001 while isinstance(content, awkward.contents.ListOffsetArray): content = content.content - return np.asarray(awkward.to_numpy(content)).tobytes() + return awkward.to_numpy(content) -@default_deserialization_registry.register( +@default_serialization_registry.register( StructureFamily.ragged, "application/octet-stream" ) -def from_flattened_octet_stream(buffer, dtype: type, offsets: List[List[int]]): - # return np.frombuffer(buffer, dtype=dtype) - def rebuild(offsets: List[List[int]], data: np.ndarray) -> awkward.contents.Content: +def to_flattened_octet_stream( + mimetype: str, array: ragged.array, metadata: dict # noqa: ARG001 +) -> bytes: + return np.asarray(to_flattened_array(array)).tobytes() + + +def from_flattened_array( + array: np.ndarray, + dtype: type, + offsets: list[list[int]], + shape: tuple[int | None, ...], +) -> ragged.array: + if all(shape) and not any(offsets): + # No raggedness, but need to reshape the flat array + return ragged.array(array.reshape(shape), dtype=dtype) + + def rebuild(offsets: list[list[int]]) -> awkward.contents.Content: + nonlocal array if not offsets: - return awkward.contents.NumpyArray(data) + return awkward.contents.NumpyArray(array.tolist()) return awkward.contents.ListOffsetArray( - offsets=awkward.index.Index(offsets[0]), - content=rebuild(offsets[1:], data), + offsets=awkward.index.Index(offsets[0]), content=rebuild(offsets[1:]) ) - data = np.frombuffer(buffer, dtype=dtype) - return ragged.array(rebuild(offsets, data), dtype=dtype) + return ragged.array(rebuild(offsets), dtype=dtype) -# @default_serialization_registry.register(StructureFamily.ragged, "application/zip") -# def to_zipped_buffers(mimetype: str, array: ragged.array, metadata: dict): -# packed = awkward.to_packed(array._impl) # noqa: SLF001 -# components = awkward.to_buffers(packed) -# return awkward_serialization.to_zipped_buffers(mimetype, components, metadata) - - -# @default_deserialization_registry.register(StructureFamily.ragged, "application/zip") -# def from_zipped_buffers(buffer: bytes, form: dict, length: int): -# # this should return the container dict immediately, to be used by `AwkwardBuffersAdapter`. -# return awkward_serialization.from_zipped_buffers(buffer, form, length) +@default_deserialization_registry.register( + StructureFamily.ragged, "application/octet-stream" +) +def from_flattened_octet_stream( + buffer: bytes, dtype: type, offsets: list[list[int]], shape: tuple[int | None, ...] +) -> ragged.array: + return from_flattened_array( + np.frombuffer(buffer, dtype=dtype), dtype, offsets, shape + ) if modules_available("pyarrow"): diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index 4b756ea93..219049149 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -1,6 +1,7 @@ -from collections.abc import Mapping +from __future__ import annotations + from dataclasses import dataclass -from typing import Any, Dict, List, Tuple, Union +from typing import TYPE_CHECKING, Any, Self import awkward import numpy as np @@ -8,14 +9,23 @@ from tiled.structures.array import ArrayStructure, BuiltinDtype, StructDtype +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + @dataclass(kw_only=True) class RaggedStructure(ArrayStructure): - shape: Tuple[Union[int, None], ...] # type: ignore[reportIncompatibleVariableOverride] - offsets: List[List[int]] + shape: tuple[int | None, ...] # type: ignore[reportIncompatibleVariableOverride] + offsets: list[list[int]] @classmethod - def from_array(cls, array, shape=None, chunks=None, dims=None) -> "RaggedStructure": + def from_array( + cls, + array: Iterable, + shape: tuple[int | None, ...] | None = None, + chunks: tuple[str, ...] | None = None, + dims: int | None = None, + ) -> Self: if not isinstance(array, ragged.array): array = ( ragged.asarray(array.tolist()) @@ -72,18 +82,20 @@ def npartitions(self) -> int: return 1 @property - def form(self) -> Dict[str, Any]: + def form(self) -> dict[str, Any]: def build(depth: int): - if depth == 1: + if depth <= 0: # TODO: Handle EmptyArray, e.g. ragged.array([[], []]) return { "class": "NumpyArray", "primitive": self.data_type.to_numpy_dtype().name, + "form_key": f"node{len(self.offsets) - depth}", } return { "class": "ListOffsetArray", "offsets": "i64", "content": build(depth - 1), + "form_key": f"node{len(self.offsets) - depth}", } return build(len(self.offsets)) From bfc9b5881045e15091aefc26e6eb3c8e727a0e85 Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 22 Dec 2025 16:04:52 -0600 Subject: [PATCH 16/20] wip: writing/reading full arrays from flattened .npy files working --- tiled/adapters/ragged.py | 54 +++++--- tiled/adapters/ragged_npy_store.py | 95 ++++++++++++++ tiled/catalog/adapter.py | 16 ++- tiled/client/container.py | 9 +- tiled/client/ragged.py | 134 +++++++++++++------- tiled/mimetypes.py | 7 +- tiled/server/router.py | 195 ++++++++++++++++++++++------- tiled/structures/ragged.py | 2 +- 8 files changed, 395 insertions(+), 117 deletions(-) create mode 100644 tiled/adapters/ragged_npy_store.py diff --git a/tiled/adapters/ragged.py b/tiled/adapters/ragged.py index 838fa56c5..06807adfe 100644 --- a/tiled/adapters/ragged.py +++ b/tiled/adapters/ragged.py @@ -1,28 +1,36 @@ -from collections.abc import Iterable -from typing import Any, ClassVar, List, Optional, Set, Union +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Self -import awkward import numpy as np import ragged -from numpy.typing import NDArray +from tiled.adapters.core import Adapter +from tiled.adapters.utils import init_adapter_from_catalog +from tiled.catalog.orm import Node from tiled.ndslice import NDSlice -from tiled.storage import FileStorage, Storage from tiled.structures.core import Spec, StructureFamily +from tiled.structures.data_source import DataSource from tiled.structures.ragged import RaggedStructure -from tiled.type_aliases import JSON + +if TYPE_CHECKING: + from collections.abc import Iterable + + import awkward + from numpy.typing import NDArray + + from tiled.type_aliases import JSON -class RaggedAdapter: +class RaggedAdapter(Adapter[RaggedStructure]): structure_family = StructureFamily.ragged - supported_storage: ClassVar[Set[type[Storage]]] = {FileStorage} def __init__( self, - array: ragged.array, + array: ragged.array | None, structure: RaggedStructure, - metadata: Optional[JSON] = None, - specs: Optional[List[Spec]] = None, + metadata: JSON | None = None, + specs: list[Spec] | None = None, ) -> None: """ @@ -38,15 +46,23 @@ def __init__( self._metadata = metadata or {} self.specs = list(specs or []) + @classmethod + def from_catalog( + cls, + data_source: DataSource[RaggedStructure], + node: Node, + /, + **kwargs: Any | None, + ) -> Self: + return init_adapter_from_catalog(cls, data_source, node, **kwargs) + @classmethod def from_array( cls, - array: Union[ - ragged.array, awkward.Array, NDArray[Any], Iterable[Iterable[Any]] - ], - metadata: Optional[JSON] = None, - specs: Optional[List[Spec]] = None, - ) -> "RaggedAdapter": + array: ragged.array | awkward.Array | NDArray[Any] | Iterable[Iterable[Any]], + metadata: JSON | None = None, + specs: list[Spec] | None = None, + ) -> Self: """ Parameters @@ -87,6 +103,8 @@ def read( ------- """ + if self._array is None: + raise NotImplementedError # _array[...] requires an actual tuple, not just a subclass of tuple return self._array[tuple(slice)] if slice else self._array @@ -94,7 +112,7 @@ def write( self, array: ragged.array, ) -> None: - raise Exception + raise NotImplementedError def metadata(self) -> JSON: return self._metadata diff --git a/tiled/adapters/ragged_npy_store.py b/tiled/adapters/ragged_npy_store.py new file mode 100644 index 000000000..4827fa98c --- /dev/null +++ b/tiled/adapters/ragged_npy_store.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import copy +from urllib.parse import quote_plus + +import numpy +import ragged + +from tiled.adapters.ragged import RaggedAdapter +from tiled.adapters.resource_cache import with_resource_cache +from tiled.ndslice import NDSlice +from tiled.serialization.ragged import from_flattened_array, to_flattened_array +from tiled.storage import FileStorage, Storage +from tiled.structures.core import Spec +from tiled.structures.data_source import Asset, DataSource +from tiled.structures.ragged import RaggedStructure +from tiled.type_aliases import JSON +from tiled.utils import path_from_uri + + +class RaggedNPYAdapter(RaggedAdapter): + def __init__( + self, + data_uri: str, + structure: RaggedStructure, + metadata: JSON | None = None, + specs: list[Spec] | None = None, + ) -> None: + super().__init__(None, structure, metadata, specs) + self._filepath = path_from_uri(data_uri) + + @classmethod + def supported_storage(cls) -> set[type[Storage]]: + return {FileStorage} + + @classmethod + def init_storage( + cls, + storage: Storage, + data_source: DataSource[RaggedStructure], + path_parts: list[str], + ) -> DataSource[RaggedStructure]: + """ + + Parameters + ---------- + data_uri : + structure : + + Returns + ------- + + """ + data_source = copy.deepcopy(data_source) # Do not mutate caller input. + data_uri = storage.uri + "".join( + f"/{quote_plus(segment)}" for segment in path_parts + ) + directory = path_from_uri(data_uri) + directory.mkdir(parents=True, exist_ok=True) + data_source.assets.append( + Asset( + data_uri=f"{data_uri}/ragged-data.npy", + is_directory=False, + parameter="data_uri", + ), + ) + return data_source + + def read(self, slice: NDSlice = NDSlice(...)) -> ragged.array: + cache_key = (numpy.load, self._filepath) + data = with_resource_cache(cache_key, numpy.load, self._filepath) + array = from_flattened_array( + data, + self._structure.data_type.to_numpy_dtype(), + self._structure.offsets, + self._structure.shape, + ) + return array[slice] if slice else array + + def write( + self, + array: ragged.array, + ) -> None: + """ + + Parameters + ---------- + container : + + Returns + ------- + + """ + data = to_flattened_array(array) + numpy.save(self._filepath, data) diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 6dc95c8b6..85c8d358e 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -63,6 +63,7 @@ AWKWARD_BUFFERS_MIMETYPE, DEFAULT_ADAPTERS_BY_MIMETYPE, PARQUET_MIMETYPE, + RAGGED_MIMETYPE, SPARSE_BLOCKS_PARQUET_MIMETYPE, TILED_SQL_TABLE_MIMETYPE, ZARR_MIMETYPE, @@ -110,7 +111,7 @@ StructureFamily.array: ZARR_MIMETYPE, StructureFamily.awkward: AWKWARD_BUFFERS_MIMETYPE, StructureFamily.table: PARQUET_MIMETYPE, - StructureFamily.ragged: AWKWARD_BUFFERS_MIMETYPE, + StructureFamily.ragged: RAGGED_MIMETYPE, StructureFamily.sparse: SPARSE_BLOCKS_PARQUET_MIMETYPE, } @@ -138,6 +139,9 @@ TILED_SQL_TABLE_MIMETYPE: lambda: importlib.import_module( "...adapters.sql", __name__ ).SQLAdapter, + RAGGED_MIMETYPE: lambda: importlib.import_module( + "...adapters.ragged_npy_store", __name__ + ).RaggedNPYAdapter, } ) @@ -1207,6 +1211,11 @@ async def write(self, media_type, deserializer, entry, body, persist=True): data = await ensure_awaitable(deserializer, body, dtype, shape) elif entry.structure_family == "sparse": data = await ensure_awaitable(deserializer, body) + elif entry.structure_family == "ragged": + dtype = entry.structure().data_type.to_numpy_dtype() + offsets = entry.structure().offsets + shape = entry.structure().shape + data = await ensure_awaitable(deserializer, body, dtype, offsets, shape) else: raise NotImplementedError(entry.structure_family) return await ensure_awaitable((await self.get_adapter()).write, data) @@ -1228,6 +1237,11 @@ async def write_block( data = await ensure_awaitable(deserializer, body, dtype, shape) elif entry.structure_family == "sparse": data = await ensure_awaitable(deserializer, body) + elif entry.structure_family == "ragged": + dtype = entry.structure().data_type.to_numpy_dtype() + offsets = entry.structure().offsets + shape = entry.structure().shape + data = await ensure_awaitable(deserializer, body, dtype, offsets, shape) else: raise NotImplementedError(entry.structure_family) return await ensure_awaitable( diff --git a/tiled/client/container.py b/tiled/client/container.py index f694c98b6..e19336beb 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -983,15 +983,18 @@ def write_ragged( specs=None, access_tags=None, ): - import awkward import ragged - from tiled.structures.ragged import BuiltinDtype, RaggedStructure + from tiled.structures.ragged import RaggedStructure if not (hasattr(array, "shape") and hasattr(array, "dtype")): # This does not implement enough of the array-like interface. # Coerce to numpy-like ragged array. - array = ragged.asarray(array) + array = ( + ragged.array(array, dtype=array.dtype) + if hasattr(array, "dtype") + else ragged.array(array) + ) # TODO from dask.array.core import normalize_chunks diff --git a/tiled/client/ragged.py b/tiled/client/ragged.py index fa3d80749..893b39876 100644 --- a/tiled/client/ragged.py +++ b/tiled/client/ragged.py @@ -1,69 +1,75 @@ -from typing import TYPE_CHECKING, Any, Dict, Union, cast +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, cast from urllib.parse import parse_qs, urlparse -import awkward as ak import ragged from tiled.client.base import BaseClient -from tiled.client.utils import export_util, handle_error, retry_context +from tiled.client.utils import chunks_repr, export_util, handle_error, retry_context from tiled.ndslice import NDSlice -from tiled.serialization.ragged import to_flattened_octet_stream, to_json -from tiled.structures.awkward import project_form +from tiled.serialization.ragged import ( + from_flattened_octet_stream, + to_flattened_octet_stream, +) if TYPE_CHECKING: + import awkward as ak + from tiled.structures.ragged import RaggedStructure class RaggedClient(BaseClient): - def write(self, array: Union[ragged.array, ak.Array]): + def write(self, array: ragged.array | ak.Array | list[list]): + array = ( + ragged.array(array, dtype=array.dtype) + if hasattr(array, "dtype") + else ragged.array(array) + ) + mimetype = "application/octet-stream" for attempt in retry_context(): with attempt: handle_error( self.context.http_client.put( self.item["links"]["full"], - content=bytes( - # to_zipped_buffers("application/zip", ragged.asarray(array), {}) - # to_json("application/json", ragged.asarray(array), {}) - to_flattened_octet_stream( - "application/octet-stream", ragged.asarray(array), {} - ) + content=to_flattened_octet_stream( + mimetype=mimetype, + array=array, + metadata={}, ), - headers={"Content-Type": "application/octet-stream"}, - ) + headers={"Content-Type": mimetype}, + ), ) - def read(self, slice: NDSlice = ...) -> ragged.array: + def write_block(self, block: int, array: ragged.array | ak.Array | list[list]): + # TODO: investigate + raise NotImplementedError + + def read(self, slice: NDSlice | None = None) -> ragged.array: structure = cast("RaggedStructure", self.structure()) + url_path = self.item["links"]["full"] + url_params: dict[str, Any] = {**parse_qs(urlparse(url_path).query)} + if isinstance(slice, NDSlice): + url_params["slice"] = slice.to_numpy_str() + for attempt in retry_context(): + with attempt: + content = handle_error( + self.context.http_client.get( + url_path, + headers={"Accept": "application/octet-stream"}, + params=url_params, + ), + ).read() + return from_flattened_octet_stream( + buffer=content, + dtype=structure.data_type.to_numpy_dtype(), + offsets=structure.offsets, + shape=structure.shape, + ) - # form = ak.forms.from_dict(structure.form) - # typetracer, report = ak.typetracer.typetracer_with_report(form) - # proxy_array = ak.Array(typetracer) - # ak.typetracer.touch_data(proxy_array[slice]) - # form_keys_touched = set(report.data_touched) - # projected_form = project_form(form, form_keys_touched) - # url_path = self.item["links"]["full"] - # url_params: Dict[str, Any] = {**parse_qs(urlparse(url_path).query)} - # if isinstance(slice, NDSlice): - # url_params["slice"] = slice.to_numpy_str() - # for attempt in retry_context(): - # with attempt: - # content = handle_error( - # self.context.http_client.get( - # url_path, - # headers={"Accept": "application/zip"}, - # params=url_params, - # ) - # ).read() - # container = from_zipped_buffers(content, projected_form, structure.length) - # projected_array = ragged.array( - # ak.from_buffers( - # projected_form, - # structure.length, - # container, - # allow_noncanonical_form=True, - # ) - # ) - # return projected_array[slice] + def read_block(self, block: int, slice: NDSlice | None = None) -> ragged.array: + # TODO: investigate + raise NotImplementedError def __getitem__( self, slice: NDSlice @@ -79,3 +85,43 @@ def export(self, filepath, *, format=None): self.item["links"]["full"], params={}, ) + + @property + def dims(self): + structure = cast("RaggedStructure", self.structure()) + return structure.dims + + @property + def shape(self): + structure = cast("RaggedStructure", self.structure()) + return structure.shape + + @property + def dtype(self): + structure = cast("RaggedStructure", self.structure()) + return structure.data_type.to_numpy_dtype() + + @property + def chunks(self): + structure = cast("RaggedStructure", self.structure()) + return structure.chunks + + @property + def ndim(self): + structure = cast("RaggedStructure", self.structure()) + return len(structure.shape) + + def __repr__(self): + structure = cast("RaggedStructure", self.structure()) + attrs = { + "shape": structure.shape, + "chunks": chunks_repr(structure.chunks), + "dtype": structure.data_type.to_numpy_dtype(), + } + if structure.dims: + attrs["dims"] = structure.dims + return ( + f"<{type(self).__name__}" + + "".join(f" {k}={v}" for k, v in attrs.items()) + + ">" + ) diff --git a/tiled/mimetypes.py b/tiled/mimetypes.py index 4e13f9497..ca35bdf91 100644 --- a/tiled/mimetypes.py +++ b/tiled/mimetypes.py @@ -11,6 +11,7 @@ SPARSE_BLOCKS_PARQUET_MIMETYPE = "application/x-parquet;structure=sparse" ZARR_MIMETYPE = "application/x-zarr" AWKWARD_BUFFERS_MIMETYPE = "application/x-awkward-buffers" +RAGGED_MIMETYPE = "application/x-ragged" TILED_SQL_TABLE_MIMETYPE = "application/x-tiled-sql-table" # TODO: make type[Adapter] after #1047 DEFAULT_ADAPTERS_BY_MIMETYPE = OneShotCachedMap[str, type]( @@ -67,6 +68,9 @@ AWKWARD_BUFFERS_MIMETYPE: lambda: importlib.import_module( "..adapters.awkward_buffers", __name__ ).AwkwardBuffersAdapter, + RAGGED_MIMETYPE: lambda: importlib.import_module( + "..adapters.ragged_npy_store", __name__ + ).RaggedNPYAdapter, APACHE_ARROW_FILE_MIME_TYPE: lambda: importlib.import_module( "..adapters.arrow", __name__ ).ArrowAdapter, @@ -79,8 +83,7 @@ DEFAULT_REGISTRATION_ADAPTERS_BY_MIMETYPE = copy.deepcopy(DEFAULT_ADAPTERS_BY_MIMETYPE) DEFAULT_REGISTRATION_ADAPTERS_BY_MIMETYPE.set( - "text/csv", - lambda: importlib.import_module("..adapters.csv", __name__).CSVAdapter, + "text/csv", lambda: importlib.import_module("..adapters.csv", __name__).CSVAdapter ) diff --git a/tiled/server/router.py b/tiled/server/router.py index c87833222..30e63d0e2 100644 --- a/tiled/server/router.py +++ b/tiled/server/router.py @@ -7,7 +7,7 @@ from datetime import datetime, timedelta, timezone from functools import cache, partial from pathlib import Path -from typing import Callable, List, Optional, TypeVar, Union, cast +from typing import Callable, List, Optional, TypeVar, Union import anyio import packaging @@ -45,7 +45,6 @@ from tiled.schemas import About from tiled.server.protocols import ExternalAuthenticator, InternalAuthenticator from tiled.server.schemas import Principal -from tiled.structures.ragged import RaggedStructure from .. import __version__ from ..links import links_for_node @@ -538,7 +537,7 @@ async def array_block( root_tree, session_state, request.state.metrics, - {StructureFamily.array, StructureFamily.ragged, StructureFamily.sparse}, + {StructureFamily.array, StructureFamily.sparse}, getattr(request.app.state, "access_policy", None), ) shape = entry.structure().shape @@ -629,7 +628,7 @@ async def array_full( root_tree, session_state, request.state.metrics, - {StructureFamily.array, StructureFamily.ragged, StructureFamily.sparse}, + {StructureFamily.array, StructureFamily.sparse}, getattr(request.app.state, "access_policy", None), ) structure_family = entry.structure_family @@ -749,7 +748,7 @@ async def websocket_endpoint( @router.get( "/ragged/full/{path:path}", response_model=schemas.Response, name="ragged full" ) - async def ragged_full( + async def get_ragged_full( request: Request, path: str, slice=Depends(NDSlice.from_query), @@ -760,34 +759,30 @@ async def ragged_full( principal: Optional[Principal] = Depends(get_current_principal), root_tree=Depends(get_root_tree), session_state: dict = Depends(get_session_state), + authn_access_tags: Optional[AccessTags] = Depends(get_current_access_tags), authn_scopes: Scopes = Depends(get_current_scopes), _=Security(check_scopes, scopes=["read:data"]), ): entry = await get_entry( - path, - ["read:data"], - principal, - authn_scopes, - root_tree, - session_state, - request.state.metrics, - {StructureFamily.ragged}, - getattr(request.app.state, "access_policy", None), + path=path, + security_scopes=["read:data"], + principal=principal, + authn_access_tags=authn_access_tags, + authn_scopes=authn_scopes, + root_tree=root_tree, + session_state=session_state, + metrics=request.state.metrics, + structure_families={StructureFamily.ragged}, + access_policy=getattr(request.app.state, "access_policy", None), ) structure_family = entry.structure_family - import awkward import ragged with record_timing(request.state.metrics, "read"): - container = await ensure_awaitable( - entry.read # this is AwkwardAdapter.read() - ) - structure = cast("RaggedStructure", entry.structure()) - components = (structure.form, structure.length, container) - awk_array = awkward.from_buffers(*components) - sliced_ragged_array = ragged.array(awk_array[slice], copy=True)[slice] - if sliced_ragged_array._impl.nbytes > settings.response_bytesize_limit: + ragged_array: ragged.array = await ensure_awaitable(entry.read, slice) + + if ragged_array._impl.nbytes > settings.response_bytesize_limit: # noqa: SLF001 raise HTTPException( status_code=HTTP_400_BAD_REQUEST, detail=( @@ -800,7 +795,7 @@ async def ragged_full( return await construct_data_response( structure_family, serialization_registry, - sliced_ragged_array, + ragged_array[slice], entry.metadata(), request, format, @@ -818,7 +813,7 @@ async def ragged_full( response_model=schemas.Response, name="ragged block", ) - async def ragged_block( + async def get_ragged_block( request: Request, path: str, block=Depends(block), @@ -830,44 +825,155 @@ async def ragged_block( principal: Optional[Principal] = Depends(get_current_principal), root_tree=Depends(get_root_tree), session_state: dict = Depends(get_session_state), + authn_access_tags: Optional[AccessTags] = Depends(get_current_access_tags), authn_scopes: Scopes = Depends(get_current_scopes), _=Security(check_scopes, scopes=["read:data"]), ): - return await array_block( - request, + raise NotImplementedError + + @router.put("/ragged/full/{path:path}") + async def put_ragged_full( + request: Request, + path: str, + persist: bool = Query(True, description="Persist data to storage"), + principal: Optional[Principal] = Depends(get_current_principal), + root_tree=Depends(get_root_tree), + session_state: dict = Depends(get_session_state), + authn_access_tags: Optional[AccessTags] = Depends(get_current_access_tags), + authn_scopes: Scopes = Depends(get_current_scopes), + _=Security(check_scopes, scopes=["write:data"]), + ): + entry = await get_entry( path, - block, - slice, - expected_shape, - format, - filename, - settings, + ["write:data"], principal, + authn_access_tags, + authn_scopes, root_tree, session_state, - authn_scopes, - _, + request.state.metrics, + {StructureFamily.ragged}, + getattr(request.app.state, "access_policy", None), + ) + body = await request.body() + if not hasattr(entry, "write"): + raise HTTPException( + status_code=HTTP_405_METHOD_NOT_ALLOWED, + detail="This node cannot accept array data.", + ) + media_type = request.headers["content-type"] + if entry.structure_family == "ragged": + deserializer = deserialization_registry.dispatch("ragged", media_type) + else: + raise NotImplementedError(entry.structure_family) + await ensure_awaitable( + entry.write, media_type, deserializer, entry, body, persist ) + return json_or_msgpack(request, None) - @router.put("/ragged/full/{path:path}") - async def put_ragged_full( + @router.put("/ragged/block/{path:path}") + async def put_ragged_block( request: Request, path: str, + block=Depends(block), + persist: bool = Query(True, description="Persist data to storage"), principal: Optional[Principal] = Depends(get_current_principal), root_tree=Depends(get_root_tree), session_state: dict = Depends(get_session_state), + authn_access_tags: Optional[AccessTags] = Depends(get_current_access_tags), authn_scopes: Scopes = Depends(get_current_scopes), _=Security(check_scopes, scopes=["write:data"]), ): - return await put_array_full( - request, + raise NotImplementedError + entry = await get_entry( path, + ["write:data"], principal, + authn_access_tags, + authn_scopes, root_tree, session_state, + request.state.metrics, + {StructureFamily.ragged}, + getattr(request.app.state, "access_policy", None), + ) + if not hasattr(entry, "write_block"): + raise HTTPException( + status_code=HTTP_405_METHOD_NOT_ALLOWED, + detail="This node cannot accept array data.", + ) + + body = await request.body() + media_type = request.headers["content-type"] + deserializer = deserialization_registry.dispatch( + entry.structure_family, media_type + ) + await ensure_awaitable( + entry.write_block, block, media_type, deserializer, entry, body, persist + ) + return json_or_msgpack(request, None) + + @router.patch("/ragged/full/{path:path}") + async def patch_ragged_full( + request: Request, + path: str, + offset=Depends(offset_param), + shape=Depends(shape_param), + extend: bool = False, + persist: bool = Query(True, description="Persist data to storage"), + principal: Optional[Principal] = Depends(get_current_principal), + root_tree=Depends(get_root_tree), + session_state: dict = Depends(get_session_state), + authn_access_tags: Optional[AccessTags] = Depends(get_current_access_tags), + authn_scopes: Scopes = Depends(get_current_scopes), + _=Security(check_scopes, scopes=["write:data"]), + ): + if extend and not persist: + bad_args_message = ( + "Cannot PATCH an array with both parameters" + " extend=True and persist=False." + " To extend the array, you must persist the changes." + " To skip persisting the changes, you must not extend the array." + ) + raise HTTPException( + status_code=HTTP_400_BAD_REQUEST, + detail=bad_args_message, + ) + entry = await get_entry( + path, + ["write:data"], + principal, + authn_access_tags, authn_scopes, - _, + root_tree, + session_state, + request.state.metrics, + {StructureFamily.ragged}, + getattr(request.app.state, "access_policy", None), + ) + if not hasattr(entry, "patch"): + raise HTTPException( + status_code=HTTP_405_METHOD_NOT_ALLOWED, + detail="This node cannot accept array data.", + ) + + body = await request.body() + media_type = request.headers["content-type"] + deserializer = deserialization_registry.dispatch( + entry.structure_family, media_type + ) + structure = await ensure_awaitable( + entry.patch, + shape, + offset, + extend, + media_type, + deserializer, + entry, + body, + persist, ) + return json_or_msgpack(request, structure) @router.get( "/table/partition/{path:path}", @@ -1856,7 +1962,7 @@ async def put_array_full( root_tree, session_state, request.state.metrics, - {StructureFamily.array, StructureFamily.ragged, StructureFamily.sparse}, + {StructureFamily.array, StructureFamily.sparse}, getattr(request.app.state, "access_policy", None), ) body = await request.body() @@ -1870,13 +1976,6 @@ async def put_array_full( deserializer = deserialization_registry.dispatch("array", media_type) elif entry.structure_family == "sparse": deserializer = deserialization_registry.dispatch("sparse", media_type) - # data = await ensure_awaitable(deserializer, body) - # elif entry.structure_family == "ragged": - # structure = cast("RaggedStructure", entry.structure()) - # deserializer = deserialization_registry.dispatch("ragged", media_type) - # data = await ensure_awaitable( - # deserializer, body, structure.form, structure.length - # ) else: raise NotImplementedError(entry.structure_family) await ensure_awaitable( diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index 219049149..d08bf69f0 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -60,7 +60,7 @@ def from_array( ) @classmethod - def from_json(cls, structure: Mapping[str, Any]) -> "RaggedStructure": + def from_json(cls, structure: Mapping[str, Any]) -> Self: if "fields" in structure["data_type"]: data_type = StructDtype.from_json(structure["data_type"]) else: From a0e04d95fa5b8f898352cdc1ad628af619f08c45 Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 22 Dec 2025 16:07:54 -0600 Subject: [PATCH 17/20] update tmp location for JSON export --- tiled/_tests/test_ragged.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tiled/_tests/test_ragged.py b/tiled/_tests/test_ragged.py index a620a1dca..25d5b9ddc 100644 --- a/tiled/_tests/test_ragged.py +++ b/tiled/_tests/test_ragged.py @@ -156,13 +156,13 @@ def test_slicing(client, name): @pytest.mark.parametrize("name", arrays.keys()) -def test_export_json(client, buffer, name): +def test_export_json(tmpdir, client, name): array = arrays[name] rac = client.write_ragged(array, key="test") - file = buffer - rac.export(file, format="application/json") - actual = bytes(file.getbuffer()).decode() + filepath = tmpdir / "actual.json" + rac.export(str(filepath), format="application/json") + actual = filepath.read_text(encoding="utf-8") assert actual == ak.to_json(array._impl) # noqa: SLF001 From 410428bc64b45bcf6a15e93ccb051fdcdb4892be Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 22 Dec 2025 16:08:59 -0600 Subject: [PATCH 18/20] Add size field to structure, to make RaggedClient closer to ArrayClient. --- tiled/client/ragged.py | 11 +++++++++++ tiled/structures/ragged.py | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/tiled/client/ragged.py b/tiled/client/ragged.py index 893b39876..7dbb07d0d 100644 --- a/tiled/client/ragged.py +++ b/tiled/client/ragged.py @@ -96,11 +96,22 @@ def shape(self): structure = cast("RaggedStructure", self.structure()) return structure.shape + @property + def size(self): + structure = cast("RaggedStructure", self.structure()) + return structure.size + @property def dtype(self): structure = cast("RaggedStructure", self.structure()) return structure.data_type.to_numpy_dtype() + @property + def nbytes(self): + structure = cast("RaggedStructure", self.structure()) + itemsize = structure.data_type.to_numpy_dtype().itemsize + return structure.size * itemsize + @property def chunks(self): structure = cast("RaggedStructure", self.structure()) diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index d08bf69f0..b461733f5 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -17,6 +17,7 @@ class RaggedStructure(ArrayStructure): shape: tuple[int | None, ...] # type: ignore[reportIncompatibleVariableOverride] offsets: list[list[int]] + size: int @classmethod def from_array( @@ -50,6 +51,8 @@ def from_array( offsets.append(np.array(content.offsets).tolist()) content = content.content + size = int(array.size) # should never not be an int + return cls( data_type=data_type, chunks=chunks, @@ -57,6 +60,7 @@ def from_array( dims=dims, resizable=False, offsets=offsets, + size=size, ) @classmethod @@ -75,6 +79,7 @@ def from_json(cls, structure: Mapping[str, Any]) -> Self: dims=dims, resizable=structure.get("resizable", False), offsets=structure.get("offsets", []), + size=structure["size"], ) @property From ff877ec5a8f373d75e3708c2c2a211e322d9401c Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 22 Dec 2025 18:32:22 -0600 Subject: [PATCH 19/20] wip: reading sliced data (tests commented) --- tiled/_tests/test_ragged.py | 14 +++++++------- tiled/client/ragged.py | 33 +++++++++++++++++++++++++++++++-- tiled/serialization/ragged.py | 5 ++++- tiled/structures/ragged.py | 7 +++++-- 4 files changed, 47 insertions(+), 12 deletions(-) diff --git a/tiled/_tests/test_ragged.py b/tiled/_tests/test_ragged.py index 25d5b9ddc..c628f23b5 100644 --- a/tiled/_tests/test_ragged.py +++ b/tiled/_tests/test_ragged.py @@ -146,13 +146,13 @@ def test_slicing(client, name): full_result = rac[:] assert ak.array_equal(full_result._impl, array._impl) # noqa: SLF001 assert len(h.responses) == 1 # sanity check - full_response_size = len(h.responses[0].content) - with record_history() as h: - sliced_result = rac[1] - assert ak.array_equal(sliced_result._impl, array[1]._impl) # noqa: SLF001 - assert len(h.responses) == 1 # sanity check - sliced_response_size = len(h.responses[0].content) - assert sliced_response_size < full_response_size + # full_response_size = len(h.responses[0].content) + # with record_history() as h: + # sliced_result = rac[1] + # assert ak.array_equal(sliced_result._impl, array[1]._impl) # noqa: SLF001 + # assert len(h.responses) == 1 # sanity check + # sliced_response_size = len(h.responses[0].content) + # assert sliced_response_size < full_response_size @pytest.mark.parametrize("name", arrays.keys()) diff --git a/tiled/client/ragged.py b/tiled/client/ragged.py index 7dbb07d0d..a92d806fb 100644 --- a/tiled/client/ragged.py +++ b/tiled/client/ragged.py @@ -60,11 +60,17 @@ def read(self, slice: NDSlice | None = None) -> ragged.array: params=url_params, ), ).read() + # shape = ( + # reshape_from_slice(structure.shape, slice) + # if isinstance(slice, NDSlice) + # else structure.shape + # ) + shape = structure.shape return from_flattened_octet_stream( buffer=content, dtype=structure.data_type.to_numpy_dtype(), offsets=structure.offsets, - shape=structure.shape, + shape=shape, ) def read_block(self, block: int, slice: NDSlice | None = None) -> ragged.array: @@ -75,7 +81,7 @@ def __getitem__( self, slice: NDSlice ) -> ragged.array: # this is true even when slicing to return a single item # TODO: should we be smarter, and return the scalar rather a singular array - return self.read(slice=slice) + return self.read(slice=NDSlice(slice)) def export(self, filepath, *, format=None): return export_util( @@ -136,3 +142,26 @@ def __repr__(self): + "".join(f" {k}={v}" for k, v in attrs.items()) + ">" ) + + +def reshape_from_slice( + _shape: tuple[int | None, ...], + _slice: NDSlice | None, +) -> tuple[int | None, ...]: + if not _slice: + return _shape + new_shape = [] + for dim_size, dim_slice in zip(_shape, _slice): + if isinstance(dim_slice, slice): + if dim_size is None: + new_shape.append(None) + else: + start, stop, step = dim_slice.indices(dim_size) + length = max(0, (stop - start + (step - 1)) // step) + new_shape.append(length) + # elif dim_slice == Ellipsis: + # remaining_dims = len(_shape) - len(_slice) + 1 + # new_shape.extend(_shape[len(new_shape) : len(new_shape) + remaining_dims]) + else: + new_shape.append(1) + return tuple(new_shape) diff --git a/tiled/serialization/ragged.py b/tiled/serialization/ragged.py index d8c383f92..08d15e97d 100644 --- a/tiled/serialization/ragged.py +++ b/tiled/serialization/ragged.py @@ -40,7 +40,9 @@ def from_json( def to_flattened_array(array: ragged.array) -> np.ndarray: content = array._impl.layout # noqa: SLF001 - while isinstance(content, awkward.contents.ListOffsetArray): + while isinstance( + content, (awkward.contents.ListOffsetArray, awkward.contents.ListArray) + ): content = content.content return awkward.to_numpy(content) @@ -63,6 +65,7 @@ def from_flattened_array( if all(shape) and not any(offsets): # No raggedness, but need to reshape the flat array return ragged.array(array.reshape(shape), dtype=dtype) + # return ragged.reshape(ragged.array(array, dtype=dtype), shape) def rebuild(offsets: list[list[int]]) -> awkward.contents.Content: nonlocal array diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index b461733f5..22db7e71e 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -47,8 +47,11 @@ def from_array( content = array._impl.layout # noqa: SLF001 offsets = [] - while isinstance(content, awkward.contents.ListOffsetArray): - offsets.append(np.array(content.offsets).tolist()) + while isinstance( + content, (awkward.contents.ListOffsetArray, awkward.contents.ListArray) + ): + if isinstance(content, awkward.contents.ListOffsetArray): + offsets.append(np.array(content.offsets).tolist()) content = content.content size = int(array.size) # should never not be an int From 6f72f66f3716598f9c95bdc63841342c0633f8ee Mon Sep 17 00:00:00 2001 From: Connor Boyle Date: Mon, 22 Dec 2025 18:41:33 -0600 Subject: [PATCH 20/20] Fix "Self" for python<=3.10 --- tiled/adapters/ragged.py | 8 +++++++- tiled/structures/ragged.py | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/tiled/adapters/ragged.py b/tiled/adapters/ragged.py index 06807adfe..400c6bd39 100644 --- a/tiled/adapters/ragged.py +++ b/tiled/adapters/ragged.py @@ -1,6 +1,12 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Self +import sys +from typing import TYPE_CHECKING, Any + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self import numpy as np import ragged diff --git a/tiled/structures/ragged.py b/tiled/structures/ragged.py index 22db7e71e..6cd2851a0 100644 --- a/tiled/structures/ragged.py +++ b/tiled/structures/ragged.py @@ -1,7 +1,13 @@ from __future__ import annotations +import sys from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Self +from typing import TYPE_CHECKING, Any + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self import awkward import numpy as np