Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions cognite/client/_api/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

from collections.abc import MutableSequence, Sequence
from typing import TYPE_CHECKING, Any

from cognite.client._api.streams.records import StreamsRecordsAPI
from cognite.client._api_client import APIClient
from cognite.client.data_classes.streams.stream import (
Stream,
StreamDeleteItem,
StreamList,
StreamWrite,
)
from cognite.client.utils._auxiliary import interpolate_and_url_encode

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


def _dump_write_item(obj: StreamWrite | dict[str, Any]) -> dict[str, Any]:
if isinstance(obj, dict):
return obj
return obj.dump()


def _dump_delete_item(obj: StreamDeleteItem | dict[str, Any]) -> dict[str, Any]:
if isinstance(obj, dict):
return obj
return obj.dump()


class StreamsAPI(APIClient):
"""ILA Streams API (``/streams``) and nested :class:`StreamsRecordsAPI` (``/streams/{id}/records``)."""

_RESOURCE_PATH = "/streams"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self.records = StreamsRecordsAPI(config, api_version, cognite_client)

def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList:
"""`Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_."""
res = self._post(self._RESOURCE_PATH, json={"items": [_dump_write_item(i) for i in items]})
return StreamList._load(res.json()["items"], cognite_client=self._cognite_client)

def list(self) -> StreamList:
"""`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_ in the project."""
res = self._get(self._RESOURCE_PATH)
return StreamList._load(res.json()["items"], cognite_client=self._cognite_client)

def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream:
"""`Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_."""
path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id)
params: dict[str, Any] | None = None
if include_statistics is not None:
params = {"includeStatistics": "true" if include_statistics else "false"}
res = self._get(path, params=params)
return Stream._load(res.json(), cognite_client=self._cognite_client)

def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None:
"""`Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_ (POST)."""
self._post(f"{self._RESOURCE_PATH}/delete", json={"items": [_dump_delete_item(i) for i in items]})
59 changes: 59 additions & 0 deletions cognite/client/_api/streams/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from cognite.client._api_client import APIClient
from cognite.client.data_classes.streams.stream_record import (
RecordsAggregateResponse,
RecordsDeleteResponse,
RecordsFilterResponse,
RecordsIngestResponse,
RecordsSyncResponse,
)
from cognite.client.utils._auxiliary import interpolate_and_url_encode

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


class StreamsRecordsAPI(APIClient):
"""ILA record operations under ``/streams/{streamId}/records/...``."""

_RESOURCE_PATH = "/streams"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)

def _records_base(self, stream_external_id: str) -> str:
return interpolate_and_url_encode("/streams/{}/records", stream_external_id)

def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
"""`Ingest records <https://api-docs.cognite.com/20230101/tag/Records/operation/ingestRecords>`_ into a stream."""
res = self._post(self._records_base(stream_external_id), json=body)
return RecordsIngestResponse._load(res.json(), cognite_client=self._cognite_client)

def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
"""`Upsert records <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_ in a mutable stream."""
res = self._post(self._records_base(stream_external_id) + "/upsert", json=body)
return RecordsIngestResponse._load(res.json(), cognite_client=self._cognite_client)

def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse:
"""`Delete records <https://api-docs.cognite.com/20230101/tag/Records/operation/deleteRecords>`_ from a mutable stream."""
res = self._post(self._records_base(stream_external_id) + "/delete", json=body)
return RecordsDeleteResponse._load(res.json(), cognite_client=self._cognite_client)

def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse:
"""`Filter records <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_."""
res = self._post(self._records_base(stream_external_id) + "/filter", json=body)
return RecordsFilterResponse._load(res.json(), cognite_client=self._cognite_client)

def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse:
"""`Aggregate over records <https://api-docs.cognite.com/20230101/tag/Records/operation/aggregateRecords>`_."""
res = self._post(self._records_base(stream_external_id) + "/aggregate", json=body)
return RecordsAggregateResponse._load(res.json(), cognite_client=self._cognite_client)

def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse:
"""`Sync records <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_ (cursor-based read)."""
res = self._post(self._records_base(stream_external_id) + "/sync", json=body)
return RecordsSyncResponse._load(res.json(), cognite_client=self._cognite_client)
5 changes: 5 additions & 0 deletions cognite/client/_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class APIClient:
"raw/dbs/[^/]+/tables$",
"relationships",
"sequences",
"streams",
"simulators",
"simulators/models",
"simulators/models/revisions",
Expand All @@ -129,6 +130,10 @@ class APIClient:
"transformations/cancel",
"transformations/notifications",
"transformations/run",
# ILA streams: write record batches (create/delete stream use __NON_RETRYABLE "streams"; read POSTs stay retryable)
r"streams/[^/]+/records",
r"streams/[^/]+/records/upsert",
r"streams/[^/]+/records/delete",
)
)
)
Expand Down
2 changes: 2 additions & 0 deletions cognite/client/_cognite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from cognite.client._api.relationships import RelationshipsAPI
from cognite.client._api.sequences import SequencesAPI
from cognite.client._api.simulators import SimulatorsAPI
from cognite.client._api.streams import StreamsAPI
from cognite.client._api.templates import TemplatesAPI
from cognite.client._api.three_d import ThreeDAPI
from cognite.client._api.time_series import TimeSeriesAPI
Expand Down Expand Up @@ -89,6 +90,7 @@ def __init__(self, config: ClientConfig | None = None) -> None:
self.workflows = WorkflowAPI(self._config, self._API_VERSION, self)
self.units = UnitAPI(self._config, self._API_VERSION, self)
self.simulators = SimulatorsAPI(self._config, self._API_VERSION, self)
self.streams = StreamsAPI(self._config, self._API_VERSION, self)
# APIs just using base_url:
self._api_client = APIClient(self._config, api_version=None, cognite_client=self)

Expand Down
36 changes: 36 additions & 0 deletions cognite/client/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,25 @@
GeometryFilter,
TimestampRange,
)
from cognite.client.data_classes.streams import (
Record,
RecordList,
RecordsAggregateResponse,
RecordsDeleteResponse,
RecordsFilterResponse,
RecordsIngestResponse,
RecordsSyncResponse,
Stream,
StreamDeleteItem,
StreamLifecycleSettings,
StreamLimit,
StreamLimitSettings,
StreamList,
StreamSettings,
StreamWrite,
SyncRecord,
SyncRecordList,
)
from cognite.client.data_classes.templates import (
ConstantResolver,
Source,
Expand Down Expand Up @@ -469,6 +488,13 @@
"LatestDatapointQuery",
"OidcCredentials",
"RawTable",
"Record",
"RecordList",
"RecordsAggregateResponse",
"RecordsDeleteResponse",
"RecordsFilterResponse",
"RecordsIngestResponse",
"RecordsSyncResponse",
"Relationship",
"RelationshipFilter",
"RelationshipList",
Expand Down Expand Up @@ -506,7 +532,17 @@
"Source",
"SourceFile",
"StatusCode",
"Stream",
"StreamDeleteItem",
"StreamLifecycleSettings",
"StreamLimit",
"StreamLimitSettings",
"StreamList",
"StreamSettings",
"StreamWrite",
"SubworkflowTaskParameters",
"SyncRecord",
"SyncRecordList",
"Table",
"TableList",
"TableWrite",
Expand Down
41 changes: 41 additions & 0 deletions cognite/client/data_classes/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from cognite.client.data_classes.streams.stream import (
Stream,
StreamDeleteItem,
StreamLifecycleSettings,
StreamLimit,
StreamLimitSettings,
StreamList,
StreamSettings,
StreamWrite,
)
from cognite.client.data_classes.streams.stream_record import (
Record,
RecordList,
RecordsAggregateResponse,
RecordsDeleteResponse,
RecordsFilterResponse,
RecordsIngestResponse,
RecordsSyncResponse,
SyncRecord,
SyncRecordList,
)

__all__ = [
"Record",
"RecordList",
"RecordsAggregateResponse",
"RecordsDeleteResponse",
"RecordsFilterResponse",
"RecordsIngestResponse",
"RecordsSyncResponse",
"Stream",
"StreamDeleteItem",
"StreamLifecycleSettings",
"StreamLimit",
"StreamLimitSettings",
"StreamList",
"StreamSettings",
"StreamWrite",
"SyncRecord",
"SyncRecordList",
]
Loading
Loading