diff --git a/cognite/client/_api/streams/__init__.py b/cognite/client/_api/streams/__init__.py new file mode 100644 index 0000000000..76768a44fd --- /dev/null +++ b/cognite/client/_api/streams/__init__.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from collections.abc import MutableSequence, Sequence +from typing import TYPE_CHECKING, Any + +from cognite.client._api.streams.records import StreamsRecordsAPI +from cognite.client._api_client import APIClient +from cognite.client.data_classes.streams.stream import ( + Stream, + StreamDeleteItem, + StreamList, + StreamWrite, +) +from cognite.client.utils._auxiliary import interpolate_and_url_encode + +if TYPE_CHECKING: + from cognite.client import CogniteClient + from cognite.client.config import ClientConfig + + +def _dump_write_item(obj: StreamWrite | dict[str, Any]) -> dict[str, Any]: + if isinstance(obj, dict): + return obj + return obj.dump() + + +def _dump_delete_item(obj: StreamDeleteItem | dict[str, Any]) -> dict[str, Any]: + if isinstance(obj, dict): + return obj + return obj.dump() + + +class StreamsAPI(APIClient): + """ILA Streams API (``/streams``) and nested :class:`StreamsRecordsAPI` (``/streams/{id}/records``).""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self.records = StreamsRecordsAPI(config, api_version, cognite_client) + + def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList: + """`Create streams `_.""" + res = self._post(self._RESOURCE_PATH, json={"items": [_dump_write_item(i) for i in items]}) + return StreamList._load(res.json()["items"], cognite_client=self._cognite_client) + + def list(self) -> StreamList: + """`List streams `_ in the project.""" + res = self._get(self._RESOURCE_PATH) + return StreamList._load(res.json()["items"], cognite_client=self._cognite_client) + + def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """`Retrieve a stream `_.""" + path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) + params: dict[str, Any] | None = None + if include_statistics is not None: + params = {"includeStatistics": "true" if include_statistics else "false"} + res = self._get(path, params=params) + return Stream._load(res.json(), cognite_client=self._cognite_client) + + def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None: + """`Delete streams `_ (POST).""" + self._post(f"{self._RESOURCE_PATH}/delete", json={"items": [_dump_delete_item(i) for i in items]}) diff --git a/cognite/client/_api/streams/records.py b/cognite/client/_api/streams/records.py new file mode 100644 index 0000000000..fa0deb7a3d --- /dev/null +++ b/cognite/client/_api/streams/records.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from cognite.client._api_client import APIClient +from cognite.client.data_classes.streams.stream_record import ( + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, +) +from cognite.client.utils._auxiliary import interpolate_and_url_encode + +if TYPE_CHECKING: + from cognite.client import CogniteClient + from cognite.client.config import ClientConfig + + +class StreamsRecordsAPI(APIClient): + """ILA record operations under ``/streams/{streamId}/records/...``.""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + + def _records_base(self, stream_external_id: str) -> str: + return interpolate_and_url_encode("/streams/{}/records", stream_external_id) + + def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """`Ingest records `_ into a stream.""" + res = self._post(self._records_base(stream_external_id), json=body) + return RecordsIngestResponse._load(res.json(), cognite_client=self._cognite_client) + + def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """`Upsert records `_ in a mutable stream.""" + res = self._post(self._records_base(stream_external_id) + "/upsert", json=body) + return RecordsIngestResponse._load(res.json(), cognite_client=self._cognite_client) + + def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse: + """`Delete records `_ from a mutable stream.""" + res = self._post(self._records_base(stream_external_id) + "/delete", json=body) + return RecordsDeleteResponse._load(res.json(), cognite_client=self._cognite_client) + + def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse: + """`Filter records `_.""" + res = self._post(self._records_base(stream_external_id) + "/filter", json=body) + return RecordsFilterResponse._load(res.json(), cognite_client=self._cognite_client) + + def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse: + """`Aggregate over records `_.""" + res = self._post(self._records_base(stream_external_id) + "/aggregate", json=body) + return RecordsAggregateResponse._load(res.json(), cognite_client=self._cognite_client) + + def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse: + """`Sync records `_ (cursor-based read).""" + res = self._post(self._records_base(stream_external_id) + "/sync", json=body) + return RecordsSyncResponse._load(res.json(), cognite_client=self._cognite_client) diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index d61ab8270f..7afc6bff7f 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -104,6 +104,7 @@ class APIClient: "raw/dbs/[^/]+/tables$", "relationships", "sequences", + "streams", "simulators", "simulators/models", "simulators/models/revisions", @@ -129,6 +130,10 @@ class APIClient: "transformations/cancel", "transformations/notifications", "transformations/run", + # ILA streams: write record batches (create/delete stream use __NON_RETRYABLE "streams"; read POSTs stay retryable) + r"streams/[^/]+/records", + r"streams/[^/]+/records/upsert", + r"streams/[^/]+/records/delete", ) ) ) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index 6394854cb3..1e98bb296c 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -26,6 +26,7 @@ from cognite.client._api.relationships import RelationshipsAPI from cognite.client._api.sequences import SequencesAPI from cognite.client._api.simulators import SimulatorsAPI +from cognite.client._api.streams import StreamsAPI from cognite.client._api.templates import TemplatesAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.time_series import TimeSeriesAPI @@ -89,6 +90,7 @@ def __init__(self, config: ClientConfig | None = None) -> None: self.workflows = WorkflowAPI(self._config, self._API_VERSION, self) self.units = UnitAPI(self._config, self._API_VERSION, self) self.simulators = SimulatorsAPI(self._config, self._API_VERSION, self) + self.streams = StreamsAPI(self._config, self._API_VERSION, self) # APIs just using base_url: self._api_client = APIClient(self._config, api_version=None, cognite_client=self) diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 5fcaf1796e..fc50add41a 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -203,6 +203,25 @@ GeometryFilter, TimestampRange, ) +from cognite.client.data_classes.streams import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, + Stream, + StreamDeleteItem, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, + SyncRecord, + SyncRecordList, +) from cognite.client.data_classes.templates import ( ConstantResolver, Source, @@ -469,6 +488,13 @@ "LatestDatapointQuery", "OidcCredentials", "RawTable", + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", "Relationship", "RelationshipFilter", "RelationshipList", @@ -506,7 +532,17 @@ "Source", "SourceFile", "StatusCode", + "Stream", + "StreamDeleteItem", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamWrite", "SubworkflowTaskParameters", + "SyncRecord", + "SyncRecordList", "Table", "TableList", "TableWrite", diff --git a/cognite/client/data_classes/streams/__init__.py b/cognite/client/data_classes/streams/__init__.py new file mode 100644 index 0000000000..ed98581cc9 --- /dev/null +++ b/cognite/client/data_classes/streams/__init__.py @@ -0,0 +1,41 @@ +from cognite.client.data_classes.streams.stream import ( + Stream, + StreamDeleteItem, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, +) +from cognite.client.data_classes.streams.stream_record import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, + SyncRecord, + SyncRecordList, +) + +__all__ = [ + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", + "Stream", + "StreamDeleteItem", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamWrite", + "SyncRecord", + "SyncRecordList", +] diff --git a/cognite/client/data_classes/streams/stream.py b/cognite/client/data_classes/streams/stream.py new file mode 100644 index 0000000000..ab400bfb2a --- /dev/null +++ b/cognite/client/data_classes/streams/stream.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteObject, + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + +if TYPE_CHECKING: + from cognite.client import CogniteClient + + +class StreamLimit(CogniteObject): + """Numeric limit bucket for a stream (provisioned / optionally consumed).""" + + def __init__(self, provisioned: float, consumed: float | None = None) -> None: + self.provisioned = provisioned + self.consumed = consumed + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + provisioned=resource["provisioned"], + consumed=resource.get("consumed"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"provisioned": self.provisioned} + if self.consumed is not None: + out["consumed"] = self.consumed + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLifecycleSettings(CogniteObject): + """Lifecycle metadata for a stream (human-readable).""" + + def __init__( + self, + retained_after_soft_delete: str, + data_deleted_after: str | None = None, + ) -> None: + self.retained_after_soft_delete = retained_after_soft_delete + self.data_deleted_after = data_deleted_after + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + retained_after_soft_delete=resource["retainedAfterSoftDelete"], + data_deleted_after=resource.get("dataDeletedAfter"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"retained_after_soft_delete": self.retained_after_soft_delete} + if self.data_deleted_after is not None: + out["data_deleted_after"] = self.data_deleted_after + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLimitSettings(CogniteObject): + """Provisioned/consumed limits for a stream.""" + + def __init__( + self, + max_records_total: StreamLimit, + max_giga_bytes_total: StreamLimit, + max_filtering_interval: str | None = None, + ) -> None: + self.max_records_total = max_records_total + self.max_giga_bytes_total = max_giga_bytes_total + self.max_filtering_interval = max_filtering_interval + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + max_records_total=StreamLimit._load(resource["maxRecordsTotal"], cognite_client=cognite_client), + max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"], cognite_client=cognite_client), + max_filtering_interval=resource.get("maxFilteringInterval"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "max_records_total": self.max_records_total.dump(camel_case=camel_case), + "max_giga_bytes_total": self.max_giga_bytes_total.dump(camel_case=camel_case), + } + if self.max_filtering_interval is not None: + out["max_filtering_interval"] = self.max_filtering_interval + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamSettings(CogniteObject): + """Read model for stream settings (lifecycle + limits).""" + + def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None: + self.lifecycle = lifecycle + self.limits = limits + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + lifecycle=StreamLifecycleSettings._load(resource["lifecycle"], cognite_client=cognite_client), + limits=StreamLimitSettings._load(resource["limits"], cognite_client=cognite_client), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "lifecycle": self.lifecycle.dump(camel_case=camel_case), + "limits": self.limits.dump(camel_case=camel_case), + } + + +class Stream(CogniteResource): + """A stream (ILA ``StreamResponseItem``).""" + + def __init__( + self, + external_id: str, + created_time: int, + created_from_template: str, + type: str, + settings: StreamSettings, + cognite_client: CogniteClient | None = None, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.created_from_template = created_from_template + self.type = type + self.settings = settings + self._cognite_client = cognite_client + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + external_id=resource["externalId"], + created_time=resource["createdTime"], + created_from_template=resource["createdFromTemplate"], + type=resource["type"], + settings=StreamSettings._load(resource["settings"], cognite_client=cognite_client), + cognite_client=cognite_client, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "external_id": self.external_id, + "created_time": self.created_time, + "created_from_template": self.created_from_template, + "type": self.type, + "settings": self.settings.dump(camel_case=camel_case), + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): + """List of streams (``StreamResponse.items``).""" + + _RESOURCE = Stream + + +class StreamWrite(CogniteObject): + """Request item for creating a stream (``StreamRequestItem``).""" + + def __init__(self, external_id: str, settings: dict[str, Any]) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(external_id=resource["externalId"], settings=resource["settings"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = {"external_id": self.external_id, "settings": self.settings} + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamDeleteItem(CogniteObject): + """Identifier for ``POST /streams/delete``.""" + + def __init__(self, external_id: str) -> None: + self.external_id = external_id + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(external_id=resource["externalId"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = {"external_id": self.external_id} + return convert_all_keys_to_camel_case(out) if camel_case else out diff --git a/cognite/client/data_classes/streams/stream_record.py b/cognite/client/data_classes/streams/stream_record.py new file mode 100644 index 0000000000..e09df553a3 --- /dev/null +++ b/cognite/client/data_classes/streams/stream_record.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteObject, + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + +if TYPE_CHECKING: + from cognite.client import CogniteClient + + +class Record(CogniteResource): + """A record returned from filter (ILA ``Record``).""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + properties: dict[str, Any], + cognite_client: CogniteClient | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.properties = properties + self._cognite_client = cognite_client + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=resource.get("properties", {}), + cognite_client=cognite_client, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "properties": self.properties, + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordList(CogniteResourceList[Record], ExternalIDTransformerMixin): + _RESOURCE = Record + + +class SyncRecord(CogniteResource): + """Record entry from sync (ILA ``SyncRecord``).""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + status: str, + properties: dict[str, Any] | None = None, + cognite_client: CogniteClient | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.status = status + self.properties = properties + self._cognite_client = cognite_client + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + status=resource["status"], + properties=resource.get("properties"), + cognite_client=cognite_client, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "status": self.status, + } + if self.properties is not None: + out["properties"] = self.properties + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class SyncRecordList(CogniteResourceList[SyncRecord], ExternalIDTransformerMixin): + _RESOURCE = SyncRecord + + +class RecordsFilterResponse(CogniteObject): + """``POST .../records/filter`` response.""" + + def __init__(self, items: RecordList, typing: dict[str, Any] | None = None) -> None: + self.items = items + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + items = RecordList._load(resource.get("items", []), cognite_client=cognite_client) + return cls(items=items, typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"items": self.items.dump(camel_case=camel_case)} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsSyncResponse(CogniteObject): + """``POST .../records/sync`` response.""" + + def __init__( + self, + items: SyncRecordList, + next_cursor: str, + has_next: bool, + typing: dict[str, Any] | None = None, + ) -> None: + self.items = items + self.next_cursor = next_cursor + self.has_next = has_next + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + items = SyncRecordList._load(resource.get("items", []), cognite_client=cognite_client) + return cls( + items=items, + next_cursor=resource["nextCursor"], + has_next=resource["hasNext"], + typing=resource.get("typing"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "items": self.items.dump(camel_case=camel_case), + "nextCursor": self.next_cursor, + "hasNext": self.has_next, + } + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsAggregateResponse(CogniteObject): + """``POST .../records/aggregate`` response.""" + + def __init__(self, aggregates: dict[str, Any], typing: dict[str, Any] | None = None) -> None: + self.aggregates = aggregates + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(aggregates=resource.get("aggregates", {}), typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"aggregates": self.aggregates} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsDeleteResponse(CogniteObject): + """``POST .../records/delete`` — empty object means full success.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + +class RecordsIngestResponse(CogniteObject): + """``POST .../records`` (ingest/upsert) JSON body — often ``{}`` on success.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + @property + def is_empty_success(self) -> bool: + return self._data == {} diff --git a/cognite/client/testing.py b/cognite/client/testing.py index e964576b8c..c4ff63cb1d 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -69,6 +69,8 @@ from cognite.client._api.simulators.routine_revisions import SimulatorRoutineRevisionsAPI from cognite.client._api.simulators.routines import SimulatorRoutinesAPI from cognite.client._api.simulators.runs import SimulatorRunsAPI +from cognite.client._api.streams import StreamsAPI +from cognite.client._api.streams.records import StreamsRecordsAPI from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api.templates import ( TemplateGroupsAPI, @@ -182,6 +184,9 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.simulators.routines.revisions = MagicMock(spec_set=SimulatorRoutineRevisionsAPI) self.simulators.logs = MagicMock(spec_set=SimulatorLogsAPI) + self.streams = MagicMock(spec=StreamsAPI) + self.streams.records = MagicMock(spec_set=StreamsRecordsAPI) + self.sequences = MagicMock(spec=SequencesAPI) self.sequences.rows = MagicMock(spec_set=SequencesDataAPI) self.sequences.data = MagicMock(spec_set=SequencesDataAPI) diff --git a/tests/tests_unit/test_api/test_streams.py b/tests/tests_unit/test_api/test_streams.py new file mode 100644 index 0000000000..5b92be9085 --- /dev/null +++ b/tests/tests_unit/test_api/test_streams.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from cognite.client import ClientConfig, CogniteClient +from cognite.client.credentials import Token +from cognite.client.data_classes.streams import RecordsIngestResponse, StreamList, StreamWrite + + +@pytest.fixture +def client() -> CogniteClient: + return CogniteClient( + ClientConfig( + client_name="unit-streams", + project="test-proj", + credentials=Token("token"), + base_url="https://greenfield.cognitedata.com", + ) + ) + + +def test_streams_list_parses_items(client: CogniteClient) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + client.streams._get = MagicMock(return_value=MagicMock(json=lambda: sample)) + out = client.streams.list() + assert isinstance(out, StreamList) + assert out[0].external_id == "st1" + client.streams._get.assert_called_once_with("/streams") + + +def test_streams_retrieve_include_statistics_query(client: CogniteClient) -> None: + sample = { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + client.streams._get = MagicMock(return_value=MagicMock(json=lambda: sample)) + client.streams.retrieve("st1", include_statistics=True) + client.streams._get.assert_called_once_with("/streams/st1", params={"includeStatistics": "true"}) + + +def test_streams_create_posts_items(client: CogniteClient) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + client.streams._post = MagicMock(return_value=MagicMock(json=lambda: sample)) + w = StreamWrite("st1", {"template": {"name": "ImmutableTestStream"}}) + client.streams.create([w]) + client.streams._post.assert_called_once() + call_kw = client.streams._post.call_args + assert call_kw[0][0] == "/streams" + assert call_kw[1]["json"]["items"][0]["externalId"] == "st1" + + +def test_records_ingest_posts(client: CogniteClient) -> None: + client.streams.records._post = MagicMock(return_value=MagicMock(json=lambda: {})) + out = client.streams.records.ingest("my-stream", {"items": []}) + assert isinstance(out, RecordsIngestResponse) + client.streams.records._post.assert_called_once_with("/streams/my-stream/records", json={"items": []}) diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index 55b21f361c..bf0e003796 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1377,6 +1377,15 @@ def test_is_retryable_resource_api_endpoints(self, api_client_with_token, method ("POST", "https://api.cognitedata.com/api/v1/projects/bla/ai/tools/documents/summarize", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/ai/tools/documents/ask", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/ai/tools/documents/task", False), + # ILA streams + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/upsert", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/filter", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/aggregate", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/sync", True), ] ), ) diff --git a/tests/tests_unit/test_data_classes/test_streams.py b/tests/tests_unit/test_data_classes/test_streams.py new file mode 100644 index 0000000000..45520c5fb6 --- /dev/null +++ b/tests/tests_unit/test_data_classes/test_streams.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from cognite.client.data_classes.streams import ( + Record, + RecordsFilterResponse, + RecordsSyncResponse, + Stream, + StreamList, + StreamWrite, + SyncRecord, +) + + +def test_stream_roundtrip() -> None: + raw = { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0, "consumed": 0.5}, + }, + }, + } + s = Stream._load(raw) + back = s.dump(camel_case=True) + assert back["externalId"] == "s1" + assert back["settings"]["limits"]["maxRecordsTotal"]["provisioned"] == 1000.0 + + +def test_stream_list_load() -> None: + raw = { + "items": [ + { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + lst = StreamList._load(raw["items"]) + assert len(lst) == 1 + assert lst[0].external_id == "s1" + + +def test_stream_write_dump() -> None: + w = StreamWrite("abc", {"template": {"name": "ImmutableTestStream"}}) + assert w.dump()["externalId"] == "abc" + assert w.dump()["settings"]["template"]["name"] == "ImmutableTestStream" + + +def test_record_load() -> None: + raw = { + "space": "sp", + "externalId": "r1", + "createdTime": 2, + "lastUpdatedTime": 3, + "properties": {"sp": {"c": {"p": 1}}}, + } + r = Record._load(raw) + assert r.space == "sp" + assert r.properties["sp"]["c"]["p"] == 1 + + +def test_records_filter_response() -> None: + raw = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "properties": {}, + } + ] + } + fr = RecordsFilterResponse._load(raw) + assert len(fr.items) == 1 + assert isinstance(fr.items[0], Record) + + +def test_records_sync_response() -> None: + raw = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "status": "created", + } + ], + "nextCursor": "c", + "hasNext": False, + } + sr = RecordsSyncResponse._load(raw) + assert sr.next_cursor == "c" + assert not sr.has_next + assert isinstance(sr.items[0], SyncRecord)