From e2f4158c8d8497c2fc7505346ac5fca843485b30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Wed, 25 Mar 2026 23:12:28 +0100 Subject: [PATCH 1/3] feat: add StreamsAPI for ILA streams and records Expose CogniteClient.streams with create/list/retrieve/delete, record ingest/upsert/delete/filter/aggregate/sync, and deprecated single-stream delete. Query params use string true/false for includeStatistics per API. Made-with: Cursor --- cognite/client/_api/streams.py | 87 +++++++++++++++++++++++++++++++ cognite/client/_cognite_client.py | 2 + cognite/client/testing.py | 3 ++ 3 files changed, 92 insertions(+) create mode 100644 cognite/client/_api/streams.py diff --git a/cognite/client/_api/streams.py b/cognite/client/_api/streams.py new file mode 100644 index 0000000000..959c6099d4 --- /dev/null +++ b/cognite/client/_api/streams.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +from collections.abc import MutableSequence +from typing import TYPE_CHECKING, Any + +from cognite.client._api_client import APIClient +from cognite.client.utils._auxiliary import interpolate_and_url_encode + +if TYPE_CHECKING: + from cognite.client import CogniteClient + from cognite.client.config import ClientConfig + + +class StreamsAPI(APIClient): + """ILA Streams and Records APIs under ``/streams`` and ``/streams/{streamId}/records/...``.""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + + def _records_base(self, stream_external_id: str) -> str: + return interpolate_and_url_encode("/streams/{}/records", stream_external_id) + + # --- Streams (operationIds: createStream, listStreams, getStream, deleteStream, deleteStreams) --- + + def create(self, items: list[dict[str, Any]]) -> dict[str, Any]: + """Create stream(s). 
Body matches ``{"items": [StreamRequestItem, ...]}``.""" + res = self._post(self._RESOURCE_PATH, json={"items": items}) + return res.json() + + def list(self) -> dict[str, Any]: + """List streams in the project.""" + res = self._get(self._RESOURCE_PATH) + return res.json() + + def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> dict[str, Any]: + """Retrieve one stream.""" + path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) + params: dict[str, Any] | None = None + if include_statistics is not None: + # API expects lowercase "true"/"false" query strings, not Python bool serialization. + params = {"includeStatistics": "true" if include_statistics else "false"} + res = self._get(path, params=params) + return res.json() + + def delete(self, items: MutableSequence[dict[str, Any]]) -> None: + """Delete streams via ``POST /streams/delete``.""" + self._post(f"{self._RESOURCE_PATH}/delete", json={"items": items}) + + def delete_deprecated(self, stream_external_id: str) -> dict[str, Any]: + """Deprecated ``DELETE /streams/{streamId}`` — API responds with **410**; raises ``CogniteAPIError``.""" + path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) + res = self._delete(path) + return res.json() + + # --- Records (operationIds: ingestRecords, upsertRecords, deleteRecords, filterRecords, aggregateRecords, syncRecords) --- + + def ingest_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: + """POST ``/streams/{id}/records``.""" + res = self._post(self._records_base(stream_external_id), json=body) + return res.json() + + def upsert_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: + """POST ``.../records/upsert``.""" + res = self._post(self._records_base(stream_external_id) + "/upsert", json=body) + return res.json() + + def delete_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: + """POST 
``.../records/delete``.""" + res = self._post(self._records_base(stream_external_id) + "/delete", json=body) + return res.json() + + def filter_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: + """POST ``.../records/filter``.""" + res = self._post(self._records_base(stream_external_id) + "/filter", json=body) + return res.json() + + def aggregate_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: + """POST ``.../records/aggregate``.""" + res = self._post(self._records_base(stream_external_id) + "/aggregate", json=body) + return res.json() + + def sync_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: + """POST ``.../records/sync``.""" + res = self._post(self._records_base(stream_external_id) + "/sync", json=body) + return res.json() diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index 6394854cb3..1e98bb296c 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -26,6 +26,7 @@ from cognite.client._api.relationships import RelationshipsAPI from cognite.client._api.sequences import SequencesAPI from cognite.client._api.simulators import SimulatorsAPI +from cognite.client._api.streams import StreamsAPI from cognite.client._api.templates import TemplatesAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.time_series import TimeSeriesAPI @@ -89,6 +90,7 @@ def __init__(self, config: ClientConfig | None = None) -> None: self.workflows = WorkflowAPI(self._config, self._API_VERSION, self) self.units = UnitAPI(self._config, self._API_VERSION, self) self.simulators = SimulatorsAPI(self._config, self._API_VERSION, self) + self.streams = StreamsAPI(self._config, self._API_VERSION, self) # APIs just using base_url: self._api_client = APIClient(self._config, api_version=None, cognite_client=self) diff --git a/cognite/client/testing.py b/cognite/client/testing.py index e964576b8c..f84fb398d4 
100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -69,6 +69,7 @@ from cognite.client._api.simulators.routine_revisions import SimulatorRoutineRevisionsAPI from cognite.client._api.simulators.routines import SimulatorRoutinesAPI from cognite.client._api.simulators.runs import SimulatorRunsAPI +from cognite.client._api.streams import StreamsAPI from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api.templates import ( TemplateGroupsAPI, @@ -182,6 +183,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.simulators.routines.revisions = MagicMock(spec_set=SimulatorRoutineRevisionsAPI) self.simulators.logs = MagicMock(spec_set=SimulatorLogsAPI) + self.streams = MagicMock(spec_set=StreamsAPI) + self.sequences = MagicMock(spec=SequencesAPI) self.sequences.rows = MagicMock(spec_set=SequencesDataAPI) self.sequences.data = MagicMock(spec_set=SequencesDataAPI) From 16795658c7e79cdd4b8269f38bd95647b49da9d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Wed, 25 Mar 2026 23:33:41 +0100 Subject: [PATCH 2/3] feat(streams): package layout, typed data classes, and POST retry policy - Replace single-file StreamsAPI with cognite.client._api.streams package and nested StreamsRecordsAPI for /streams/{id}/records/*. - Add data_classes.streams (Stream, Record*, response types, lists with mixins). - Register streams in non-retryable create/delete paths and explicit record write paths; keep filter/aggregate/sync POSTs retryable. - Extend CogniteClientMock and add unit tests for API and data classes. - Document retry behavior in TestRetryableEndpoints. 
Made-with: Cursor --- cognite/client/_api/streams.py | 87 ------- cognite/client/_api/streams/__init__.py | 96 ++++++++ cognite/client/_api/streams/records.py | 59 +++++ cognite/client/_api_client.py | 5 + cognite/client/data_classes/__init__.py | 36 +++ .../client/data_classes/streams/__init__.py | 41 ++++ cognite/client/data_classes/streams/stream.py | 192 ++++++++++++++++ .../data_classes/streams/stream_record.py | 216 ++++++++++++++++++ cognite/client/testing.py | 4 +- tests/tests_unit/test_api/test_streams.py | 100 ++++++++ tests/tests_unit/test_api_client.py | 9 + .../test_data_classes/test_streams.py | 110 +++++++++ 12 files changed, 867 insertions(+), 88 deletions(-) delete mode 100644 cognite/client/_api/streams.py create mode 100644 cognite/client/_api/streams/__init__.py create mode 100644 cognite/client/_api/streams/records.py create mode 100644 cognite/client/data_classes/streams/__init__.py create mode 100644 cognite/client/data_classes/streams/stream.py create mode 100644 cognite/client/data_classes/streams/stream_record.py create mode 100644 tests/tests_unit/test_api/test_streams.py create mode 100644 tests/tests_unit/test_data_classes/test_streams.py diff --git a/cognite/client/_api/streams.py b/cognite/client/_api/streams.py deleted file mode 100644 index 959c6099d4..0000000000 --- a/cognite/client/_api/streams.py +++ /dev/null @@ -1,87 +0,0 @@ -from __future__ import annotations - -from collections.abc import MutableSequence -from typing import TYPE_CHECKING, Any - -from cognite.client._api_client import APIClient -from cognite.client.utils._auxiliary import interpolate_and_url_encode - -if TYPE_CHECKING: - from cognite.client import CogniteClient - from cognite.client.config import ClientConfig - - -class StreamsAPI(APIClient): - """ILA Streams and Records APIs under ``/streams`` and ``/streams/{streamId}/records/...``.""" - - _RESOURCE_PATH = "/streams" - - def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: 
CogniteClient) -> None: - super().__init__(config, api_version, cognite_client) - - def _records_base(self, stream_external_id: str) -> str: - return interpolate_and_url_encode("/streams/{}/records", stream_external_id) - - # --- Streams (operationIds: createStream, listStreams, getStream, deleteStream, deleteStreams) --- - - def create(self, items: list[dict[str, Any]]) -> dict[str, Any]: - """Create stream(s). Body matches ``{"items": [StreamRequestItem, ...]}``.""" - res = self._post(self._RESOURCE_PATH, json={"items": items}) - return res.json() - - def list(self) -> dict[str, Any]: - """List streams in the project.""" - res = self._get(self._RESOURCE_PATH) - return res.json() - - def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> dict[str, Any]: - """Retrieve one stream.""" - path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) - params: dict[str, Any] | None = None - if include_statistics is not None: - # API expects lowercase "true"/"false" query strings, not Python bool serialization. 
- params = {"includeStatistics": "true" if include_statistics else "false"} - res = self._get(path, params=params) - return res.json() - - def delete(self, items: MutableSequence[dict[str, Any]]) -> None: - """Delete streams via ``POST /streams/delete``.""" - self._post(f"{self._RESOURCE_PATH}/delete", json={"items": items}) - - def delete_deprecated(self, stream_external_id: str) -> dict[str, Any]: - """Deprecated ``DELETE /streams/{streamId}`` — API responds with **410**; raises ``CogniteAPIError``.""" - path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) - res = self._delete(path) - return res.json() - - # --- Records (operationIds: ingestRecords, upsertRecords, deleteRecords, filterRecords, aggregateRecords, syncRecords) --- - - def ingest_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: - """POST ``/streams/{id}/records``.""" - res = self._post(self._records_base(stream_external_id), json=body) - return res.json() - - def upsert_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: - """POST ``.../records/upsert``.""" - res = self._post(self._records_base(stream_external_id) + "/upsert", json=body) - return res.json() - - def delete_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: - """POST ``.../records/delete``.""" - res = self._post(self._records_base(stream_external_id) + "/delete", json=body) - return res.json() - - def filter_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: - """POST ``.../records/filter``.""" - res = self._post(self._records_base(stream_external_id) + "/filter", json=body) - return res.json() - - def aggregate_records(self, stream_external_id: str, body: dict[str, Any]) -> dict[str, Any]: - """POST ``.../records/aggregate``.""" - res = self._post(self._records_base(stream_external_id) + "/aggregate", json=body) - return res.json() - - def sync_records(self, stream_external_id: str, body: 
from __future__ import annotations

from collections.abc import MutableSequence, Sequence
from typing import TYPE_CHECKING, Any

from cognite.client._api_client import APIClient
from cognite.client.data_classes.streams.stream import (
    Stream,
    StreamDeleteItem,
    StreamList,
    StreamWrite,
)
from cognite.client.data_classes.streams.stream_record import (
    RecordsAggregateResponse,
    RecordsDeleteResponse,
    RecordsFilterResponse,
    RecordsIngestResponse,
    RecordsSyncResponse,
)
from cognite.client.utils._auxiliary import interpolate_and_url_encode

if TYPE_CHECKING:
    from cognite.client import CogniteClient
    from cognite.client.config import ClientConfig


def _dump_item(obj: StreamWrite | StreamDeleteItem | dict[str, Any]) -> dict[str, Any]:
    """Serialize one request item: a dict passes through unchanged, a resource is dumped.

    Consolidates the previous ``_dump_write_item`` / ``_dump_delete_item`` pair,
    whose bodies were byte-identical.
    """
    return obj if isinstance(obj, dict) else obj.dump()


class StreamsRecordsAPI(APIClient):
    """ILA record operations under ``/streams/{streamId}/records/...``."""

    _RESOURCE_PATH = "/streams"

    def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
        super().__init__(config, api_version, cognite_client)

    def _records_base(self, stream_external_id: str) -> str:
        # URL-encode the stream id so ids containing '/' or non-ASCII stay one path segment.
        return interpolate_and_url_encode("/streams/{}/records", stream_external_id)

    def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
        """Ingest records into a stream via ``POST /streams/{id}/records``."""
        res = self._post(self._records_base(stream_external_id), json=body)
        return RecordsIngestResponse._load(res.json(), cognite_client=self._cognite_client)

    def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
        """Upsert records in a mutable stream via ``POST .../records/upsert``."""
        res = self._post(self._records_base(stream_external_id) + "/upsert", json=body)
        return RecordsIngestResponse._load(res.json(), cognite_client=self._cognite_client)

    def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse:
        """Delete records from a mutable stream via ``POST .../records/delete``."""
        res = self._post(self._records_base(stream_external_id) + "/delete", json=body)
        return RecordsDeleteResponse._load(res.json(), cognite_client=self._cognite_client)

    def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse:
        """Filter records via ``POST .../records/filter``."""
        res = self._post(self._records_base(stream_external_id) + "/filter", json=body)
        return RecordsFilterResponse._load(res.json(), cognite_client=self._cognite_client)

    def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse:
        """Aggregate over records via ``POST .../records/aggregate``."""
        res = self._post(self._records_base(stream_external_id) + "/aggregate", json=body)
        return RecordsAggregateResponse._load(res.json(), cognite_client=self._cognite_client)

    def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse:
        """Cursor-based read of records via ``POST .../records/sync``."""
        res = self._post(self._records_base(stream_external_id) + "/sync", json=body)
        return RecordsSyncResponse._load(res.json(), cognite_client=self._cognite_client)


class StreamsAPI(APIClient):
    """ILA Streams API (``/streams``) plus nested :class:`StreamsRecordsAPI` at ``.records``."""

    _RESOURCE_PATH = "/streams"

    def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
        super().__init__(config, api_version, cognite_client)
        self.records = StreamsRecordsAPI(config, api_version, cognite_client)

    def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList:
        """Create streams. Request body is ``{"items": [StreamRequestItem, ...]}``."""
        res = self._post(self._RESOURCE_PATH, json={"items": [_dump_item(i) for i in items]})
        return StreamList._load(res.json()["items"], cognite_client=self._cognite_client)

    def list(self) -> StreamList:
        """List all streams in the project."""
        res = self._get(self._RESOURCE_PATH)
        return StreamList._load(res.json()["items"], cognite_client=self._cognite_client)

    def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream:
        """Retrieve a single stream, optionally including usage statistics."""
        path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id)
        params: dict[str, Any] | None = None
        if include_statistics is not None:
            # The API expects literal "true"/"false" query strings, not Python bool serialization.
            params = {"includeStatistics": "true" if include_statistics else "false"}
        res = self._get(path, params=params)
        return Stream._load(res.json(), cognite_client=self._cognite_client)

    def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None:
        """Delete streams via ``POST /streams/delete``."""
        self._post(f"{self._RESOURCE_PATH}/delete", json={"items": [_dump_item(i) for i in items]})

    def delete_deprecated(self, stream_external_id: str) -> dict[str, Any]:
        """Deprecated ``DELETE /streams/{streamId}`` — the API may answer **410**; see API docs."""
        path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id)
        res = self._delete(path)
        return res.json()

    # --- Backwards-compatible method names; all delegate to ``self.records`` ---

    def ingest_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
        return self.records.ingest(stream_external_id, body)

    def upsert_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
        return self.records.upsert(stream_external_id, body)

    def delete_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse:
        return self.records.delete(stream_external_id, body)

    def filter_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse:
        return self.records.filter(stream_external_id, body)

    def aggregate_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse:
        return self.records.aggregate(stream_external_id, body)

    def sync_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse:
        return self.records.sync(stream_external_id, body)
r"streams/[^/]+/records/upsert", + r"streams/[^/]+/records/delete", ) ) ) diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 5fcaf1796e..fc50add41a 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -203,6 +203,25 @@ GeometryFilter, TimestampRange, ) +from cognite.client.data_classes.streams import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, + Stream, + StreamDeleteItem, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, + SyncRecord, + SyncRecordList, +) from cognite.client.data_classes.templates import ( ConstantResolver, Source, @@ -469,6 +488,13 @@ "LatestDatapointQuery", "OidcCredentials", "RawTable", + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", "Relationship", "RelationshipFilter", "RelationshipList", @@ -506,7 +532,17 @@ "Source", "SourceFile", "StatusCode", + "Stream", + "StreamDeleteItem", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamWrite", "SubworkflowTaskParameters", + "SyncRecord", + "SyncRecordList", "Table", "TableList", "TableWrite", diff --git a/cognite/client/data_classes/streams/__init__.py b/cognite/client/data_classes/streams/__init__.py new file mode 100644 index 0000000000..ed98581cc9 --- /dev/null +++ b/cognite/client/data_classes/streams/__init__.py @@ -0,0 +1,41 @@ +from cognite.client.data_classes.streams.stream import ( + Stream, + StreamDeleteItem, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, +) +from cognite.client.data_classes.streams.stream_record import ( + Record, + RecordList, + RecordsAggregateResponse, + 
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from typing_extensions import Self

from cognite.client.data_classes._base import (
    CogniteObject,
    CogniteResource,
    CogniteResourceList,
    ExternalIDTransformerMixin,
)
from cognite.client.utils._text import convert_all_keys_to_camel_case

if TYPE_CHECKING:
    from cognite.client import CogniteClient


class StreamLimit(CogniteObject):
    """One numeric limit bucket on a stream: what is provisioned and, optionally, consumed."""

    def __init__(self, provisioned: float, consumed: float | None = None) -> None:
        self.provisioned = provisioned
        self.consumed = consumed

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(provisioned=resource["provisioned"], consumed=resource.get("consumed"))

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped: dict[str, Any] = {"provisioned": self.provisioned}
        if self.consumed is not None:
            # "consumed" may be absent (e.g. without statistics); omit rather than emit null.
            dumped["consumed"] = self.consumed
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class StreamLifecycleSettings(CogniteObject):
    """Human-readable lifecycle metadata for a stream."""

    def __init__(self, retained_after_soft_delete: str, data_deleted_after: str | None = None) -> None:
        self.retained_after_soft_delete = retained_after_soft_delete
        self.data_deleted_after = data_deleted_after

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(
            retained_after_soft_delete=resource["retainedAfterSoftDelete"],
            data_deleted_after=resource.get("dataDeletedAfter"),
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped: dict[str, Any] = {"retained_after_soft_delete": self.retained_after_soft_delete}
        if self.data_deleted_after is not None:
            dumped["data_deleted_after"] = self.data_deleted_after
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class StreamLimitSettings(CogniteObject):
    """Provisioned/consumed limits for a stream."""

    def __init__(
        self,
        max_records_total: StreamLimit,
        max_giga_bytes_total: StreamLimit,
        max_filtering_interval: str | None = None,
    ) -> None:
        self.max_records_total = max_records_total
        self.max_giga_bytes_total = max_giga_bytes_total
        self.max_filtering_interval = max_filtering_interval

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(
            max_records_total=StreamLimit._load(resource["maxRecordsTotal"], cognite_client=cognite_client),
            max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"], cognite_client=cognite_client),
            max_filtering_interval=resource.get("maxFilteringInterval"),
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped: dict[str, Any] = {
            "max_records_total": self.max_records_total.dump(camel_case=camel_case),
            "max_giga_bytes_total": self.max_giga_bytes_total.dump(camel_case=camel_case),
        }
        if self.max_filtering_interval is not None:
            dumped["max_filtering_interval"] = self.max_filtering_interval
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class StreamSettings(CogniteObject):
    """Read model for stream settings: lifecycle plus limits."""

    def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None:
        self.lifecycle = lifecycle
        self.limits = limits

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(
            lifecycle=StreamLifecycleSettings._load(resource["lifecycle"], cognite_client=cognite_client),
            limits=StreamLimitSettings._load(resource["limits"], cognite_client=cognite_client),
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        # Keys are identical in snake and camel case, so no conversion pass is needed here.
        return {
            "lifecycle": self.lifecycle.dump(camel_case=camel_case),
            "limits": self.limits.dump(camel_case=camel_case),
        }


class Stream(CogniteResource):
    """A stream (ILA ``StreamResponseItem``)."""

    def __init__(
        self,
        external_id: str,
        created_time: int,
        created_from_template: str,
        type: str,
        settings: StreamSettings,
        cognite_client: CogniteClient | None = None,
    ) -> None:
        # "type" intentionally mirrors the API field name even though it shadows the builtin.
        self.external_id = external_id
        self.created_time = created_time
        self.created_from_template = created_from_template
        self.type = type
        self.settings = settings
        self._cognite_client = cognite_client

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(
            external_id=resource["externalId"],
            created_time=resource["createdTime"],
            created_from_template=resource["createdFromTemplate"],
            type=resource["type"],
            settings=StreamSettings._load(resource["settings"], cognite_client=cognite_client),
            cognite_client=cognite_client,
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped = {
            "external_id": self.external_id,
            "created_time": self.created_time,
            "created_from_template": self.created_from_template,
            "type": self.type,
            "settings": self.settings.dump(camel_case=camel_case),
        }
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped
class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin):
    """List of streams (``StreamResponse.items``)."""

    _RESOURCE = Stream


class StreamWrite(CogniteObject):
    """Request item for creating a stream (``StreamRequestItem``)."""

    def __init__(self, external_id: str, settings: dict[str, Any]) -> None:
        self.external_id = external_id
        self.settings = settings

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(external_id=resource["externalId"], settings=resource["settings"])

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped = {"external_id": self.external_id, "settings": self.settings}
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class StreamDeleteItem(CogniteObject):
    """Identifier item for ``POST /streams/delete``."""

    def __init__(self, external_id: str) -> None:
        self.external_id = external_id

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(external_id=resource["externalId"])

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped = {"external_id": self.external_id}
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


# --- cognite/client/data_classes/streams/stream_record.py ---
# (module header imports are the same names as stream.py's: CogniteObject,
#  CogniteResource, CogniteResourceList, ExternalIDTransformerMixin, Self,
#  convert_all_keys_to_camel_case, and the TYPE_CHECKING CogniteClient guard)


class Record(CogniteResource):
    """A record returned from filter (ILA ``Record``)."""

    def __init__(
        self,
        space: str,
        external_id: str,
        created_time: int,
        last_updated_time: int,
        properties: dict[str, Any],
        cognite_client: CogniteClient | None = None,
    ) -> None:
        self.space = space
        self.external_id = external_id
        self.created_time = created_time
        self.last_updated_time = last_updated_time
        self.properties = properties
        self._cognite_client = cognite_client

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(
            space=resource["space"],
            external_id=resource["externalId"],
            created_time=resource["createdTime"],
            last_updated_time=resource["lastUpdatedTime"],
            # Filter responses may omit "properties"; default to an empty mapping.
            properties=resource.get("properties", {}),
            cognite_client=cognite_client,
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped = {
            "space": self.space,
            "external_id": self.external_id,
            "created_time": self.created_time,
            "last_updated_time": self.last_updated_time,
            "properties": self.properties,
        }
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class RecordList(CogniteResourceList[Record], ExternalIDTransformerMixin):
    _RESOURCE = Record


class SyncRecord(CogniteResource):
    """Record entry from sync (ILA ``SyncRecord``); carries a status and optional properties."""

    def __init__(
        self,
        space: str,
        external_id: str,
        created_time: int,
        last_updated_time: int,
        status: str,
        properties: dict[str, Any] | None = None,
        cognite_client: CogniteClient | None = None,
    ) -> None:
        self.space = space
        self.external_id = external_id
        self.created_time = created_time
        self.last_updated_time = last_updated_time
        self.status = status
        self.properties = properties
        self._cognite_client = cognite_client

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(
            space=resource["space"],
            external_id=resource["externalId"],
            created_time=resource["createdTime"],
            last_updated_time=resource["lastUpdatedTime"],
            status=resource["status"],
            properties=resource.get("properties"),
            cognite_client=cognite_client,
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped: dict[str, Any] = {
            "space": self.space,
            "external_id": self.external_id,
            "created_time": self.created_time,
            "last_updated_time": self.last_updated_time,
            "status": self.status,
        }
        if self.properties is not None:
            dumped["properties"] = self.properties
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class SyncRecordList(CogniteResourceList[SyncRecord], ExternalIDTransformerMixin):
    _RESOURCE = SyncRecord


class RecordsFilterResponse(CogniteObject):
    """``POST .../records/filter`` response: matched records plus optional typing info."""

    def __init__(self, items: RecordList, typing: dict[str, Any] | None = None) -> None:
        self.items = items
        self.typing = typing

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        items = RecordList._load(resource.get("items", []), cognite_client=cognite_client)
        return cls(items=items, typing=resource.get("typing"))

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped: dict[str, Any] = {"items": self.items.dump(camel_case=camel_case)}
        if self.typing is not None:
            dumped["typing"] = self.typing
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class RecordsSyncResponse(CogniteObject):
    """``POST .../records/sync`` response: a page of records plus cursor state."""

    def __init__(
        self,
        items: SyncRecordList,
        next_cursor: str,
        has_next: bool,
        typing: dict[str, Any] | None = None,
    ) -> None:
        self.items = items
        self.next_cursor = next_cursor
        self.has_next = has_next
        self.typing = typing

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        items = SyncRecordList._load(resource.get("items", []), cognite_client=cognite_client)
        return cls(
            items=items,
            next_cursor=resource["nextCursor"],
            has_next=resource["hasNext"],
            typing=resource.get("typing"),
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        # FIX: the keys were previously hard-coded camelCase ("nextCursor"/"hasNext"),
        # so camel_case=False still returned camelCase keys — inconsistent with every
        # other dump() in this package. Use snake_case and convert, like the rest.
        dumped: dict[str, Any] = {
            "items": self.items.dump(camel_case=camel_case),
            "next_cursor": self.next_cursor,
            "has_next": self.has_next,
        }
        if self.typing is not None:
            dumped["typing"] = self.typing
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class RecordsAggregateResponse(CogniteObject):
    """``POST .../records/aggregate`` response: aggregate results plus optional typing info."""

    def __init__(self, aggregates: dict[str, Any], typing: dict[str, Any] | None = None) -> None:
        self.aggregates = aggregates
        self.typing = typing

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(aggregates=resource.get("aggregates", {}), typing=resource.get("typing"))

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        dumped: dict[str, Any] = {"aggregates": self.aggregates}
        if self.typing is not None:
            dumped["typing"] = self.typing
        return convert_all_keys_to_camel_case(dumped) if camel_case else dumped


class RecordsDeleteResponse(CogniteObject):
    """``POST .../records/delete`` response — an empty object means full success."""

    def __init__(self, data: dict[str, Any]) -> None:
        # Opaque pass-through wrapper: the raw JSON body is kept as-is.
        self._data = data

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(data=resource)

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        return convert_all_keys_to_camel_case(self._data) if camel_case else self._data


class RecordsIngestResponse(CogniteObject):
    """``POST .../records`` (ingest/upsert) JSON body — often ``{}`` on success."""

    def __init__(self, data: dict[str, Any]) -> None:
        self._data = data

    @classmethod
    def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
        return cls(data=resource)

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        # NOTE(review): the original dump() was truncated in the visible source; this mirrors
        # RecordsDeleteResponse (same opaque-wrapper shape) — confirm against the original file.
        return convert_all_keys_to_camel_case(self._data) if camel_case else self._data
+ def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + @property + def is_empty_success(self) -> bool: + return self._data == {} diff --git a/cognite/client/testing.py b/cognite/client/testing.py index f84fb398d4..c4ff63cb1d 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -70,6 +70,7 @@ from cognite.client._api.simulators.routines import SimulatorRoutinesAPI from cognite.client._api.simulators.runs import SimulatorRunsAPI from cognite.client._api.streams import StreamsAPI +from cognite.client._api.streams.records import StreamsRecordsAPI from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api.templates import ( TemplateGroupsAPI, @@ -183,7 +184,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.simulators.routines.revisions = MagicMock(spec_set=SimulatorRoutineRevisionsAPI) self.simulators.logs = MagicMock(spec_set=SimulatorLogsAPI) - self.streams = MagicMock(spec_set=StreamsAPI) + self.streams = MagicMock(spec=StreamsAPI) + self.streams.records = MagicMock(spec_set=StreamsRecordsAPI) self.sequences = MagicMock(spec=SequencesAPI) self.sequences.rows = MagicMock(spec_set=SequencesDataAPI) diff --git a/tests/tests_unit/test_api/test_streams.py b/tests/tests_unit/test_api/test_streams.py new file mode 100644 index 0000000000..be91a58441 --- /dev/null +++ b/tests/tests_unit/test_api/test_streams.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from cognite.client import ClientConfig, CogniteClient +from cognite.client.credentials import Token +from cognite.client.data_classes.streams import StreamList, StreamWrite + + +@pytest.fixture +def client() -> CogniteClient: + return CogniteClient( + ClientConfig( + client_name="unit-streams", + project="test-proj", + credentials=Token("token"), + 
base_url="https://greenfield.cognitedata.com", + ) + ) + + +def test_streams_list_parses_items(client: CogniteClient) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + client.streams._get = MagicMock(return_value=MagicMock(json=lambda: sample)) + out = client.streams.list() + assert isinstance(out, StreamList) + assert out[0].external_id == "st1" + client.streams._get.assert_called_once_with("/streams") + + +def test_streams_retrieve_include_statistics_query(client: CogniteClient) -> None: + sample = { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + client.streams._get = MagicMock(return_value=MagicMock(json=lambda: sample)) + client.streams.retrieve("st1", include_statistics=True) + client.streams._get.assert_called_once_with( + "/streams/st1", params={"includeStatistics": "true"} + ) + + +def test_streams_create_posts_items(client: CogniteClient) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + client.streams._post = MagicMock(return_value=MagicMock(json=lambda: sample)) + w = StreamWrite("st1", {"template": {"name": "ImmutableTestStream"}}) + client.streams.create([w]) + client.streams._post.assert_called_once() + call_kw 
= client.streams._post.call_args + assert call_kw[0][0] == "/streams" + assert call_kw[1]["json"]["items"][0]["externalId"] == "st1" + + +def test_records_ingest_delegates(client: CogniteClient) -> None: + client.streams.records.ingest = MagicMock() + client.streams.ingest_records("my-stream", {"items": []}) + client.streams.records.ingest.assert_called_once_with("my-stream", {"items": []}) diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index 55b21f361c..bf0e003796 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1377,6 +1377,15 @@ def test_is_retryable_resource_api_endpoints(self, api_client_with_token, method ("POST", "https://api.cognitedata.com/api/v1/projects/bla/ai/tools/documents/summarize", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/ai/tools/documents/ask", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/ai/tools/documents/task", False), + # ILA streams + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/upsert", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/filter", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/aggregate", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/sync", True), ] ), ) diff --git a/tests/tests_unit/test_data_classes/test_streams.py b/tests/tests_unit/test_data_classes/test_streams.py new file mode 100644 index 0000000000..45520c5fb6 --- /dev/null +++ 
b/tests/tests_unit/test_data_classes/test_streams.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from cognite.client.data_classes.streams import ( + Record, + RecordsFilterResponse, + RecordsSyncResponse, + Stream, + StreamList, + StreamWrite, + SyncRecord, +) + + +def test_stream_roundtrip() -> None: + raw = { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0, "consumed": 0.5}, + }, + }, + } + s = Stream._load(raw) + back = s.dump(camel_case=True) + assert back["externalId"] == "s1" + assert back["settings"]["limits"]["maxRecordsTotal"]["provisioned"] == 1000.0 + + +def test_stream_list_load() -> None: + raw = { + "items": [ + { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + lst = StreamList._load(raw["items"]) + assert len(lst) == 1 + assert lst[0].external_id == "s1" + + +def test_stream_write_dump() -> None: + w = StreamWrite("abc", {"template": {"name": "ImmutableTestStream"}}) + assert w.dump()["externalId"] == "abc" + assert w.dump()["settings"]["template"]["name"] == "ImmutableTestStream" + + +def test_record_load() -> None: + raw = { + "space": "sp", + "externalId": "r1", + "createdTime": 2, + "lastUpdatedTime": 3, + "properties": {"sp": {"c": {"p": 1}}}, + } + r = Record._load(raw) + assert r.space == "sp" + assert r.properties["sp"]["c"]["p"] == 1 + + +def test_records_filter_response() -> None: + raw = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "properties": {}, + } + ] + } + fr = 
RecordsFilterResponse._load(raw) + assert len(fr.items) == 1 + assert isinstance(fr.items[0], Record) + + +def test_records_sync_response() -> None: + raw = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "status": "created", + } + ], + "nextCursor": "c", + "hasNext": False, + } + sr = RecordsSyncResponse._load(raw) + assert sr.next_cursor == "c" + assert not sr.has_next + assert isinstance(sr.items[0], SyncRecord) From a76d8f7d3ce70c1640d3672b38e3f1dda57b9e28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Wed, 25 Mar 2026 23:36:57 +0100 Subject: [PATCH 3/3] refactor(streams): drop compat aliases and legacy DELETE helper Remove ingest_records/*_records delegators from StreamsAPI; use client.streams.records.* only. Remove delete_deprecated (unreleased API). Update unit test to assert records.ingest directly. Made-with: Cursor --- cognite/client/_api/streams/__init__.py | 33 ----------------------- tests/tests_unit/test_api/test_streams.py | 15 +++++------ 2 files changed, 7 insertions(+), 41 deletions(-) diff --git a/cognite/client/_api/streams/__init__.py b/cognite/client/_api/streams/__init__.py index 9e274e4173..76768a44fd 100644 --- a/cognite/client/_api/streams/__init__.py +++ b/cognite/client/_api/streams/__init__.py @@ -11,13 +11,6 @@ StreamList, StreamWrite, ) -from cognite.client.data_classes.streams.stream_record import ( - RecordsAggregateResponse, - RecordsDeleteResponse, - RecordsFilterResponse, - RecordsIngestResponse, - RecordsSyncResponse, -) from cognite.client.utils._auxiliary import interpolate_and_url_encode if TYPE_CHECKING: @@ -68,29 +61,3 @@ def retrieve(self, stream_external_id: str, include_statistics: bool | None = No def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None: """`Delete streams `_ (POST).""" self._post(f"{self._RESOURCE_PATH}/delete", json={"items": [_dump_delete_item(i) for i in items]}) - - def 
delete_deprecated(self, stream_external_id: str) -> dict[str, Any]: - """Deprecated ``DELETE`` stream — may yield **410**; see API docs.""" - path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) - res = self._delete(path) - return res.json() - - # --- Backwards-compatible names (delegate to :attr:`records`) --- - - def ingest_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: - return self.records.ingest(stream_external_id, body) - - def upsert_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: - return self.records.upsert(stream_external_id, body) - - def delete_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse: - return self.records.delete(stream_external_id, body) - - def filter_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse: - return self.records.filter(stream_external_id, body) - - def aggregate_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse: - return self.records.aggregate(stream_external_id, body) - - def sync_records(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse: - return self.records.sync(stream_external_id, body) diff --git a/tests/tests_unit/test_api/test_streams.py b/tests/tests_unit/test_api/test_streams.py index be91a58441..5b92be9085 100644 --- a/tests/tests_unit/test_api/test_streams.py +++ b/tests/tests_unit/test_api/test_streams.py @@ -6,7 +6,7 @@ from cognite.client import ClientConfig, CogniteClient from cognite.client.credentials import Token -from cognite.client.data_classes.streams import StreamList, StreamWrite +from cognite.client.data_classes.streams import RecordsIngestResponse, StreamList, StreamWrite @pytest.fixture @@ -62,9 +62,7 @@ def test_streams_retrieve_include_statistics_query(client: CogniteClient) -> Non } client.streams._get = MagicMock(return_value=MagicMock(json=lambda: sample)) 
client.streams.retrieve("st1", include_statistics=True) - client.streams._get.assert_called_once_with( - "/streams/st1", params={"includeStatistics": "true"} - ) + client.streams._get.assert_called_once_with("/streams/st1", params={"includeStatistics": "true"}) def test_streams_create_posts_items(client: CogniteClient) -> None: @@ -94,7 +92,8 @@ def test_streams_create_posts_items(client: CogniteClient) -> None: assert call_kw[1]["json"]["items"][0]["externalId"] == "st1" -def test_records_ingest_delegates(client: CogniteClient) -> None: - client.streams.records.ingest = MagicMock() - client.streams.ingest_records("my-stream", {"items": []}) - client.streams.records.ingest.assert_called_once_with("my-stream", {"items": []}) +def test_records_ingest_posts(client: CogniteClient) -> None: + client.streams.records._post = MagicMock(return_value=MagicMock(json=lambda: {})) + out = client.streams.records.ingest("my-stream", {"items": []}) + assert isinstance(out, RecordsIngestResponse) + client.streams.records._post.assert_called_once_with("/streams/my-stream/records", json={"items": []})