2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -6,7 +6,7 @@ repos:
- id: end-of-file-fixer
- repo: https://github.com/charliermarsh/ruff-pre-commit
# keep the version here in sync with the version in uv.lock
rev: "v0.11.13"
rev: "v0.12.0"
hooks:
- id: ruff-check
args: [--fix, --exit-non-zero-on-fix]
2 changes: 1 addition & 1 deletion .vscode/settings.json
@@ -17,7 +17,7 @@
"python.analysis.autoImportCompletions": true,
"python.analysis.typeCheckingMode": "standard",
"python.testing.pytestArgs": [
"tilebox-datasets"
"tilebox-workflows"
],
"search.useGlobalIgnoreFiles": false,
"search.useIgnoreFiles": true
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -7,9 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.38.0] - 2025-06-24

### Added

- `tilebox-datasets`: Added a `delete_collection` method to `DatasetClient` to delete a collection (see the sketch below).
- `tilebox-workflows`: Added support for nested dataclasses and protobuf messages inside a `list`, `dict` or `tuple`
  as task arguments (see the sketch below).
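
A minimal sketch of how the two additions above might be used. The client setup and the exact `delete_collection` signature are assumptions for illustration; the nested-argument part mirrors the new tests in `tilebox-workflows/tests/test_task.py` further down in this diff.

```python
from dataclasses import dataclass

from tilebox.datasets import Client as DatasetsClient
from tilebox.workflows import Client as WorkflowsClient, Task

# datasets: delete a collection (assuming collections are addressed by name here)
dataset = DatasetsClient().dataset("my_dataset")
dataset.delete_collection("Sandbox")


# workflows: dataclasses nested inside a list, dict or tuple are now valid task arguments
@dataclass(frozen=True, eq=True)
class Measurement:
    value: float


class ProcessBatches(Task):
    batches: dict[str, list[Measurement]]

    def execute(self, context) -> None:  # body omitted, not relevant to serialization
        ...


job = WorkflowsClient().jobs().submit("process", ProcessBatches({"a": [Measurement(1.0)]}))
```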

## [0.37.1] - 2025-06-10

@@ -194,7 +198,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Released packages: `tilebox-datasets`, `tilebox-workflows`, `tilebox-storage`, `tilebox-grpc`


[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.37.1...HEAD
[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.38.0...HEAD
[0.38.0]: https://github.com/tilebox/tilebox-python/compare/v0.37.1...v0.38.0
[0.37.1]: https://github.com/tilebox/tilebox-python/compare/v0.37.0...v0.37.1
[0.37.0]: https://github.com/tilebox/tilebox-python/compare/v0.36.1...v0.37.0
[0.36.1]: https://github.com/tilebox/tilebox-python/compare/v0.36.0...v0.36.1
2 changes: 1 addition & 1 deletion README.md
@@ -31,7 +31,7 @@

# Tilebox Python

Python clients for [Tilebox](https://tilebox.com) - a framework for space data management and workflow orchestration.
Python library for [Tilebox](https://tilebox.com), lightweight space data management and orchestration software, on the ground and in orbit.

## Install

6 changes: 6 additions & 0 deletions tilebox-datasets/tests/data/test_time_interval.py
@@ -44,6 +44,12 @@ def test_time_interval_to_half_open_preserves_eq(interval: TimeInterval) -> None
assert interval.to_half_open() == interval


@given(time_intervals())
def test_time_interval_to_half_open_preserves_hash(interval: TimeInterval) -> None:
"""Assert that converting a time interval to a half-open interval preserves hash equality"""
assert hash(interval.to_half_open()) == hash(interval)


@given(time_intervals())
def test_time_interval_repr(interval: TimeInterval) -> None:
"""Assert the repr of a time interval is as expected"""
7 Git LFS files not shown
7 changes: 7 additions & 0 deletions tilebox-datasets/tilebox/datasets/data/time_interval.py
@@ -73,6 +73,13 @@ def __eq__(self, other: object) -> bool:
a, b = self.to_half_open(), other.to_half_open()
return (a.start, a.end) == (b.start, b.end)

def __hash__(self) -> int:
"""Hash the time interval"""

# if two intervals are equal, they should have the same hash, so we convert to half-open intervals first
half_open = self.to_half_open()
return hash((half_open.start, half_open.end))

def __repr__(self) -> str:
return self.format()

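Since `__hash__` is now derived from the same half-open normalization as `__eq__`, equal intervals hash identically and behave correctly in sets and as dict keys. A small illustration; the `end_inclusive` keyword flag is an assumption suggested by the `to_half_open` conversion, not confirmed API:

```python
from datetime import datetime, timezone

from tilebox.datasets.data.time_interval import TimeInterval

start = datetime(2025, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 1, 2, tzinfo=timezone.utc)

closed = TimeInterval(start, end, end_inclusive=True)  # assumed constructor flag
half_open = closed.to_half_open()

assert closed == half_open              # __eq__ compares the half-open forms
assert hash(closed) == hash(half_open)  # __hash__ now agrees with __eq__
assert len({closed, half_open}) == 1    # so sets deduplicate equal intervals
```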
8 changes: 4 additions & 4 deletions tilebox-workflows/tests/runner/test_runner.py
@@ -57,10 +57,10 @@ def test_runner_with_fibonacci_workflow() -> None:
# we hardcode the trace parent for the job, which allows us to assert that every single outgoing request
# matches exactly byte for byte
get_trace_parent_mock.return_value = "00-f8d3b65869f638c5bfe173ffb3b3e5a0-ccf5709467cafc52-01"
job = client.jobs().submit("fibonacci", FibonacciTask(n), cluster="dev-cluster-ESugE7S4cwADVK")
job = client.jobs().submit("fibonacci", FibonacciTask(n))

cache = InMemoryCache()
runner = client.runner("dev-cluster-ESugE7S4cwADVK", tasks=[FibonacciTask, SumResultTask], cache=cache)
runner = client.runner(tasks=[FibonacciTask, SumResultTask], cache=cache)
runner.run_all()

job_cache = cache.group(str(job.id))
@@ -85,10 +85,10 @@ def test_runner_with_flaky_task() -> None:
# we hardcode the trace parent for the job, which allows us to assert that every single outgoing request
# matches exactly byte for byte
get_trace_parent_mock.return_value = "00-9680c9bfd602c4befe7b65a33a7b886d-3de78304f4cfbc40-01"
job = client.jobs().submit("flaky-task", FlakyTask(), cluster="dev-cluster-ESugE7S4cwADVK")
job = client.jobs().submit("flaky-task", FlakyTask())

cache = InMemoryCache()
runner = client.runner("dev-cluster-ESugE7S4cwADVK", tasks=[FlakyTask], cache=cache)
runner = client.runner(tasks=[FlakyTask], cache=cache)

runner.run_all() # task will fail
job = job_client.find(job) # load current job state
2 Git LFS files not shown
125 changes: 124 additions & 1 deletion tilebox-workflows/tests/test_task.py
@@ -196,7 +196,7 @@ def test_serialize_deserialize_task_protobuf() -> None:
assert deserialize_task(ExampleProtobufTaskWithSingleProtobufArg, serialize_task(task)) == task


@dataclass
@dataclass(frozen=True, eq=True)
class NestedJson:
nested_x: str

@@ -225,3 +225,126 @@ class ExampleTaskWithNestedProtobuf(Task):
def test_serialize_deserialize_task_nested_protobuf() -> None:
task = ExampleTaskWithNestedProtobuf("Hello", SampleArgs(some_string="World", some_int=123))
assert deserialize_task(ExampleTaskWithNestedProtobuf, serialize_task(task)) == task


class ExampleTaskWithNestedJsonInList(Task):
x: str
nested_list: list[NestedJson]


def test_serialize_deserialize_task_nested_json_in_list() -> None:
task = ExampleTaskWithNestedJsonInList("Hello", [NestedJson("World"), NestedJson("!")])
assert deserialize_task(ExampleTaskWithNestedJsonInList, serialize_task(task)) == task


class ExampleTaskWithNestedJsonInTuple(Task):
x: str
nested_tuple: tuple[NestedJson, NestedJson]


def test_serialize_deserialize_task_nested_json_in_tuple() -> None:
task = ExampleTaskWithNestedJsonInTuple("Hello", (NestedJson("World"), NestedJson("!")))
assert deserialize_task(ExampleTaskWithNestedJsonInTuple, serialize_task(task)) == task


class ExampleTaskWithNestedJsonInVariadicTuple(Task):
x: str
nested_tuple: tuple[NestedJson, ...]


def test_serialize_deserialize_task_nested_json_in_variadic_tuple() -> None:
task = ExampleTaskWithNestedJsonInVariadicTuple("Hello", (NestedJson("World"), NestedJson("!"), NestedJson("!!!")))
assert deserialize_task(ExampleTaskWithNestedJsonInVariadicTuple, serialize_task(task)) == task


class ExampleTaskWithNestedJsonListOnly(Task):
nested_list: list[NestedJson]


def test_serialize_deserialize_task_nested_json_list_only() -> None:
task = ExampleTaskWithNestedJsonListOnly([NestedJson("World"), NestedJson("!")])
assert deserialize_task(ExampleTaskWithNestedJsonListOnly, serialize_task(task)) == task


class ExampleTaskWithNestedJsonInDict(Task):
x: str
nested_list: dict[str, NestedJson]


def test_serialize_deserialize_task_nested_json_in_dict() -> None:
task = ExampleTaskWithNestedJsonInDict("Hello", {"a": NestedJson("World"), "b": NestedJson("!")})
assert deserialize_task(ExampleTaskWithNestedJsonInDict, serialize_task(task)) == task


class ExampleTaskWithNestedJsonInNestedDict(Task):
x: str
nested_list: dict[str, dict[str, list[NestedJson]]]


def test_serialize_deserialize_task_nested_json_in_nested_dict() -> None:
task = ExampleTaskWithNestedJsonInNestedDict("Hello", {"a": {"b": [NestedJson("World"), NestedJson("!")]}})
assert deserialize_task(ExampleTaskWithNestedJsonInNestedDict, serialize_task(task)) == task


class ExampleTaskWithNestedProtobufInList(Task):
x: str
nested_list: list[SampleArgs]


def test_serialize_deserialize_task_nested_protobuf_in_list() -> None:
task = ExampleTaskWithNestedProtobufInList("Hello", [SampleArgs(some_string="World", some_int=123)])
assert deserialize_task(ExampleTaskWithNestedProtobufInList, serialize_task(task)) == task


class ExampleTaskWithNestedProtobufInTuple(Task):
x: str
nested_tuple: tuple[SampleArgs, SampleArgs]


def test_serialize_deserialize_task_nested_protobuf_in_tuple() -> None:
task = ExampleTaskWithNestedProtobufInTuple(
"Hello", (SampleArgs(some_string="World", some_int=123), SampleArgs(some_string="!", some_int=456))
)
assert deserialize_task(ExampleTaskWithNestedProtobufInTuple, serialize_task(task)) == task


class ExampleTaskWithNestedProtobufInVariadicTuple(Task):
x: str
nested_tuple: tuple[SampleArgs, ...]


def test_serialize_deserialize_task_nested_protobuf_in_variadic_tuple() -> None:
task = ExampleTaskWithNestedProtobufInVariadicTuple(
"Hello",
(
SampleArgs(some_string="World", some_int=123),
SampleArgs(some_string="!", some_int=456),
SampleArgs(some_string="!!!", some_int=789),
),
)
assert deserialize_task(ExampleTaskWithNestedProtobufInVariadicTuple, serialize_task(task)) == task


class ExampleTaskWithNestedProtobufInDict(Task):
x: str
nested_dict: dict[str, SampleArgs]


def test_serialize_deserialize_task_nested_protobuf_in_dict() -> None:
task = ExampleTaskWithNestedProtobufInDict(
"Hello", {"a": SampleArgs(some_string="World", some_int=123), "b": SampleArgs(some_string="!", some_int=456)}
)
assert deserialize_task(ExampleTaskWithNestedProtobufInDict, serialize_task(task)) == task


class ExampleTaskWithNestedProtobufInNestedDict(Task):
x: str
nested_dict: dict[str, dict[str, list[SampleArgs]]]


def test_serialize_deserialize_task_nested_protobuf_in_nested_dict() -> None:
task = ExampleTaskWithNestedProtobufInNestedDict(
"Hello",
{"a": {"b": [SampleArgs(some_string="World", some_int=123), SampleArgs(some_string="!", some_int=456)]}},
)
assert deserialize_task(ExampleTaskWithNestedProtobufInNestedDict, serialize_task(task)) == task
62 changes: 46 additions & 16 deletions tilebox-workflows/tilebox/workflows/task.py
@@ -5,7 +5,7 @@
from base64 import b64decode, b64encode
from collections.abc import Sequence
from dataclasses import dataclass, field, fields, is_dataclass
from typing import Any, cast
from typing import Any, cast, get_args, get_origin

# from python 3.11 onwards this is available as typing.dataclass_transform:
from typing_extensions import dataclass_transform
@@ -299,7 +299,7 @@ def serialize_task(task: Task) -> bytes:
return b"" # empty task
if len(task_fields) == 1:
# if there is only one field, we can serialize it directly
field = _serialize_field(task, task_fields[0].name)
field = _serialize_value(getattr(task, task_fields[0].name), base64_encode_protobuf=False)
if not isinstance(field, bytes):
field = json.dumps(field).encode()
return field
@@ -314,18 +314,24 @@ def _serialize_as_dict(task: Task) -> dict[str, Any]:
if skip:
continue

value = _serialize_field(task, dataclass_field.name)
if isinstance(value, bytes):
as_dict[dataclass_field.name] = b64encode(value).decode("ascii")
else:
as_dict[dataclass_field.name] = value
as_dict[dataclass_field.name] = _serialize_value(
getattr(task, dataclass_field.name), base64_encode_protobuf=True
)

return as_dict


def _serialize_field(task: Task, field_name: str) -> Any:
value = getattr(task, field_name)
def _serialize_value(value: Any, base64_encode_protobuf: bool) -> Any: # noqa: PLR0911
if isinstance(value, list):
return [_serialize_value(v, base64_encode_protobuf) for v in value]
if isinstance(value, tuple):
return tuple(_serialize_value(v, base64_encode_protobuf) for v in value)
if isinstance(value, dict):
# only values are serialized recursively: JSON object keys must be plain strings,
# so keys can't hold nested dataclasses or protobuf messages
return {k: _serialize_value(v, base64_encode_protobuf) for k, v in value.items()}
if hasattr(value, "SerializeToString"): # protobuf message
if base64_encode_protobuf:
return b64encode(value.SerializeToString()).decode("ascii")
return value.SerializeToString()
if is_dataclass(value):
return _serialize_as_dict(value) # type: ignore[arg-type]
@@ -348,9 +354,7 @@ def deserialize_task(task_cls: type, task_input: bytes) -> Task:
if hasattr(field_type, "FromString"): # protobuf message
value = field_type.FromString(task_input) # type: ignore[arg-type]
else:
value = json.loads(task_input.decode())
if is_dataclass(field_type) and isinstance(value, dict):
value = _deserialize_dataclass(field_type, value) # type: ignore[arg-type]
value = _deserialize_value(field_type, json.loads(task_input.decode())) # type: ignore[arg-type]

return task_cls(**{task_fields[0].name: value})

@@ -362,9 +366,35 @@ def _deserialize_dataclass(cls: type, params: dict[str, Any]) -> Task:
for param in list(params):
# recursively deserialize nested dataclasses
field = cls.__dataclass_fields__[param]
if hasattr(field.type, "FromString"):
params[field.name] = field.type.FromString(b64decode(params[field.name]))
elif is_dataclass(field.type) and isinstance(params[field.name], dict):
params[field.name] = _deserialize_dataclass(field.type, params[field.name]) # type: ignore[arg-type]
params[field.name] = _deserialize_value(field.type, params[field.name])

return cls(**params)


def _deserialize_value(field_type: type, value: Any) -> Any: # noqa: PLR0911
if hasattr(field_type, "FromString"):
return field_type.FromString(b64decode(value))
if is_dataclass(field_type) and isinstance(value, dict):
return _deserialize_dataclass(field_type, value)

# in case our field type is a list or dict, we need to recursively deserialize the values
origin_type = get_origin(field_type)
if not origin_type:
return value # simple type, no further recursion needed

type_args = get_args(field_type) # the wrapped type in a container, e.g. list[str] -> type_args is (str,)

if isinstance(value, list) and origin_type is list and len(type_args) == 1:
return [_deserialize_value(type_args[0], v) for v in value]
if isinstance(value, list) and origin_type is tuple:
# tuples are serialized as a JSON list, so we get a list back,
# which we want to convert back to a tuple
if len(type_args) == 2 and type_args[1] is Ellipsis:
type_args = (type_args[0],) # variadic tuple, we only have one type argument to use for all values
return tuple(_deserialize_value(type_args[0], v) for v in value)
return tuple(_deserialize_value(type_args[min(i, len(type_args) - 1)], v) for i, v in enumerate(value))

if isinstance(value, dict) and origin_type is dict and len(type_args) == 2:
return {k: _deserialize_value(type_args[1], v) for k, v in value.items()}

return value
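
Taken together, `_serialize_value` and `_deserialize_value` give container-nested dataclasses and protobuf messages a full JSON round trip: tuples pass through JSON as lists and are rebuilt from the type annotations. A condensed sketch mirroring the tests above; the class names are illustrative and the import paths follow the file layout in this diff:

```python
from dataclasses import dataclass

from tilebox.workflows import Task
from tilebox.workflows.task import deserialize_task, serialize_task


@dataclass(frozen=True, eq=True)  # frozen/eq so instances compare and hash by value
class Point:
    x: float
    y: float


class PlotTask(Task):
    title: str
    series: dict[str, list[Point]]  # nested dataclasses in containers now round-trip


task = PlotTask("demo", {"line": [Point(1.0, 2.0), Point(3.0, 4.0)]})
restored = deserialize_task(PlotTask, serialize_task(task))
assert restored == task  # dict and list values are rebuilt recursively
```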