From d9bb6a47876ae6e5b7d02d70fd3545a242c6bb66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Wed, 25 Mar 2026 17:34:35 +0100 Subject: [PATCH 01/14] feat: add events-to-records migration command --- .../record_property_mapping.py | 55 ++++++++++ .../_cdf_tk/commands/_migrate/conversion.py | 65 +++++++++++ .../_cdf_tk/commands/_migrate/data_classes.py | 22 ++++ cognite_toolkit/_cdf_tk/feature_flags.py | 4 + .../test_migration_cmd/test_conversion.py | 103 ++++++++++++++++++ .../test_record_property_mapping_yaml.py | 101 +++++++++++++++++ 6 files changed, 350 insertions(+) create mode 100644 cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py create mode 100644 tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py diff --git a/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py b/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py new file mode 100644 index 0000000000..9442e653c0 --- /dev/null +++ b/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py @@ -0,0 +1,55 @@ +from typing import Literal, Self + +from pydantic import ValidationError, model_validator + +from cognite_toolkit._cdf_tk.client._resource_base import BaseModelObject +from cognite_toolkit._cdf_tk.client.identifiers import ContainerId +from cognite_toolkit._cdf_tk.exceptions import ToolkitValueError +from cognite_toolkit._cdf_tk.utils.file import read_yaml_content + + +class RecordPropertyMapping(BaseModelObject): + """One target container + property map for records migration.""" + + external_id: str + container_id: ContainerId + property_mapping: dict[str, str] + + +class RecordMigrationConfig(BaseModelObject): + """Root YAML model for events-to-records: target stream, resource type, and named mappings.""" + + stream_external_id: str + resource_type: Literal["event"] + mappings: list[RecordPropertyMapping] + + @model_validator(mode="after") + 
def _mapping_external_ids_unique(self) -> Self: + seen: set[str] = set() + for mapping in self.mappings: + if mapping.external_id in seen: + raise ValueError( + f"Duplicate externalId in record property mappings: {mapping.external_id!r}" + ) + seen.add(mapping.external_id) + return self + + +def load_record_migration_config_yaml(yaml_content: str) -> RecordMigrationConfig: + """Parse YAML into ``RecordMigrationConfig`` for events-to-records.""" + content = read_yaml_content(yaml_content) + if not isinstance(content, dict): + raise ToolkitValueError( + f"Expected a YAML mapping with streamExternalId, resourceType, and mappings; got {type(content).__name__}." + ) + if "mappings" not in content: + raise ToolkitValueError( + "Missing required key 'mappings'. Top-level keys must include streamExternalId, resourceType, and mappings." + ) + try: + config = RecordMigrationConfig._load(content) + except ValidationError as exc: + raise ToolkitValueError(f"Invalid record migration config: {exc}") from exc + if not config.mappings: + raise ToolkitValueError("mappings must contain at least one record property mapping.") + return config diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py b/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py index 1ea3bc9b8d..888fafcaf7 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py @@ -41,6 +41,8 @@ from cognite_toolkit._cdf_tk.client.resource_classes.event import EventResponse from cognite_toolkit._cdf_tk.client.resource_classes.filemetadata import FileMetadataResponse from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest, RecordSource from cognite_toolkit._cdf_tk.client.resource_classes.resource_view_mapping import 
ResourceViewMappingRequest from cognite_toolkit._cdf_tk.client.resource_classes.timeseries import TimeSeriesResponse from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping @@ -305,6 +307,69 @@ def asset_centric_to_dm( return instance, issue +def asset_centric_to_record( + resource: AssetCentricResourceExtended, + instance_id: NodeId, + record_mapping: RecordPropertyMapping, + container_properties: dict[str, ContainerPropertyDefinition], + direct_relation_cache: DirectRelationCache, +) -> tuple[RecordRequest | None, ConversionIssue]: + """Convert an asset-centric resource to a record request. + + Args: + resource: The asset-centric resource to convert. + instance_id: The target record space and external_id. + record_mapping: The record property mapping defining the target container and property mapping. + container_properties: Property definitions from the target container (for type validation/coercion). + direct_relation_cache: Cache for direct relation references. + + Returns: + A tuple of the RecordRequest (or None on failure) and any conversion issues. 
+ """ + resource_type = _lookup_resource_type(resource) + dumped = resource.dump() + try: + id_ = dumped.pop("id") + except KeyError as e: + raise ValueError("Resource must have an 'id' field.") from e + if not isinstance(id_, int): + raise TypeError(f"Resource 'id' field must be an int, got {type(id_)}.") + dumped.pop("dataSetId", None) + dumped.pop("externalId", None) + + issue = ConversionIssue( + id=str(AssetCentricId(resource_type=resource_type, id_=id_)), + asset_centric_id=AssetCentricId(resource_type=resource_type, id_=id_), + instance_id=NodeId(space=instance_id.space, external_id=instance_id.external_id), + ) + + properties = create_properties( + dumped, + container_properties, + record_mapping.property_mapping, + resource_type, + issue=issue, + direct_relation_cache=direct_relation_cache, + container_id=record_mapping.container_id, + ) + + sources: list[RecordSource] = [] + if properties: + sources.append( + RecordSource( + source=record_mapping.container_id, + properties=properties, + ) + ) + + record = RecordRequest( + space=instance_id.space, + external_id=instance_id.external_id, + sources=sources, + ) + return record, issue + + def _lookup_resource_type(resource_type: AssetCentricResourceExtended) -> AssetCentricTypeExtended: if isinstance(resource_type, AssetResponse): return "asset" diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py b/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py index e348105a5a..8202082607 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py @@ -15,6 +15,7 @@ from cognite_toolkit._cdf_tk.client.identifiers import EdgeUntypedId, InstanceId, InternalId, NodeUntypedId from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import ViewId from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import 
RecordPropertyMapping from cognite_toolkit._cdf_tk.commands._migrate.default_mappings import ( ASSET_ANNOTATIONS_ID, FILE_ANNOTATIONS_ID, @@ -187,6 +188,27 @@ class EventMapping(MigrationMapping): resource_type: Literal["event"] = "event" instance_id: NodeUntypedId + def get_record_property_mapping(self, mappings_by_external_id: dict[str, RecordPropertyMapping]) -> RecordPropertyMapping: + """Resolve the record property mapping for this row (events-to-records). + + Uses ``ingestionMapping`` (the same CSV field as instance migration). The value must match + ``externalId`` of an entry in the YAML ``mappings`` list. If the YAML defines only one mapping, + this column may be omitted. + """ + if self.ingestion_mapping is not None: + try: + return mappings_by_external_id[self.ingestion_mapping] + except KeyError as e: + raise ToolkitValueError( + f"Unknown mapping externalId {self.ingestion_mapping!r}. " + f"Defined in target YAML: {sorted(mappings_by_external_id)}." + ) from e + if len(mappings_by_external_id) == 1: + return next(iter(mappings_by_external_id.values())) + raise ToolkitValueError( + "ingestionMapping is required in the CSV when the target YAML defines multiple mappings." 
+ ) + class TimeSeriesMapping(MigrationMapping): resource_type: Literal["timeseries"] = "timeseries" diff --git a/cognite_toolkit/_cdf_tk/feature_flags.py b/cognite_toolkit/_cdf_tk/feature_flags.py index 30005236c8..d657d64534 100644 --- a/cognite_toolkit/_cdf_tk/feature_flags.py +++ b/cognite_toolkit/_cdf_tk/feature_flags.py @@ -54,6 +54,10 @@ class Flags(Enum): visible=False, description="Enables the infield-migrate command for migrating Infield configs to regular Toolkit configs", ) + RECORDS_MIGRATE = FlagMetadata( + visible=False, + description="Enables the 'events-to-records' migration command", + ) INFIELD_DEV = FlagMetadata( visible=False, description="For InField developers: sets the schema space for the InFieldOnCDM data model to test migration before it becomes a system model.", diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py index 2f2369cc41..0f50c7d5ce 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py @@ -14,6 +14,7 @@ from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import ( BooleanProperty, ConstraintOrIndexState, + ContainerPropertyDefinition, ContainerId, ContainerPropertyDefinition, DateProperty, @@ -40,6 +41,8 @@ from cognite_toolkit._cdf_tk.client.resource_classes.event import EventResponse from cognite_toolkit._cdf_tk.client.resource_classes.filemetadata import FileMetadataResponse from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId, CreatedSourceSystem +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest from cognite_toolkit._cdf_tk.client.resource_classes.resource_view_mapping import 
ResourceViewMappingResponse from cognite_toolkit._cdf_tk.client.resource_classes.timeseries import TimeSeriesResponse from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping @@ -50,6 +53,7 @@ DirectRelationCache, EdgeOtherSide, asset_centric_to_dm, + asset_centric_to_record, convert_container_properties, convert_edges, create_properties, @@ -1672,3 +1676,102 @@ def test_convert_edges( assert results.container_properties == expected_relations assert [edge.model_dump() for edge in results.edges] == [edge.model_dump() for edge in expected_edges] assert results.errors == expected_errors + + +class TestAssetCentricToRecord: + CONTAINER_ID = ContainerId(space="my_stream_space", external_id="EventContainer") + INSTANCE_ID = NodeId(space="my_space", external_id="event_42") + + def _make_record_mapping(self, property_mapping: dict[str, str]) -> RecordPropertyMapping: + return RecordPropertyMapping( + external_id="my_event_mapping", + container_id=self.CONTAINER_ID, + property_mapping=property_mapping, + ) + + def _make_container_properties(self) -> dict[str, ContainerPropertyDefinition]: + return { + "description": ContainerPropertyDefinition( + type=TextProperty(), nullable=True, immutable=False, auto_increment=False + ), + "startTime": ContainerPropertyDefinition( + type=TimestampProperty(), nullable=True, immutable=False, auto_increment=False + ), + } + + @staticmethod + def _make_direct_relation_cache() -> DirectRelationCache: + return DirectRelationCache(MagicMock(spec=ToolkitClient)) + + def test_basic_event_conversion(self) -> None: + event = EventResponse( + id=42, + description="An event", + start_time=0, + created_time=1, + last_updated_time=2, + ) + record_mapping = self._make_record_mapping({"description": "description", "startTime": "startTime"}) + container_properties = self._make_container_properties() + + record, issue = asset_centric_to_record( + event, + instance_id=self.INSTANCE_ID, + 
record_mapping=record_mapping, + container_properties=container_properties, + direct_relation_cache=self._make_direct_relation_cache(), + ) + + assert record is not None + assert record.space == self.INSTANCE_ID.space + assert record.external_id == self.INSTANCE_ID.external_id + assert len(record.sources) == 1 + assert record.sources[0].source == self.CONTAINER_ID + assert record.sources[0].properties["description"] == "An event" + assert record.sources[0].properties["startTime"] == "1970-01-01T00:00:00.000+00:00" + + def test_id_and_metadata_fields_are_excluded(self) -> None: + """id, externalId, dataSetId must not appear in mapped properties.""" + event = EventResponse( + id=99, + external_id="evt_99", + data_set_id=100, + description="Test", + created_time=0, + last_updated_time=0, + ) + # Map all possible fields so we can check excluded ones don't appear + record_mapping = self._make_record_mapping({"description": "description"}) + container_properties = self._make_container_properties() + + record, _ = asset_centric_to_record( + event, + instance_id=self.INSTANCE_ID, + record_mapping=record_mapping, + container_properties=container_properties, + direct_relation_cache=self._make_direct_relation_cache(), + ) + + assert record is not None + assert len(record.sources) == 1 + # Verify the dumped record body does not contain id/externalId/dataSetId + body = record.sources[0].properties + assert "id" not in body + assert "externalId" not in body + assert "dataSetId" not in body + + def test_no_mappable_properties_yields_empty_sources(self) -> None: + event = EventResponse(id=1, description="Event", created_time=0, last_updated_time=0) + record_mapping = self._make_record_mapping({}) # empty mapping + container_properties = self._make_container_properties() + + record, _ = asset_centric_to_record( + event, + instance_id=self.INSTANCE_ID, + record_mapping=record_mapping, + container_properties=container_properties, + 
direct_relation_cache=self._make_direct_relation_cache(), + ) + + assert record is not None + assert record.sources == [] diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py new file mode 100644 index 0000000000..a2498e2d03 --- /dev/null +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py @@ -0,0 +1,101 @@ +import textwrap + +import pytest + +from cognite_toolkit._cdf_tk.client.identifiers import ContainerId +from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import NodeId +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import ( + RecordPropertyMapping, + load_record_migration_config_yaml, +) +from cognite_toolkit._cdf_tk.commands._migrate.data_classes import EventMapping +from cognite_toolkit._cdf_tk.exceptions import ToolkitValueError + + +def test_load_wrapped_config() -> None: + yaml = textwrap.dedent( + """ + streamExternalId: my-stream + resourceType: event + mappings: + - externalId: evt-records + containerId: + space: dm + externalId: MyContainer + propertyMapping: + description: description + """ + ) + config = load_record_migration_config_yaml(yaml) + assert config.stream_external_id == "my-stream" + assert config.resource_type == "event" + assert len(config.mappings) == 1 + assert config.mappings[0].external_id == "evt-records" + + +def test_root_list_rejected() -> None: + with pytest.raises(ToolkitValueError, match="Expected a YAML mapping"): + load_record_migration_config_yaml("[]") + + +def test_missing_mappings_key_rejected() -> None: + with pytest.raises(ToolkitValueError, match="Missing required key 'mappings'"): + load_record_migration_config_yaml( + "streamExternalId: s\nresourceType: event\n" + ) + + +def test_empty_mappings_rejected() -> None: + with pytest.raises(ToolkitValueError, match="at 
least one"): + load_record_migration_config_yaml( + "streamExternalId: s\nresourceType: event\nmappings: []\n" + ) + + +def test_non_event_resource_type_rejected() -> None: + with pytest.raises(ToolkitValueError, match="Invalid record migration config"): + load_record_migration_config_yaml( + "streamExternalId: s\nresourceType: asset\nmappings:\n" + " - externalId: x\n" + " containerId: {space: dm, externalId: C}\n" + " propertyMapping: {}\n" + ) + + +def test_duplicate_external_id_rejected() -> None: + yaml = textwrap.dedent( + """ + streamExternalId: s + resourceType: event + mappings: + - externalId: dup + containerId: {space: s, externalId: c} + propertyMapping: {} + - externalId: dup + containerId: {space: s, externalId: c} + propertyMapping: {} + """ + ) + with pytest.raises(ToolkitValueError, match="Duplicate externalId"): + load_record_migration_config_yaml(yaml) + + +def test_event_mapping_resolves_ingestion_mapping_for_records() -> None: + m1 = RecordPropertyMapping( + external_id="a", + container_id=ContainerId(space="s", external_id="c1"), + property_mapping={}, + ) + m2 = RecordPropertyMapping( + external_id="b", + container_id=ContainerId(space="s", external_id="c2"), + property_mapping={}, + ) + by_id = {m.external_id: m for m in [m1, m2]} + row = EventMapping( + resource_type="event", + instance_id=NodeId(space="sp", external_id="e1"), + id=1, + ingestion_mapping="b", + ) + assert row.get_record_property_mapping(by_id).container_id.external_id == "c2" From 169142d0c7679f598393214c38d662bc64738c1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 09:24:41 +0100 Subject: [PATCH 02/14] Refactoring --- .../record_property_mapping.py | 8 ++++++- .../_cdf_tk/commands/_migrate/data_classes.py | 23 +++++++++++-------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py 
b/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py index 9442e653c0..5081efb925 100644 --- a/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py +++ b/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py @@ -22,9 +22,10 @@ class RecordMigrationConfig(BaseModelObject): stream_external_id: str resource_type: Literal["event"] mappings: list[RecordPropertyMapping] + default_mapping: str | None = None @model_validator(mode="after") - def _mapping_external_ids_unique(self) -> Self: + def _validate_mappings(self) -> Self: seen: set[str] = set() for mapping in self.mappings: if mapping.external_id in seen: @@ -32,6 +33,11 @@ def _mapping_external_ids_unique(self) -> Self: f"Duplicate externalId in record property mappings: {mapping.external_id!r}" ) seen.add(mapping.external_id) + if self.default_mapping is not None and self.default_mapping not in seen: + raise ValueError( + f"defaultMapping {self.default_mapping!r} does not match any mapping externalId. " + f"Available: {sorted(seen)}." + ) return self diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py b/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py index 8202082607..e049c1deb0 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py @@ -188,25 +188,28 @@ class EventMapping(MigrationMapping): resource_type: Literal["event"] = "event" instance_id: NodeUntypedId - def get_record_property_mapping(self, mappings_by_external_id: dict[str, RecordPropertyMapping]) -> RecordPropertyMapping: + def get_record_property_mapping( + self, + mappings_by_external_id: dict[str, RecordPropertyMapping], + default_mapping: str | None = None, + ) -> RecordPropertyMapping: """Resolve the record property mapping for this row (events-to-records). - Uses ``ingestionMapping`` (the same CSV field as instance migration). 
The value must match - ``externalId`` of an entry in the YAML ``mappings`` list. If the YAML defines only one mapping, - this column may be omitted. + Resolution order: + 1. ``ingestionMapping`` column in the CSV row. + 2. ``defaultMapping`` field from the YAML config. """ - if self.ingestion_mapping is not None: + resolved = self.ingestion_mapping or default_mapping + if resolved is not None: try: - return mappings_by_external_id[self.ingestion_mapping] + return mappings_by_external_id[resolved] except KeyError as e: raise ToolkitValueError( - f"Unknown mapping externalId {self.ingestion_mapping!r}. " + f"Unknown mapping externalId {resolved!r}. " f"Defined in target YAML: {sorted(mappings_by_external_id)}." ) from e - if len(mappings_by_external_id) == 1: - return next(iter(mappings_by_external_id.values())) raise ToolkitValueError( - "ingestionMapping is required in the CSV when the target YAML defines multiple mappings." + "ingestionMapping is required in the CSV (or set defaultMapping in the YAML config)." 
) From d99e80a48544c804a9e4cb8a8807c8559e9dd7fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 11:32:19 +0100 Subject: [PATCH 03/14] Refactoring --- .../record_property_mapping.py | 6 +-- .../_cdf_tk/commands/_migrate/conversion.py | 6 +-- .../_cdf_tk/commands/_migrate/data_classes.py | 25 ------------- .../_cdf_tk/commands/_migrate/issues.py | 7 +++- .../test_migration_cmd/test_conversion.py | 2 - .../test_record_property_mapping_yaml.py | 37 ++----------------- 6 files changed, 14 insertions(+), 69 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py b/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py index 5081efb925..2ce6a0ab33 100644 --- a/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py +++ b/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py @@ -29,9 +29,7 @@ def _validate_mappings(self) -> Self: seen: set[str] = set() for mapping in self.mappings: if mapping.external_id in seen: - raise ValueError( - f"Duplicate externalId in record property mappings: {mapping.external_id!r}" - ) + raise ValueError(f"Duplicate externalId in record property mappings: {mapping.external_id!r}") seen.add(mapping.external_id) if self.default_mapping is not None and self.default_mapping not in seen: raise ValueError( @@ -42,7 +40,7 @@ def _validate_mappings(self) -> Self: def load_record_migration_config_yaml(yaml_content: str) -> RecordMigrationConfig: - """Parse YAML into ``RecordMigrationConfig`` for events-to-records.""" + """Parse YAML into `RecordMigrationConfig` for events-to-records.""" content = read_yaml_content(yaml_content) if not isinstance(content, dict): raise ToolkitValueError( diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py b/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py index 888fafcaf7..f40f3056df 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py 
+++ b/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py @@ -42,7 +42,7 @@ from cognite_toolkit._cdf_tk.client.resource_classes.filemetadata import FileMetadataResponse from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping -from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest, RecordSource +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordId, RecordRequest, RecordSource from cognite_toolkit._cdf_tk.client.resource_classes.resource_view_mapping import ResourceViewMappingRequest from cognite_toolkit._cdf_tk.client.resource_classes.timeseries import TimeSeriesResponse from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping @@ -309,7 +309,7 @@ def asset_centric_to_dm( def asset_centric_to_record( resource: AssetCentricResourceExtended, - instance_id: NodeId, + instance_id: RecordId, record_mapping: RecordPropertyMapping, container_properties: dict[str, ContainerPropertyDefinition], direct_relation_cache: DirectRelationCache, @@ -340,7 +340,7 @@ def asset_centric_to_record( issue = ConversionIssue( id=str(AssetCentricId(resource_type=resource_type, id_=id_)), asset_centric_id=AssetCentricId(resource_type=resource_type, id_=id_), - instance_id=NodeId(space=instance_id.space, external_id=instance_id.external_id), + instance_id=instance_id, ) properties = create_properties( diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py b/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py index e049c1deb0..e348105a5a 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/data_classes.py @@ -15,7 +15,6 @@ from cognite_toolkit._cdf_tk.client.identifiers import EdgeUntypedId, InstanceId, InternalId, NodeUntypedId from 
cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import ViewId from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId -from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping from cognite_toolkit._cdf_tk.commands._migrate.default_mappings import ( ASSET_ANNOTATIONS_ID, FILE_ANNOTATIONS_ID, @@ -188,30 +187,6 @@ class EventMapping(MigrationMapping): resource_type: Literal["event"] = "event" instance_id: NodeUntypedId - def get_record_property_mapping( - self, - mappings_by_external_id: dict[str, RecordPropertyMapping], - default_mapping: str | None = None, - ) -> RecordPropertyMapping: - """Resolve the record property mapping for this row (events-to-records). - - Resolution order: - 1. ``ingestionMapping`` column in the CSV row. - 2. ``defaultMapping`` field from the YAML config. - """ - resolved = self.ingestion_mapping or default_mapping - if resolved is not None: - try: - return mappings_by_external_id[resolved] - except KeyError as e: - raise ToolkitValueError( - f"Unknown mapping externalId {resolved!r}. " - f"Defined in target YAML: {sorted(mappings_by_external_id)}." - ) from e - raise ToolkitValueError( - "ingestionMapping is required in the CSV (or set defaultMapping in the YAML config)." 
- ) - class TimeSeriesMapping(MigrationMapping): resource_type: Literal["timeseries"] = "timeseries" diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py b/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py index 097e95d6cf..dbfe280f00 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py @@ -5,6 +5,7 @@ from cognite_toolkit._cdf_tk.client.identifiers import NodeUntypedId from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordId from cognite_toolkit._cdf_tk.storageio.logger import LogEntry @@ -160,7 +161,7 @@ class ConversionIssue(MigrationIssue): type: Literal["conversion"] = "conversion" asset_centric_id: AssetCentricId - instance_id: NodeUntypedId + instance_id: NodeUntypedId | RecordId missing_asset_centric_properties: list[str] = Field(default_factory=list) missing_instance_properties: list[str] = Field(default_factory=list) invalid_instance_property_types: list[InvalidPropertyDataType] = Field(default_factory=list) @@ -186,6 +187,10 @@ def serialize_asset_centric_id(self, asset_centric_id: AssetCentricId) -> dict[s "id": asset_centric_id.id_, } + @field_serializer("instance_id") + def serialize_instance_id(self, instance_id: NodeUntypedId | RecordId) -> dict[str, Any]: + return {"space": instance_id.space, "externalId": instance_id.external_id} + class WriteIssue(MigrationIssue): """Represents a write issue encountered during migration. 
diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py index 0f50c7d5ce..6adaff8b8a 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py @@ -14,7 +14,6 @@ from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import ( BooleanProperty, ConstraintOrIndexState, - ContainerPropertyDefinition, ContainerId, ContainerPropertyDefinition, DateProperty, @@ -42,7 +41,6 @@ from cognite_toolkit._cdf_tk.client.resource_classes.filemetadata import FileMetadataResponse from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId, CreatedSourceSystem from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping -from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest from cognite_toolkit._cdf_tk.client.resource_classes.resource_view_mapping import ResourceViewMappingResponse from cognite_toolkit._cdf_tk.client.resource_classes.timeseries import TimeSeriesResponse from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py index a2498e2d03..c5ef0fb978 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py @@ -2,13 +2,7 @@ import pytest -from cognite_toolkit._cdf_tk.client.identifiers import ContainerId -from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import NodeId -from 
cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import ( - RecordPropertyMapping, - load_record_migration_config_yaml, -) -from cognite_toolkit._cdf_tk.commands._migrate.data_classes import EventMapping +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import load_record_migration_config_yaml from cognite_toolkit._cdf_tk.exceptions import ToolkitValueError @@ -40,16 +34,12 @@ def test_root_list_rejected() -> None: def test_missing_mappings_key_rejected() -> None: with pytest.raises(ToolkitValueError, match="Missing required key 'mappings'"): - load_record_migration_config_yaml( - "streamExternalId: s\nresourceType: event\n" - ) + load_record_migration_config_yaml("streamExternalId: s\nresourceType: event\n") def test_empty_mappings_rejected() -> None: with pytest.raises(ToolkitValueError, match="at least one"): - load_record_migration_config_yaml( - "streamExternalId: s\nresourceType: event\nmappings: []\n" - ) + load_record_migration_config_yaml("streamExternalId: s\nresourceType: event\nmappings: []\n") def test_non_event_resource_type_rejected() -> None: @@ -78,24 +68,3 @@ def test_duplicate_external_id_rejected() -> None: ) with pytest.raises(ToolkitValueError, match="Duplicate externalId"): load_record_migration_config_yaml(yaml) - - -def test_event_mapping_resolves_ingestion_mapping_for_records() -> None: - m1 = RecordPropertyMapping( - external_id="a", - container_id=ContainerId(space="s", external_id="c1"), - property_mapping={}, - ) - m2 = RecordPropertyMapping( - external_id="b", - container_id=ContainerId(space="s", external_id="c2"), - property_mapping={}, - ) - by_id = {m.external_id: m for m in [m1, m2]} - row = EventMapping( - resource_type="event", - instance_id=NodeId(space="sp", external_id="e1"), - id=1, - ingestion_mapping="b", - ) - assert row.get_record_property_mapping(by_id).container_id.external_id == "c2" From 5e499fe13e71b4e56401338800dc376e2a8d9694 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 14:58:47 +0100 Subject: [PATCH 04/14] Clarify InstanceId semantics --- cognite_toolkit/_cdf_tk/commands/_migrate/issues.py | 4 ++-- cognite_toolkit/_cdf_tk/feature_flags.py | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py b/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py index dbfe280f00..24905ca0a0 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py @@ -3,7 +3,7 @@ from pydantic import BaseModel, Field, field_serializer from pydantic.alias_generators import to_camel -from cognite_toolkit._cdf_tk.client.identifiers import NodeUntypedId +from cognite_toolkit._cdf_tk.client.identifiers import NodeId, NodeUntypedId from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordId from cognite_toolkit._cdf_tk.storageio.logger import LogEntry @@ -188,7 +188,7 @@ def serialize_asset_centric_id(self, asset_centric_id: AssetCentricId) -> dict[s } @field_serializer("instance_id") - def serialize_instance_id(self, instance_id: NodeUntypedId | RecordId) -> dict[str, Any]: + def serialize_instance_id(self, instance_id: NodeId | RecordId) -> dict[str, Any]: return {"space": instance_id.space, "externalId": instance_id.external_id} diff --git a/cognite_toolkit/_cdf_tk/feature_flags.py b/cognite_toolkit/_cdf_tk/feature_flags.py index d657d64534..30005236c8 100644 --- a/cognite_toolkit/_cdf_tk/feature_flags.py +++ b/cognite_toolkit/_cdf_tk/feature_flags.py @@ -54,10 +54,6 @@ class Flags(Enum): visible=False, description="Enables the infield-migrate command for migrating Infield configs to regular Toolkit configs", ) - RECORDS_MIGRATE = FlagMetadata( - visible=False, - description="Enables the 'events-to-records' migration command", - ) INFIELD_DEV = FlagMetadata( visible=False, 
description="For InField developers: sets the schema space for the InFieldOnCDM data model to test migration before it becomes a system model.", From bfc20d06fc3e5c70b775f0fb170197d0babde94c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 15:25:30 +0100 Subject: [PATCH 05/14] Refactor tests --- .../_cdf_tk/commands/_migrate/conversion.py | 13 ++--- .../_cdf_tk/commands/_migrate/issues.py | 2 + .../test_migration_cmd/test_conversion.py | 50 +++---------------- 3 files changed, 14 insertions(+), 51 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py b/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py index f40f3056df..014ce5b21a 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py @@ -353,19 +353,14 @@ def asset_centric_to_record( container_id=record_mapping.container_id, ) - sources: list[RecordSource] = [] - if properties: - sources.append( - RecordSource( - source=record_mapping.container_id, - properties=properties, - ) - ) + if not properties: + issue.no_mappable_properties = True + return None, issue record = RecordRequest( space=instance_id.space, external_id=instance_id.external_id, - sources=sources, + sources=[RecordSource(source=record_mapping.container_id, properties=properties)], ) return record, issue diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py b/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py index 24905ca0a0..943e371e10 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/issues.py @@ -168,6 +168,7 @@ class ConversionIssue(MigrationIssue): failed_conversions: list[FailedConversion] = Field(default_factory=list) ignored_asset_centric_properties: list[str] = Field(default_factory=list) missing_instance_space: str | None = None + no_mappable_properties: bool = False @property def has_issues(self) -> bool: @@ -178,6 
+179,7 @@ def has_issues(self) -> bool: or self.invalid_instance_property_types or self.failed_conversions or self.missing_instance_space + or self.no_mappable_properties ) @field_serializer("asset_centric_id") diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py index 6adaff8b8a..6a521a4dc5 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py @@ -41,6 +41,7 @@ from cognite_toolkit._cdf_tk.client.resource_classes.filemetadata import FileMetadataResponse from cognite_toolkit._cdf_tk.client.resource_classes.migration import AssetCentricId, CreatedSourceSystem from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordId from cognite_toolkit._cdf_tk.client.resource_classes.resource_view_mapping import ResourceViewMappingResponse from cognite_toolkit._cdf_tk.client.resource_classes.timeseries import TimeSeriesResponse from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping @@ -1678,7 +1679,7 @@ def test_convert_edges( class TestAssetCentricToRecord: CONTAINER_ID = ContainerId(space="my_stream_space", external_id="EventContainer") - INSTANCE_ID = NodeId(space="my_space", external_id="event_42") + INSTANCE_ID = RecordId(space="my_space", external_id="event_42") def _make_record_mapping(self, property_mapping: dict[str, str]) -> RecordPropertyMapping: return RecordPropertyMapping( @@ -1697,10 +1698,6 @@ def _make_container_properties(self) -> dict[str, ContainerPropertyDefinition]: ), } - @staticmethod - def _make_direct_relation_cache() -> DirectRelationCache: - return DirectRelationCache(MagicMock(spec=ToolkitClient)) - def test_basic_event_conversion(self) -> 
None: event = EventResponse( id=42, @@ -1709,7 +1706,7 @@ def test_basic_event_conversion(self) -> None: created_time=1, last_updated_time=2, ) - record_mapping = self._make_record_mapping({"description": "description", "startTime": "startTime"}) + record_mapping = self._make_record_mapping({"description": "description"}) container_properties = self._make_container_properties() record, issue = asset_centric_to_record( @@ -1717,48 +1714,18 @@ def test_basic_event_conversion(self) -> None: instance_id=self.INSTANCE_ID, record_mapping=record_mapping, container_properties=container_properties, - direct_relation_cache=self._make_direct_relation_cache(), + direct_relation_cache=DirectRelationCache(MagicMock(spec=ToolkitClient)), ) + assert not issue.has_issues assert record is not None assert record.space == self.INSTANCE_ID.space assert record.external_id == self.INSTANCE_ID.external_id assert len(record.sources) == 1 assert record.sources[0].source == self.CONTAINER_ID assert record.sources[0].properties["description"] == "An event" - assert record.sources[0].properties["startTime"] == "1970-01-01T00:00:00.000+00:00" - - def test_id_and_metadata_fields_are_excluded(self) -> None: - """id, externalId, dataSetId must not appear in mapped properties.""" - event = EventResponse( - id=99, - external_id="evt_99", - data_set_id=100, - description="Test", - created_time=0, - last_updated_time=0, - ) - # Map all possible fields so we can check excluded ones don't appear - record_mapping = self._make_record_mapping({"description": "description"}) - container_properties = self._make_container_properties() - - record, _ = asset_centric_to_record( - event, - instance_id=self.INSTANCE_ID, - record_mapping=record_mapping, - container_properties=container_properties, - direct_relation_cache=self._make_direct_relation_cache(), - ) - assert record is not None - assert len(record.sources) == 1 - # Verify the dumped record body does not contain id/externalId/dataSetId - body = 
record.sources[0].properties - assert "id" not in body - assert "externalId" not in body - assert "dataSetId" not in body - - def test_no_mappable_properties_yields_empty_sources(self) -> None: + def test_no_mappable_properties_returns_none(self) -> None: event = EventResponse(id=1, description="Event", created_time=0, last_updated_time=0) record_mapping = self._make_record_mapping({}) # empty mapping container_properties = self._make_container_properties() @@ -1768,8 +1735,7 @@ def test_no_mappable_properties_yields_empty_sources(self) -> None: instance_id=self.INSTANCE_ID, record_mapping=record_mapping, container_properties=container_properties, - direct_relation_cache=self._make_direct_relation_cache(), + direct_relation_cache=DirectRelationCache(MagicMock(spec=ToolkitClient)), ) - assert record is not None - assert record.sources == [] + assert record is None From 8610d8fe2290b15e5bd633b21fc1b5a984153342 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Wed, 25 Mar 2026 17:37:42 +0100 Subject: [PATCH 06/14] feat: wire events-to-records migration pipeline and CLI command --- cognite_toolkit/_cdf_tk/apps/_migrate_app.py | 127 ++++++++++++++++++ .../_cdf_tk/commands/_migrate/command.py | 62 ++++++++- .../_cdf_tk/commands/_migrate/data_mapper.py | 56 ++++++++ .../_cdf_tk/commands/_migrate/migration_io.py | 117 +++++++++++++++- .../test_migration_cmd/test_command.py | 46 +++++++ 5 files changed, 404 insertions(+), 4 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/apps/_migrate_app.py b/cognite_toolkit/_cdf_tk/apps/_migrate_app.py index 2b443c1b53..d44c7f7ad1 100644 --- a/cognite_toolkit/_cdf_tk/apps/_migrate_app.py +++ b/cognite_toolkit/_cdf_tk/apps/_migrate_app.py @@ -8,6 +8,9 @@ from cognite_toolkit._cdf_tk.client import ToolkitClient from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import ContainerId +from 
cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import ( + load_record_migration_config_yaml, +) from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping from cognite_toolkit._cdf_tk.commands import MigrationPrepareCommand from cognite_toolkit._cdf_tk.commands._migrate import MigrationCommand @@ -23,6 +26,7 @@ ) from cognite_toolkit._cdf_tk.commands._migrate.data_mapper import ( AssetCentricToInstanceMapper, + AssetCentricToRecordMapper, CanvasMapper, ChartMapper, FDMtoCDMMapper, @@ -37,6 +41,7 @@ from cognite_toolkit._cdf_tk.commands._migrate.migration_io import ( AnnotationMigrationIO, AssetCentricMigrationIO, + RecordsMigrationIO, ThreeDAssetMappingMigrationIO, ThreeDMigrationIO, ) @@ -91,6 +96,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.command("charts")(self.charts) self.command("3d")(self.three_d) self.command("3d-mappings")(self.three_d_asset_mapping) + if Flags.RECORDS_MIGRATE.is_enabled(): + self.command("events-to-records")(self.events_to_records) if Flags.INFIELD_MIGRATE.is_enabled(): self.command("infield-configs")(self.infield_configs) self.command("infield-data")(self.infield_data) @@ -594,6 +601,126 @@ def events( ) ) + @classmethod + def events_to_records( + cls, + ctx: typer.Context, + config_file: Annotated[ + Path, + typer.Option( + "--config-file", + exists=True, + file_okay=True, + dir_okay=False, + readable=True, + help="Path to a YAML file containing the target configuration: streamExternalId, resourceType, and one or more mappings " + "(each mapping must contain: externalId, containerId and propertyMapping). ", + ), + ], + mapping_file: Annotated[ + Path | None, + typer.Option( + "--mapping-file", + "-m", + help="Path to a CSV file listing which events to migrate. " + "Columns: id, space, externalId; ingestionMapping is optional if the YAML defines a single mapping " + "or defaultMapping is set in the config file. 
" + "Mutually exclusive with --data-set-id.", + ), + ] = None, + data_set_id: Annotated[ + str | None, + typer.Option( + "--data-set-id", + "-s", + help="External ID of the data set to migrate all events from. " + "Requires defaultMapping to be set in the config file. " + "Mutually exclusive with --mapping-file.", + ), + ] = None, + skip_existing: Annotated[ + bool, + typer.Option( + "--skip-existing", + help="If set, queries the stream for existing records and skips uploads for " + "(space, externalId) pairs that already exist.", + ), + ] = False, + log_dir: Annotated[ + Path, + typer.Option( + "--log-dir", + "-l", + help="Path to the directory where logs will be stored. If the directory does not exist, it will be created.", + ), + ] = Path(f"migration_logs_{TODAY!s}"), + dry_run: Annotated[ + bool, + typer.Option( + "--dry-run", + "-d", + help="If set, the migration will not be executed, but only a report of what would be done is printed.", + ), + ] = False, + auto_yes: Annotated[ + bool, + typer.Option( + "--yes", + "-y", + help="If set, no confirmation prompt will be shown before proceeding with the migration.", + ), + ] = False, + verbose: Annotated[ + bool, + typer.Option( + "--verbose", + "-v", + help="Turn on to get more verbose output when running the command", + ), + ] = False, + ) -> None: + """Migrate Events to records (Streams API).""" + client = EnvironmentVariables.create_from_environment().get_client() + try: + migration_config = load_record_migration_config_yaml(config_file.read_text()) + except ToolkitValueError as exc: + raise typer.BadParameter(str(exc)) from exc + if data_set_id is not None and migration_config.default_mapping is None: + raise typer.BadParameter( + "--data-set-id requires defaultMapping to be set in the config file, " + "so all events in the data set are mapped to the same target container." 
+ ) + mappings_by_external_id = {m.external_id: m for m in migration_config.mappings} + + selected, dry_run, verbose, skip_existing = cls._prepare_asset_centric_arguments( + client=client, + mapping_file=mapping_file, + data_set_id=data_set_id, + consumption_view=None, + ingestion_mapping=None, + dry_run=dry_run, + auto_yes=auto_yes, + verbose=verbose, + kind="Events", + resource_type="event", + skip_existing=skip_existing, + ) + + cmd = MigrationCommand(client=client) + cmd.run( + lambda: cmd.migrate( + selectors=[selected], + data=RecordsMigrationIO( + client, stream_external_id=migration_config.stream_external_id, skip_existing=skip_existing + ), + mapper=AssetCentricToRecordMapper(client, mappings_by_external_id=mappings_by_external_id, default_mapping=migration_config.default_mapping), # type: ignore[arg-type] + log_dir=log_dir, + dry_run=dry_run, + verbose=verbose, + user_log_filestem="events-records", + ) + ) + @classmethod def timeseries( cls, diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/command.py b/cognite_toolkit/_cdf_tk/commands/_migrate/command.py index b850e5e4ac..bcb122d8ed 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/command.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/command.py @@ -15,9 +15,12 @@ ItemsFailedResponse, ItemsSuccessResponse, ) +from cognite_toolkit._cdf_tk.client.identifiers import ExternalId +from cognite_toolkit._cdf_tk.client.resource_classes.streams import StreamResponse from cognite_toolkit._cdf_tk.commands._base import ToolkitCommand from cognite_toolkit._cdf_tk.commands._migrate.creators import MigrationCreator from cognite_toolkit._cdf_tk.commands._migrate.data_mapper import DataMapper +from cognite_toolkit._cdf_tk.commands._migrate.migration_io import RecordsMigrationIO from cognite_toolkit._cdf_tk.commands.deploy import DeployCommand from cognite_toolkit._cdf_tk.constants import DMS_INSTANCE_LIMIT_MARGIN from cognite_toolkit._cdf_tk.cruds import ResourceWorker @@ -94,8 +97,12 @@ def migrate( ) if 
needed_capacity and not isinstance(data, ChartIO): - # Chart are not creating any new nodes. - self.validate_available_capacity(data.client, needed_capacity) + # Charts are not creating any new nodes. + if isinstance(data, RecordsMigrationIO): + stream = self.validate_stream_exists(data.client, data.stream_external_id) + self.validate_stream_capacity(data.client, stream, needed_capacity) + else: + self.validate_available_capacity(data.client, needed_capacity) results_by_selector: dict[str, list[MigrationStatusResult]] = {} with ( NDJsonWriter( @@ -146,7 +153,8 @@ def migrate( executor.raise_on_error() action = "Would migrate" if dry_run else "Migrating" - console.print(f"{action} {total:,} {selected.display_name} to instances.") + target = "records" if isinstance(data, RecordsMigrationIO) else "instances" + console.print(f"{action} {total:,} {selected.display_name} to {target}.") return results_by_selector @@ -357,6 +365,54 @@ def upload_items(page: Page[T_DataRequest]) -> None: return upload_items + @staticmethod + def validate_stream_exists(client: ToolkitClient, stream_external_id: str) -> StreamResponse: + results = client.streams.retrieve( + [ExternalId(external_id=stream_external_id)], include_statistics=True, ignore_unknown_ids=True + ) + if not results: + raise ToolkitMigrationError( + f"Stream '{stream_external_id}' does not exist. " + "Please create the stream before running a records migration." 
+ ) + return results[0] + + def validate_stream_capacity(self, client: ToolkitClient, stream: StreamResponse, record_count: int) -> None: + limits = stream.settings.limits if stream.settings else None + if limits is None: + self.console(f"Unable to check stream capacity for '{stream.external_id}' (no settings returned).") + return + + records_usage = limits.max_records_total + records_consumed = records_usage.consumed or 0 + records_available = records_usage.provisioned - records_consumed + + if records_available < record_count: + raise ToolkitValueError( + f"Stream '{stream.external_id}' does not have enough record capacity. " + f"Provisioned: {records_usage.provisioned:,}, consumed: {records_consumed:,}, " + f"available: {records_available:,}, needed: {record_count:,}." + ) + + storage_usage = limits.max_giga_bytes_total + storage_consumed = storage_usage.consumed or 0 + storage_available = storage_usage.provisioned - storage_consumed + if storage_available <= 0: + raise ToolkitValueError( + f"Stream '{stream.external_id}' does not have enough storage capacity. " + f"Provisioned: {storage_usage.provisioned:,} GB, consumed: {storage_consumed:,} GB." + ) + + records_total_after = records_consumed + record_count + self.console( + f"Stream '{stream.external_id}' has enough capacity. " + f"Records after migration: {records_total_after:,} / {records_usage.provisioned:,}. " + ) + self.console( + f"Before migration, you've so far used {storage_consumed:,} / {storage_usage.provisioned:,} GB of stream storage. " + "Note that storage capacity is NOT considered when checking for capacity, and the migration might fail if you end up going over this limit." 
+ ) + @staticmethod def validate_migration_model_available(client: ToolkitClient) -> None: models = client.tool.data_models.retrieve([MODEL_ID], inline_views=False) diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py index c1861c4318..39f34b489e 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py @@ -28,6 +28,7 @@ ) from cognite_toolkit._cdf_tk.client.resource_classes.cognite_file import CogniteFileResponse from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import ( + ContainerPropertyDefinition, DirectNodeRelation, EdgeRequest, EdgeResponse, @@ -55,6 +56,8 @@ ThreeDModelClassicResponse, ) from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest from cognite_toolkit._cdf_tk.commands._migrate.conversion import ( ConnectionCreator, ConversionContext, @@ -62,10 +65,13 @@ DirectRelationCache, EdgeOtherSide, asset_centric_to_dm, + asset_centric_to_record, convert_container_properties, convert_edges, ) from cognite_toolkit._cdf_tk.commands._migrate.data_classes import ( + EventMapping, + MigrationMapping, ThreeDMigrationRequest, ThreeDRevisionMigrationRequest, ) @@ -241,6 +247,56 @@ def _map_single_item( return instance, conversion_issue +class AssetCentricToRecordMapper(AssetCentricMapper[T_AssetCentricResourceExtended, RecordRequest]): + def __init__(self, client: ToolkitClient, mappings_by_external_id: dict[str, RecordPropertyMapping]) -> None: + super().__init__(client) + self._mappings_by_external_id = mappings_by_external_id + self._container_properties_by_mapping_external_id: dict[str, dict[str, ContainerPropertyDefinition]] = {} + + def prepare(self, 
source_selector: AssetCentricMigrationSelector) -> None: + seen_container: dict[str, dict[str, ContainerPropertyDefinition]] = {} + for external_id, record_mapping in self._mappings_by_external_id.items(): + container_key = f"{record_mapping.container_id.space}:{record_mapping.container_id.external_id}" + if container_key not in seen_container: + containers = self.client.tool.containers.retrieve([record_mapping.container_id]) + if not containers: + raise ToolkitValueError( + f"Container {record_mapping.container_id!r} was not found in CDF" + ) + seen_container[container_key] = containers[0].properties + self._container_properties_by_mapping_external_id[external_id] = seen_container[container_key] + + def _record_mapping_for_row(self, mapping: MigrationMapping) -> RecordPropertyMapping: + if not isinstance(mapping, EventMapping): + raise ToolkitValueError("Records migration only supports Event mapping rows.") + return mapping.get_record_property_mapping(self._mappings_by_external_id) + + def _map_single_item( + self, item: AssetCentricMapping[T_AssetCentricResourceExtended] + ) -> tuple[RecordRequest | None, ConversionIssue]: + row_mapping = item.mapping + record_mapping = self._record_mapping_for_row(row_mapping) + container_properties = self._container_properties_by_mapping_external_id[record_mapping.external_id] + record, conversion_issue = asset_centric_to_record( + item.resource, + instance_id=NodeId( + space=row_mapping.instance_id.space, external_id=row_mapping.instance_id.external_id + ), + record_mapping=record_mapping, + container_properties=container_properties, + direct_relation_cache=self._direct_relation_cache, + ) + if row_mapping.instance_id.space == MISSING_INSTANCE_SPACE: + conversion_issue.missing_instance_space = f"Missing instance space for dataset ID {row_mapping.data_set_id!r}" + if record is not None and not record.sources: + self.logger.tracker.add_issue( + str(item.mapping.as_asset_centric_id()), + "No properties could be successfully 
mapped (at least one property is required to create a record)", + ) + return None, conversion_issue + return record, conversion_issue + + class ChartMapper(DataMapper[ChartSelector, ChartResponse, ChartRequest]): DEFAULT_EVENT_VIEW = ViewId(space="cdf_cdm", external_id="CogniteActivity", version="v1") diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py b/cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py index 9c572ed662..2cc7fbe364 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py @@ -1,5 +1,9 @@ +import json +from collections import defaultdict from collections.abc import Iterable, Iterator, Mapping, Sequence -from typing import ClassVar, Literal +from typing import ClassVar, Literal, cast + +from pydantic import JsonValue from cognite_toolkit._cdf_tk.client import ToolkitClient from cognite_toolkit._cdf_tk.client.http_client import ( @@ -18,6 +22,7 @@ from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import EdgeId, InstanceRequest, NodeId from cognite_toolkit._cdf_tk.client.resource_classes.pending_instance_id import PendingInstanceId +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest from cognite_toolkit._cdf_tk.client.resource_classes.three_d import ( AssetMappingClassicResponse, AssetMappingDMRequestId, @@ -25,6 +30,7 @@ ) from cognite_toolkit._cdf_tk.commands._migrate.data_classes import ThreeDMigrationRequest from cognite_toolkit._cdf_tk.constants import MISSING_EXTERNAL_ID, MISSING_INSTANCE_SPACE +from cognite_toolkit._cdf_tk.cruds._resource_cruds.streams import StreamCRUD from cognite_toolkit._cdf_tk.exceptions import ToolkitNotImplementedError, ToolkitValueError from cognite_toolkit._cdf_tk.storageio import ( AnnotationIO, @@ -281,6 +287,115 @@ def as_pending_instance_id(item: InstanceRequest) -> 
PendingInstanceId: ) +class RecordsMigrationIO(AssetCentricMigrationIO): + """IO class for migrating asset-centric resources to records. + + Inherits all read-side logic (streaming, counting) from AssetCentricMigrationIO + and overrides only the upload path to target a records stream. + """ + + KIND = "RecordsMigration" + CHUNK_SIZE = 500 + UPLOAD_ENDPOINT = "/streams/{streamId}/records" + FILTER_ENDPOINT = "/streams/{streamId}/records/filter" + # Records filter API: multivalued `in` filters allow at most 100 values (see API error range [1, 100]). + FILTER_IN_MAX_VALUES = 100 + + def __init__(self, client: ToolkitClient, stream_external_id: str, skip_existing: bool = False) -> None: + super().__init__(client) + self.stream_external_id = stream_external_id + self.skip_existing = skip_existing + self._last_updated_time_windows: list[dict[str, int] | None] | None = None + if skip_existing: + stream_crud = StreamCRUD.create_loader(self.client) + self._last_updated_time_windows = stream_crud.last_updated_time_windows(self.stream_external_id) + + def _remove_existing( + self, + data_chunk: Sequence[DataItem[RecordRequest]], + http_client: HTTPClient, + ) -> list[DataItem[RecordRequest]]: + """Return upload items whose (space, externalId) are not already in the stream. + + Marks skipped items on the logger tracker. Groups by space and queries the filter API per group. 
+ """ + if not data_chunk: + return [] + + last_updated_time_windows = self._last_updated_time_windows + assert last_updated_time_windows is not None + + by_space: dict[str, list[DataItem[RecordRequest]]] = defaultdict(list) + for upload_item in data_chunk: + by_space[upload_item.item.space].append(upload_item) + + existing_pairs: set[tuple[str, str]] = set() + filter_url = self.client.config.create_api_url(self.FILTER_ENDPOINT.format(streamId=self.stream_external_id)) + + for space, items in by_space.items(): + external_ids = [upload_item.item.external_id for upload_item in items] + for id_batch in chunker_sequence(external_ids, self.FILTER_IN_MAX_VALUES): + batch_list = list(id_batch) + for last_updated_time in last_updated_time_windows: + body = cast( + dict[str, JsonValue], + { + "filter": { + "and": [ + {"equals": {"property": ["space"], "value": space}}, + {"in": {"property": ["externalId"], "values": batch_list}}, + ] + }, + "limit": min(len(batch_list), 1000), + }, + ) + if last_updated_time is not None: + body["lastUpdatedTime"] = cast(JsonValue, last_updated_time) + request = RequestMessage( + endpoint_url=filter_url, + method="POST", + body_content=body, + ) + result = http_client.request_single_retries(request) + response = result.get_success_or_raise(request) + payload = json.loads(response.body) + for record in payload.get("items") or []: + record_space = record.get("space") + external_id = record.get("externalId") + if isinstance(record_space, str) and isinstance(external_id, str): + existing_pairs.add((record_space, external_id)) + + to_upload: list[DataItem[RecordRequest]] = [] + for upload_item in data_chunk: + pair = (upload_item.item.space, upload_item.item.external_id) + if pair in existing_pairs: + self.logger.tracker.finalize_item(upload_item.tracking_id, "skipped") + else: + to_upload.append(upload_item) + + return to_upload + + def upload_items( + self, + data_chunk: Sequence[DataItem[RecordRequest]], # type: ignore[override] + http_client: 
HTTPClient, + selector: AssetCentricMigrationSelector | None = None, + ) -> ItemsResultList: + if self.skip_existing: + data_chunk = self._remove_existing(data_chunk, http_client) + if not data_chunk: + return ItemsResultList() + + endpoint = self.UPLOAD_ENDPOINT.format(streamId=self.stream_external_id) + return http_client.request_items_retries( + message=ItemsRequest( + endpoint_url=self.client.config.create_api_url(endpoint), + method="POST", + items=data_chunk, + ) + ) + + class AnnotationMigrationIO( UploadableStorageIO[AssetCentricMigrationSelector, AssetCentricMapping[AnnotationResponse], InstanceRequest] ): diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_command.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_command.py index adcdbf5681..8b83ef2e00 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_command.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_command.py @@ -33,6 +33,13 @@ from cognite_toolkit._cdf_tk.client.resource_classes.charts_data import ChartData, ChartSource, ChartTimeseries from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import InstanceSource, NodeRequest, ViewId from cognite_toolkit._cdf_tk.client.resource_classes.migration import InstanceSource as LegacyInstanceSource +from cognite_toolkit._cdf_tk.client.resource_classes.streams import ( + LifecycleObject, + LimitsObject, + ResourceUsage, + StreamResponse, + StreamSettings, +) from cognite_toolkit._cdf_tk.client.testing import monkeypatch_toolkit_client from cognite_toolkit._cdf_tk.commands._migrate.command import MigrationCommand from cognite_toolkit._cdf_tk.commands._migrate.data_mapper import ( @@ -1044,3 +1051,42 @@ def test_create_logfile_stem(self, existing_files: list[str], stem: str, expecte actual = MigrationCommand._create_logfile_stem(tmp_path, stem, "not_important") assert actual == expected + + def _make_stream( + self, provisioned_records: int, 
consumed_records: int, provisioned_gb: float = 100.0, consumed_gb: float = 0.0 + ) -> StreamResponse: + return StreamResponse( + external_id="my_stream", + created_time=0, + created_from_template="t", + type="Mutable", + settings=StreamSettings( + lifecycle=LifecycleObject(retained_after_soft_delete="P30D"), + limits=LimitsObject( + max_records_total=ResourceUsage(provisioned=provisioned_records, consumed=consumed_records), + max_giga_bytes_total=ResourceUsage(provisioned=provisioned_gb, consumed=consumed_gb), + ), + ), + ) + + def test_validate_stream_capacity_sufficient(self) -> None: + stream = self._make_stream(provisioned_records=1_000_000, consumed_records=100_000) + cmd = MigrationCommand(silent=True) + with monkeypatch_toolkit_client() as client: + cmd.validate_stream_capacity(client, stream, 500_000) # should not raise + + def test_validate_stream_capacity_insufficient_records(self) -> None: + stream = self._make_stream(provisioned_records=1_000, consumed_records=900) + cmd = MigrationCommand(silent=True) + with monkeypatch_toolkit_client() as client: + with pytest.raises(ToolkitValueError, match="enough record capacity"): + cmd.validate_stream_capacity(client, stream, 200) + + def test_validate_stream_capacity_insufficient_storage(self) -> None: + stream = self._make_stream( + provisioned_records=1_000_000, consumed_records=0, provisioned_gb=10.0, consumed_gb=10.0 + ) + cmd = MigrationCommand(silent=True) + with monkeypatch_toolkit_client() as client: + with pytest.raises(ToolkitValueError, match="enough storage capacity"): + cmd.validate_stream_capacity(client, stream, 1) From 54a150a6e8c1a71853dabfa4e24b140aa47a307a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 11:44:57 +0100 Subject: [PATCH 07/14] WIP --- cognite_toolkit/_cdf_tk/apps/_migrate_app.py | 8 ++++++-- .../_cdf_tk/commands/_migrate/data_mapper.py | 17 +++++++++++++---- cognite_toolkit/_cdf_tk/feature_flags.py | 4 ++++ 3 files changed, 23 
insertions(+), 6 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/apps/_migrate_app.py b/cognite_toolkit/_cdf_tk/apps/_migrate_app.py index d44c7f7ad1..81b2bfd6d4 100644 --- a/cognite_toolkit/_cdf_tk/apps/_migrate_app.py +++ b/cognite_toolkit/_cdf_tk/apps/_migrate_app.py @@ -708,12 +708,16 @@ def events_to_records( cmd = MigrationCommand(client=client) cmd.run( - lambda: cmd.migrate( + lambda: cmd.migrate( # type: ignore[misc] selectors=[selected], data=RecordsMigrationIO( client, stream_external_id=migration_config.stream_external_id, skip_existing=skip_existing ), - mapper=AssetCentricToRecordMapper(client, mappings_by_external_id=mappings_by_external_id, default_mapping=migration_config.default_mapping), # type: ignore[arg-type] + mapper=AssetCentricToRecordMapper( + client, + mappings_by_external_id=mappings_by_external_id, + default_mapping=migration_config.default_mapping, + ), log_dir=log_dir, dry_run=dry_run, verbose=verbose, diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py index 39f34b489e..de0fed905b 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py @@ -57,7 +57,7 @@ ) from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping -from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordId, RecordRequest from cognite_toolkit._cdf_tk.commands._migrate.conversion import ( ConnectionCreator, ConversionContext, @@ -248,9 +248,15 @@ def _map_single_item( class AssetCentricToRecordMapper(AssetCentricMapper[T_AssetCentricResourceExtended, RecordRequest]): - def __init__(self, client: ToolkitClient, mappings_by_external_id: dict[str, RecordPropertyMapping]) -> 
None: + def __init__( + self, + client: ToolkitClient, + mappings_by_external_id: dict[str, RecordPropertyMapping], + default_mapping: str | None = None, + ) -> None: super().__init__(client) self._mappings_by_external_id = mappings_by_external_id + self._default_mapping = default_mapping self._container_properties_by_mapping_external_id: dict[str, dict[str, ContainerPropertyDefinition]] = {} def prepare(self, source_selector: AssetCentricMigrationSelector) -> None: @@ -269,7 +275,10 @@ def prepare(self, source_selector: AssetCentricMigrationSelector) -> None: def _record_mapping_for_row(self, mapping: MigrationMapping) -> RecordPropertyMapping: if not isinstance(mapping, EventMapping): raise ToolkitValueError("Records migration only supports Event mapping rows.") - return mapping.get_record_property_mapping(self._mappings_by_external_id) + return mapping.get_record_property_mapping( + self._mappings_by_external_id, + self._default_mapping, + ) def _map_single_item( self, item: AssetCentricMapping[T_AssetCentricResourceExtended] @@ -279,7 +288,7 @@ def _map_single_item( container_properties = self._container_properties_by_mapping_external_id[record_mapping.external_id] record, conversion_issue = asset_centric_to_record( item.resource, - instance_id=NodeId( + instance_id=RecordId( space=row_mapping.instance_id.space, external_id=row_mapping.instance_id.external_id ), record_mapping=record_mapping, diff --git a/cognite_toolkit/_cdf_tk/feature_flags.py b/cognite_toolkit/_cdf_tk/feature_flags.py index 30005236c8..d657d64534 100644 --- a/cognite_toolkit/_cdf_tk/feature_flags.py +++ b/cognite_toolkit/_cdf_tk/feature_flags.py @@ -54,6 +54,10 @@ class Flags(Enum): visible=False, description="Enables the infield-migrate command for migrating Infield configs to regular Toolkit configs", ) + RECORDS_MIGRATE = FlagMetadata( + visible=False, + description="Enables the 'events-to-records' migration command", + ) INFIELD_DEV = FlagMetadata( visible=False, description="For InField 
developers: sets the schema space for the InFieldOnCDM data model to test migration before it becomes a system model.", From daf43376e3553c27a15893a91f7866a56877159e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 15:28:42 +0100 Subject: [PATCH 08/14] Refactor --- .../_cdf_tk/commands/_migrate/data_mapper.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py index de0fed905b..1042ded404 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py @@ -45,6 +45,8 @@ ) from cognite_toolkit._cdf_tk.client.resource_classes.group import AllScope from cognite_toolkit._cdf_tk.client.resource_classes.group.acls import ChartsAdminAcl +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordId, RecordRequest from cognite_toolkit._cdf_tk.client.resource_classes.resource_view_mapping import ( RESOURCE_VIEW_MAPPING_SPACE, ResourceViewMappingRequest, @@ -56,8 +58,6 @@ ThreeDModelClassicResponse, ) from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping -from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping -from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordId, RecordRequest from cognite_toolkit._cdf_tk.commands._migrate.conversion import ( ConnectionCreator, ConversionContext, @@ -266,9 +266,7 @@ def prepare(self, source_selector: AssetCentricMigrationSelector) -> None: if container_key not in seen_container: containers = self.client.tool.containers.retrieve([record_mapping.container_id]) if not containers: - raise ToolkitValueError( - f"Container {record_mapping.container_id!r} 
was not found in CDF" - ) + raise ToolkitValueError(f"Container {record_mapping.container_id!r} was not found in CDF") seen_container[container_key] = containers[0].properties self._container_properties_by_mapping_external_id[external_id] = seen_container[container_key] @@ -288,21 +286,20 @@ def _map_single_item( container_properties = self._container_properties_by_mapping_external_id[record_mapping.external_id] record, conversion_issue = asset_centric_to_record( item.resource, - instance_id=RecordId( - space=row_mapping.instance_id.space, external_id=row_mapping.instance_id.external_id - ), + instance_id=RecordId(space=row_mapping.instance_id.space, external_id=row_mapping.instance_id.external_id), record_mapping=record_mapping, container_properties=container_properties, direct_relation_cache=self._direct_relation_cache, ) if row_mapping.instance_id.space == MISSING_INSTANCE_SPACE: - conversion_issue.missing_instance_space = f"Missing instance space for dataset ID {row_mapping.data_set_id!r}" - if record is not None and not record.sources: + conversion_issue.missing_instance_space = ( + f"Missing instance space for dataset ID {row_mapping.data_set_id!r}" + ) + if record is None and not conversion_issue.has_issues: self.logger.tracker.add_issue( str(item.mapping.as_asset_centric_id()), - "No properties could be successfully mapped (at least one property is required to create a record)", + "No properties could be mapped to the target container, at least one property is required to create a record", ) - return None, conversion_issue return record, conversion_issue From a94a18ca3191cfb9d8d80b26d7583e2b5f3388a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 15:39:38 +0100 Subject: [PATCH 09/14] Refactoring --- cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py 
b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py index 1042ded404..2d9d7ea072 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py @@ -159,6 +159,8 @@ def map( if conversion_issue.missing_instance_space: self.logger.tracker.add_issue(identifier, "Missing instance space") + if conversion_issue.no_mappable_properties: + self.logger.tracker.add_issue(identifier, "No properties could be mapped to target") if conversion_issue.failed_conversions: self.logger.tracker.add_issue(identifier, "Failed conversions") if conversion_issue.invalid_instance_property_types: From ead9a35b1f089e3dcfd5d9cab76cdbf1a91b7b46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 15:52:33 +0100 Subject: [PATCH 10/14] Refactoring --- .../_cdf_tk/client/resource_classes/records.py | 12 ++++++------ .../test_migration_cmd/test_conversion.py | 3 ++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/client/resource_classes/records.py b/cognite_toolkit/_cdf_tk/client/resource_classes/records.py index 08b9e36bdc..0e86642da1 100644 --- a/cognite_toolkit/_cdf_tk/client/resource_classes/records.py +++ b/cognite_toolkit/_cdf_tk/client/resource_classes/records.py @@ -2,6 +2,8 @@ from pydantic import JsonValue, field_serializer, field_validator +from cognite_toolkit._cdf_tk.client.identifiers import NodeUntypedId + from cognite_toolkit._cdf_tk.client._resource_base import BaseModelObject, Identifier, RequestResource, ResponseResource from cognite_toolkit._cdf_tk.client.identifiers import ContainerId @@ -21,7 +23,7 @@ def _as_filename(self, include_type: bool = False) -> str: class RecordSource(BaseModelObject): source: ContainerId - properties: dict[str, JsonValue] + properties: dict[str, JsonValue | NodeUntypedId | list[NodeUntypedId]] @field_serializer("source", mode="plain") def serialize_source(self, value: ContainerId) -> Any: @@ -44,7 
+46,7 @@ class RecordResponse(ResponseResource[RecordRequest]): space: str external_id: str - properties: dict[ContainerId, dict[str, JsonValue]] | None = None + properties: dict[ContainerId, dict[str, JsonValue | NodeUntypedId | list[NodeUntypedId]]] | None = None @field_validator("properties", mode="before") def parse_reference(cls, value: Any) -> Any: @@ -66,12 +68,10 @@ def parse_reference(cls, value: Any) -> Any: return parsed @field_serializer("properties", mode="plain") - def serialize_properties( - self, value: dict[ContainerId, dict[str, JsonValue]] | None - ) -> dict[str, dict[str, dict[str, JsonValue]]] | None: + def serialize_properties(self, value: dict[ContainerId, dict[str, Any]] | None) -> Any: if value is None: return None - serialized: dict[str, dict[str, dict[str, JsonValue]]] = {} + serialized: dict[str, dict[str, Any]] = {} for source_ref, props in value.items(): space = source_ref.space if space not in serialized: diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py index 6a521a4dc5..6c1fb34414 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py @@ -1730,7 +1730,7 @@ def test_no_mappable_properties_returns_none(self) -> None: record_mapping = self._make_record_mapping({}) # empty mapping container_properties = self._make_container_properties() - record, _ = asset_centric_to_record( + record, issue = asset_centric_to_record( event, instance_id=self.INSTANCE_ID, record_mapping=record_mapping, @@ -1739,3 +1739,4 @@ def test_no_mappable_properties_returns_none(self) -> None: ) assert record is None + assert issue.no_mappable_properties From 30c13d95ebbfe3dbab57cd8fe0a357136341191e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 15:53:15 +0100 Subject: [PATCH 
11/14] Fix lint --- cognite_toolkit/_cdf_tk/client/resource_classes/records.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/client/resource_classes/records.py b/cognite_toolkit/_cdf_tk/client/resource_classes/records.py index 0e86642da1..0e3d6ac3b7 100644 --- a/cognite_toolkit/_cdf_tk/client/resource_classes/records.py +++ b/cognite_toolkit/_cdf_tk/client/resource_classes/records.py @@ -2,10 +2,8 @@ from pydantic import JsonValue, field_serializer, field_validator -from cognite_toolkit._cdf_tk.client.identifiers import NodeUntypedId - from cognite_toolkit._cdf_tk.client._resource_base import BaseModelObject, Identifier, RequestResource, ResponseResource -from cognite_toolkit._cdf_tk.client.identifiers import ContainerId +from cognite_toolkit._cdf_tk.client.identifiers import ContainerId, NodeUntypedId class RecordId(Identifier): From 71a3c4fb381335c3b92523197fd24a036501154d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 16:09:19 +0100 Subject: [PATCH 12/14] Refactoring --- .../record_property_mapping.py | 26 +-------- .../test_record_property_mapping_yaml.py | 58 +++++-------------- 2 files changed, 19 insertions(+), 65 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py b/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py index 2ce6a0ab33..ddd70f1e2c 100644 --- a/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py +++ b/cognite_toolkit/_cdf_tk/client/resource_classes/record_property_mapping.py @@ -1,11 +1,9 @@ from typing import Literal, Self -from pydantic import ValidationError, model_validator +from pydantic import model_validator from cognite_toolkit._cdf_tk.client._resource_base import BaseModelObject from cognite_toolkit._cdf_tk.client.identifiers import ContainerId -from cognite_toolkit._cdf_tk.exceptions import ToolkitValueError -from cognite_toolkit._cdf_tk.utils.file 
import read_yaml_content class RecordPropertyMapping(BaseModelObject): @@ -26,6 +24,8 @@ class RecordMigrationConfig(BaseModelObject): @model_validator(mode="after") def _validate_mappings(self) -> Self: + if not self.mappings: + raise ValueError("mappings must contain at least one entry.") seen: set[str] = set() for mapping in self.mappings: if mapping.external_id in seen: @@ -37,23 +37,3 @@ def _validate_mappings(self) -> Self: f"Available: {sorted(seen)}." ) return self - - -def load_record_migration_config_yaml(yaml_content: str) -> RecordMigrationConfig: - """Parse YAML into `RecordMigrationConfig` for events-to-records.""" - content = read_yaml_content(yaml_content) - if not isinstance(content, dict): - raise ToolkitValueError( - f"Expected a YAML mapping with streamExternalId, resourceType, and mappings; got {type(content).__name__}." - ) - if "mappings" not in content: - raise ToolkitValueError( - "Missing required key 'mappings'. Top-level keys must include streamExternalId, resourceType, and mappings." 
- ) - try: - config = RecordMigrationConfig._load(content) - except ValidationError as exc: - raise ToolkitValueError(f"Invalid record migration config: {exc}") from exc - if not config.mappings: - raise ToolkitValueError("mappings must contain at least one record property mapping.") - return config diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py index c5ef0fb978..2d417d2523 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_record_property_mapping_yaml.py @@ -2,54 +2,28 @@ import pytest -from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import load_record_migration_config_yaml -from cognite_toolkit._cdf_tk.exceptions import ToolkitValueError +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordMigrationConfig -def test_load_wrapped_config() -> None: +def test_empty_mappings_rejected() -> None: + with pytest.raises(Exception, match="at least one"): + RecordMigrationConfig.load_yaml("streamExternalId: s\nresourceType: event\nmappings: []\n") + + +def test_default_mapping_not_in_mappings_rejected() -> None: yaml = textwrap.dedent( """ - streamExternalId: my-stream + streamExternalId: s resourceType: event + defaultMapping: nonexistent mappings: - - externalId: evt-records - containerId: - space: dm - externalId: MyContainer - propertyMapping: - description: description + - externalId: my-mapping + containerId: {space: s, externalId: c} + propertyMapping: {} """ ) - config = load_record_migration_config_yaml(yaml) - assert config.stream_external_id == "my-stream" - assert config.resource_type == "event" - assert len(config.mappings) == 1 - assert config.mappings[0].external_id == "evt-records" - - -def 
test_root_list_rejected() -> None: - with pytest.raises(ToolkitValueError, match="Expected a YAML mapping"): - load_record_migration_config_yaml("[]") - - -def test_missing_mappings_key_rejected() -> None: - with pytest.raises(ToolkitValueError, match="Missing required key 'mappings'"): - load_record_migration_config_yaml("streamExternalId: s\nresourceType: event\n") - - -def test_empty_mappings_rejected() -> None: - with pytest.raises(ToolkitValueError, match="at least one"): - load_record_migration_config_yaml("streamExternalId: s\nresourceType: event\nmappings: []\n") - - -def test_non_event_resource_type_rejected() -> None: - with pytest.raises(ToolkitValueError, match="Invalid record migration config"): - load_record_migration_config_yaml( - "streamExternalId: s\nresourceType: asset\nmappings:\n" - " - externalId: x\n" - " containerId: {space: dm, externalId: C}\n" - " propertyMapping: {}\n" - ) + with pytest.raises(Exception, match="nonexistent"): + RecordMigrationConfig.load_yaml(yaml) def test_duplicate_external_id_rejected() -> None: @@ -66,5 +40,5 @@ def test_duplicate_external_id_rejected() -> None: propertyMapping: {} """ ) - with pytest.raises(ToolkitValueError, match="Duplicate externalId"): - load_record_migration_config_yaml(yaml) + with pytest.raises(Exception, match="Duplicate externalId"): + RecordMigrationConfig.load_yaml(yaml) From 0d949e895ca74b7c2069ca862a419190ce628959 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Thu, 26 Mar 2026 16:10:41 +0100 Subject: [PATCH 13/14] WIP --- cognite_toolkit/_cdf_tk/apps/_migrate_app.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/apps/_migrate_app.py b/cognite_toolkit/_cdf_tk/apps/_migrate_app.py index 81b2bfd6d4..cf07707ae0 100644 --- a/cognite_toolkit/_cdf_tk/apps/_migrate_app.py +++ b/cognite_toolkit/_cdf_tk/apps/_migrate_app.py @@ -8,9 +8,7 @@ from cognite_toolkit._cdf_tk.client import ToolkitClient from 
cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import ContainerId -from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import ( - load_record_migration_config_yaml, -) +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordMigrationConfig from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping from cognite_toolkit._cdf_tk.commands import MigrationPrepareCommand from cognite_toolkit._cdf_tk.commands._migrate import MigrationCommand @@ -682,8 +680,8 @@ def events_to_records( """Migrate Events to records (Streams API).""" client = EnvironmentVariables.create_from_environment().get_client() try: - migration_config = load_record_migration_config_yaml(config_file.read_text()) - except ToolkitValueError as exc: + migration_config = RecordMigrationConfig.load_yaml(config_file.read_text()) + except Exception as exc: raise typer.BadParameter(str(exc)) from exc if data_set_id is not None and migration_config.default_mapping is None: raise typer.BadParameter( From 461e85fb62d499454e462e4ca4b48ad251fc2f86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Schj=C3=B8lberg?= Date: Wed, 1 Apr 2026 13:22:17 +0200 Subject: [PATCH 14/14] Adding tests --- .../_cdf_tk/commands/_migrate/data_mapper.py | 11 +- .../_cdf_tk/commands/_migrate/migration_io.py | 9 +- .../test_migration_cmd/test_command.py | 136 ++++++++++++++- .../test_migration_cmd/test_data_mapper.py | 107 +++++++++++- .../test_migration_cmd/test_migration_io.py | 158 +++++++++++++++++- 5 files changed, 409 insertions(+), 12 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py index 2d9d7ea072..4d07d012a4 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py +++ 
b/cognite_toolkit/_cdf_tk/commands/_migrate/data_mapper.py @@ -275,10 +275,13 @@ def prepare(self, source_selector: AssetCentricMigrationSelector) -> None: def _record_mapping_for_row(self, mapping: MigrationMapping) -> RecordPropertyMapping: if not isinstance(mapping, EventMapping): raise ToolkitValueError("Records migration only supports Event mapping rows.") - return mapping.get_record_property_mapping( - self._mappings_by_external_id, - self._default_mapping, - ) + mapping_key = mapping.ingestion_mapping or self._default_mapping + if mapping_key is None or mapping_key not in self._mappings_by_external_id: + raise ToolkitValueError( + f"No record property mapping found for key {mapping_key!r}. " + "Set ingestionMapping in the CSV row or defaultMapping in the config file." + ) + return self._mappings_by_external_id[mapping_key] def _map_single_item( self, item: AssetCentricMapping[T_AssetCentricResourceExtended] diff --git a/cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py b/cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py index 19435790d7..45ff7f35f5 100644 --- a/cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py +++ b/cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py @@ -393,13 +393,14 @@ def _remove_existing( def upload_items( self, - data_chunk: Sequence[DataItem[RecordRequest]], # type: ignore[override] + data_chunk: Page[RecordRequest], http_client: HTTPClient, selector: AssetCentricMigrationSelector | None = None, ) -> ItemsResultList: + items: Sequence[DataItem[RecordRequest]] = data_chunk.items if self.skip_existing: - data_chunk = self._remove_existing(data_chunk, http_client) - if not data_chunk: + items = self._remove_existing(items, http_client) + if not items: return ItemsResultList() endpoint = self.UPLOAD_ENDPOINT.format(streamId=self.stream_external_id) @@ -407,7 +408,7 @@ def upload_items( message=ItemsRequest( endpoint_url=self.client.config.create_api_url(endpoint), method="POST", - items=data_chunk, + 
items=items, ) ) diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_command.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_command.py index 4b2edf48c9..a6f6374f03 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_command.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_command.py @@ -20,8 +20,15 @@ from cognite.client.data_classes.data_modeling.statistics import InstanceStatistics, ProjectStatistics from cognite_toolkit._cdf_tk.client import ToolkitClient, ToolkitClientConfig +from cognite_toolkit._cdf_tk.client.identifiers import ContainerId from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse from cognite_toolkit._cdf_tk.client.resource_classes.asset import AssetResponse +from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling._container import ( + ContainerPropertyDefinition, + ContainerResponse, +) +from cognite_toolkit._cdf_tk.client.resource_classes.event import EventResponse +from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping from cognite_toolkit._cdf_tk.client.resource_classes.canvas import ( CANVAS_INSTANCE_SPACE, CANVAS_VIEW_ID, @@ -35,6 +42,7 @@ InstanceSource, NodeRequest, SpaceResponse, + TextProperty, ViewId, ) from cognite_toolkit._cdf_tk.client.resource_classes.migration import InstanceSource as LegacyInstanceSource @@ -49,9 +57,15 @@ from cognite_toolkit._cdf_tk.commands._migrate.command import MigrationCommand from cognite_toolkit._cdf_tk.commands._migrate.data_mapper import ( AssetCentricToInstanceMapper, + AssetCentricToRecordMapper, CanvasMapper, ChartMapper, ) +from cognite_toolkit._cdf_tk.commands._migrate.migration_io import ( + AnnotationMigrationIO, + AssetCentricMigrationIO, + RecordsMigrationIO, +) from cognite_toolkit._cdf_tk.commands._migrate.data_model import ( COGNITE_MIGRATION_MODEL, COGNITE_MIGRATION_SPACE_ID, @@ -65,7 +79,6 
@@ FILE_ANNOTATIONS_ID, create_default_mappings, ) -from cognite_toolkit._cdf_tk.commands._migrate.migration_io import AnnotationMigrationIO, AssetCentricMigrationIO from cognite_toolkit._cdf_tk.commands._migrate.selectors import MigrationCSVFileSelector from cognite_toolkit._cdf_tk.exceptions import ToolkitMigrationError, ToolkitValueError from cognite_toolkit._cdf_tk.storageio import CanvasIO, ChartIO @@ -1118,3 +1131,124 @@ def test_validate_stream_capacity_insufficient_storage(self) -> None: with monkeypatch_toolkit_client() as client: with pytest.raises(ToolkitValueError, match="enough storage capacity"): cmd.validate_stream_capacity(client, stream, 1) + + def test_validate_stream_capacity_no_settings_logs_and_returns(self) -> None: + stream = StreamResponse( + external_id="no_settings_stream", + created_time=0, + created_from_template="t", + type="Mutable", + settings=None, + ) + cmd = MigrationCommand(silent=True) + with monkeypatch_toolkit_client() as client: + cmd.validate_stream_capacity(client, stream, 10_000) + + def test_validate_stream_exists_raises_when_stream_missing(self) -> None: + with monkeypatch_toolkit_client() as client: + client.streams.retrieve.return_value = [] + with pytest.raises(ToolkitMigrationError, match="does not exist"): + MigrationCommand.validate_stream_exists(client, "missing_stream") + + def test_validate_stream_exists_returns_stream(self) -> None: + stream = self._make_stream(provisioned_records=100, consumed_records=0) + with monkeypatch_toolkit_client() as client: + client.streams.retrieve.return_value = [stream] + result = MigrationCommand.validate_stream_exists(client, "my_stream") + assert result is stream + + @pytest.mark.usefixtures("cognite_migration_model") + def test_migrate_events_to_records( + self, + toolkit_config: ToolkitClientConfig, + respx_mock: respx.MockRouter, + tmp_path: Path, + ) -> None: + config = toolkit_config + space = "my_space" + stream_external_id = "test_stream" + container_id = 
ContainerId(space=space, external_id="EventContainer") + events = [ + EventResponse(id=100 + i, external_id=f"event_{i}", description=f"Event {i}", created_time=0, last_updated_time=1) + for i in range(2) + ] + + # Space validation + respx_mock.post(config.create_api_url("/models/spaces/byids")).mock( + return_value=httpx.Response( + status_code=200, + json={"items": [SpaceResponse(space=space, created_time=1, last_updated_time=1, is_global=False).dump()]}, + ) + ) + # Stream retrieve for validate_stream_exists + validate_stream_capacity + stream = self._make_stream(provisioned_records=1_000_000, consumed_records=0) + respx_mock.get(config.create_api_url(f"/streams/{stream_external_id}")).mock( + return_value=httpx.Response(status_code=200, json=stream.dump()) + ) + # Event retrieve + respx_mock.post(config.create_api_url("/events/byids")).mock( + return_value=httpx.Response( + status_code=200, json={"items": [e.dump() for e in events]} + ) + ) + # Container retrieve for mapper.prepare() + container = ContainerResponse( + space=container_id.space, + external_id=container_id.external_id, + properties={ + "description": ContainerPropertyDefinition( + type=TextProperty(), nullable=True, immutable=False, auto_increment=False + ) + }, + created_time=0, + last_updated_time=1, + is_global=False, + ) + respx_mock.post(config.create_api_url("/models/containers/byids")).mock( + return_value=httpx.Response(status_code=200, json={"items": [container.dump()]}) + ) + # Records upload — capture the request + records_route = respx_mock.post(config.create_api_url(f"/streams/{stream_external_id}/records")).mock( + return_value=httpx.Response(status_code=200, json={}) + ) + + csv_file = tmp_path / "events.csv" + csv_file.write_text( + "id,space,externalId\n" + "\n".join(f"{100 + i},{space},event_{i}" for i in range(len(events))) + ) + selector = MigrationCSVFileSelector(datafile=csv_file, kind="Events") + + record_mapping = RecordPropertyMapping( + external_id="my_mapping", + 
container_id=container_id, + property_mapping={"description": "description"}, + ) + client = ToolkitClient(config) + command = MigrationCommand(silent=True) + results_by_selector = command.migrate( + selectors=[selector], + data=RecordsMigrationIO(client, stream_external_id=stream_external_id), + mapper=AssetCentricToRecordMapper( + client, + mappings_by_external_id={"my_mapping": record_mapping}, + default_mapping="my_mapping", + ), + log_dir=tmp_path / "logs", + dry_run=False, + verbose=False, + ) + + assert records_route.called + upload_body = json.loads(records_route.calls[0].request.content) + uploaded = upload_body["items"] + assert len(uploaded) == len(events) + uploaded_by_id = {item["externalId"]: item for item in uploaded} + for i, event in enumerate(events): + record = uploaded_by_id[f"event_{i}"] + assert record["space"] == space + assert record["sources"][0]["source"]["externalId"] == container_id.external_id + assert record["sources"][0]["properties"]["description"] == event.description + + result = results_by_selector[str(selector)] + actual_results = {status.status: status.count for status in result} + assert actual_results == {"failure": 0, "pending": 0, "success": len(events), "unchanged": 0, "skipped": 0} diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_data_mapper.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_data_mapper.py index 6efaffd497..ecdabaaf0a 100644 --- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_data_mapper.py +++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_data_mapper.py @@ -49,12 +49,20 @@ from cognite_toolkit._cdf_tk.client.resource_classes.view_to_view_mapping import ViewToViewMapping from cognite_toolkit._cdf_tk.client.testing import monkeypatch_toolkit_client from cognite_toolkit._cdf_tk.commands._migrate.conversion import ConnectionCreator +from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling._container import ( 
+    ContainerPropertyDefinition,
+    ContainerResponse,
+)
+from cognite_toolkit._cdf_tk.client.resource_classes.record_property_mapping import RecordPropertyMapping
 from cognite_toolkit._cdf_tk.commands._migrate.data_classes import (
     AssetCentricMapping,
+    AssetMapping,
+    EventMapping,
     MigrationMapping,
 )
 from cognite_toolkit._cdf_tk.commands._migrate.data_mapper import (
     AssetCentricToInstanceMapper,
+    AssetCentricToRecordMapper,
     CanvasMapper,
     ChartMapper,
     FDMtoCDMMapper,
@@ -932,4 +940,101 @@ def test_map(
         mapped_schedules = [r for r in result if r is not None]
         assert len(mapped_schedules) == 2
-        data_regression.check({"schedules": [s.dump() for s in mapped_schedules]})
+        data_regression.check({"schedules": [s.dump() for s in mapped_schedules]})
+
+
+class TestAssetCentricToRecordMapper:
+    def _make_mapping(self, external_id: str, container_id: ContainerId) -> RecordPropertyMapping:
+        return RecordPropertyMapping(
+            external_id=external_id,
+            container_id=container_id,
+            property_mapping={"description": "description"},
+        )
+
+    def _make_container(self, container_id: ContainerId) -> ContainerResponse:
+        return ContainerResponse(
+            space=container_id.space,
+            external_id=container_id.external_id,
+            properties={
+                "description": ContainerPropertyDefinition(
+                    type=TextProperty(), nullable=True, immutable=False, auto_increment=False
+                )
+            },
+            created_time=0,
+            last_updated_time=1,
+            is_global=False,
+        )
+
+    def test_prepare_retrieves_and_caches_container(self) -> None:
+        container_id = ContainerId(space="my_space", external_id="EventContainer")
+        mapping_a = self._make_mapping("mapping_a", container_id)
+        mapping_b = self._make_mapping("mapping_b", container_id)
+        with monkeypatch_toolkit_client() as client:
+            client.tool.containers.retrieve.return_value = [self._make_container(container_id)]
+            mapper = AssetCentricToRecordMapper(
+                client,
+                mappings_by_external_id={"mapping_a": mapping_a, "mapping_b": mapping_b},
+            )
+            mapper.prepare(MagicMock())
+        # Same container referenced twice — should only be retrieved once
+        assert client.tool.containers.retrieve.call_count == 1
+
+    def test_prepare_raises_on_missing_container(self) -> None:
+        container_id = ContainerId(space="my_space", external_id="MissingContainer")
+        mapping = self._make_mapping("mapping_x", container_id)
+        with monkeypatch_toolkit_client() as client:
+            client.tool.containers.retrieve.return_value = []
+            mapper = AssetCentricToRecordMapper(
+                client, mappings_by_external_id={"mapping_x": mapping}
+            )
+            with pytest.raises(ToolkitValueError, match="was not found in CDF"):
+                mapper.prepare(MagicMock())
+
+    def test_record_mapping_for_row_rejects_non_event(self) -> None:
+        container_id = ContainerId(space="my_space", external_id="EventContainer")
+        mapping = self._make_mapping("mapping_a", container_id)
+        source = AssetCentricMapping(
+            mapping=AssetMapping(
+                resource_type="asset",
+                instance_id=NodeId(space="my_space", external_id="asset_1"),
+                id=1,
+                ingestion_mapping="mapping_a",
+            ),
+            resource=AssetResponse(id=1, name="asset_1", created_time=0, last_updated_time=1, root_id=0),
+        )
+        with monkeypatch_toolkit_client() as client:
+            client.tool.containers.retrieve.return_value = [self._make_container(container_id)]
+            mapper = AssetCentricToRecordMapper(
+                client, mappings_by_external_id={"mapping_a": mapping}
+            )
+            mapper.prepare(MagicMock())
+            with pytest.raises(ToolkitValueError, match="only supports Event"):
+                mapper.map([source])
+
+    def test_map_produces_record_request(self) -> None:
+        container_id = ContainerId(space="my_space", external_id="EventContainer")
+        mapping = self._make_mapping("mapping_a", container_id)
+        source = AssetCentricMapping(
+            mapping=EventMapping(
+                resource_type="event",
+                instance_id=NodeId(space="my_space", external_id="event_1"),
+                id=42,
+                ingestion_mapping="mapping_a",
+            ),
+            resource=EventResponse(
+                id=42, external_id="event_1", description="An event", created_time=0, last_updated_time=1
+            ),
+        )
+        with monkeypatch_toolkit_client() as client:
+            client.tool.containers.retrieve.return_value = [self._make_container(container_id)]
+            mapper = AssetCentricToRecordMapper(
+                client, mappings_by_external_id={"mapping_a": mapping}
+            )
+            mapper.prepare(MagicMock())
+            results = mapper.map([source])
+        assert len(results) == 1
+        record = results[0]
+        assert record is not None
+        assert record.space == "my_space"
+        assert record.external_id == "event_1"
+        assert len(record.sources) == 1
+        assert record.sources[0].source == container_id
+        assert record.sources[0].properties["description"] == "An event"
diff --git a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_migration_io.py b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_migration_io.py
index e7aacee626..d38ca805a6 100644
--- a/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_migration_io.py
+++ b/tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_migration_io.py
@@ -1,5 +1,6 @@
+import json
 from pathlib import Path
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 
 import httpx
 import pytest
@@ -8,18 +9,34 @@
 from cognite_toolkit._cdf_tk.client import ToolkitClient, ToolkitClientConfig
 from cognite_toolkit._cdf_tk.client.http_client import HTTPClient
+from cognite_toolkit._cdf_tk.client.identifiers import ContainerId
 from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse
 from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import SpaceResponse
+from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest, RecordSource
 from cognite_toolkit._cdf_tk.commands._migrate.migration_io import (
     AnnotationMigrationIO,
     AssetCentricMigrationIO,
+    RecordsMigrationIO,
     ThreeDAssetMappingMigrationIO,
 )
 from cognite_toolkit._cdf_tk.commands._migrate.selectors import MigrationCSVFileSelector
-from cognite_toolkit._cdf_tk.storageio import AssetIO, Page
+from cognite_toolkit._cdf_tk.storageio import AssetIO, DataItem, Page
 from cognite_toolkit._cdf_tk.storageio.selectors import ThreeDModelIdSelector
 
 
+def _record_request_for_test(space: str, external_id: str) -> RecordRequest:
+    return RecordRequest(
+        space=space,
+        external_id=external_id,
+        sources=[
+            RecordSource(
+                source=ContainerId(space="cspace", external_id="EventContainer"),
+                properties={"x": "y"},
+            )
+        ],
+    )
+
+
 @pytest.fixture(scope="module")
 def toolkit_client(toolkit_config: ToolkitClientConfig) -> ToolkitClient:
     return ToolkitClient(config=toolkit_config)
@@ -243,3 +260,140 @@ def test_invalid_methods(self, toolkit_client: ToolkitClient) -> None:
 
         with pytest.raises(NotImplementedError):
             io.data_to_json_chunk(MagicMock())
+
+
+@pytest.mark.usefixtures("disable_gzip")
+class TestRecordsMigrationIO:
+    def test_upload_items_posts_to_stream_records(
+        self, toolkit_client: ToolkitClient, respx_mock: respx.MockRouter
+    ) -> None:
+        client = toolkit_client
+        config = client.config
+        stream_external_id = "unit_test_stream"
+        upload_route = respx_mock.post(config.create_api_url(f"/streams/{stream_external_id}/records")).mock(
+            return_value=Response(status_code=200, json={})
+        )
+        io = RecordsMigrationIO(client, stream_external_id, skip_existing=False)
+        items = [DataItem(tracking_id="t1", item=_record_request_for_test("sp", "e1"))]
+        with HTTPClient(config) as http_client:
+            io.upload_items(Page(worker_id="main", items=items), http_client)
+        assert upload_route.called
+
+    def test_upload_items_skip_existing_filters_then_uploads_new_only(
+        self, toolkit_client: ToolkitClient, respx_mock: respx.MockRouter
+    ) -> None:
+        client = toolkit_client
+        config = client.config
+        stream_external_id = "unit_test_stream_skip"
+        filter_url = config.create_api_url(f"/streams/{stream_external_id}/records/filter")
+        upload_url = config.create_api_url(f"/streams/{stream_external_id}/records")
+        respx_mock.post(filter_url).mock(
+            return_value=Response(
+                status_code=200,
+                json={"items": [{"space": "sp", "externalId": "existing"}]},
+            )
+        )
+        upload_route = respx_mock.post(upload_url).mock(return_value=Response(status_code=200, json={}))
+        mock_loader = MagicMock()
+        mock_loader.last_updated_time_windows.return_value = [None]
+        with patch(
+            "cognite_toolkit._cdf_tk.commands._migrate.migration_io.StreamCRUD.create_loader",
+            return_value=mock_loader,
+        ):
+            io = RecordsMigrationIO(client, stream_external_id, skip_existing=True)
+            io.logger = MagicMock()
+            items = [
+                DataItem(tracking_id="track_existing", item=_record_request_for_test("sp", "existing")),
+                DataItem(tracking_id="track_new", item=_record_request_for_test("sp", "new_one")),
+            ]
+            with HTTPClient(config) as http_client:
+                io.upload_items(Page(worker_id="main", items=items), http_client)
+        io.logger.tracker.finalize_item.assert_called_once_with("track_existing", "skipped")
+        assert upload_route.called
+        upload_body = json.loads(upload_route.calls[0].request.content)
+        assert len(upload_body["items"]) == 1
+        assert upload_body["items"][0]["externalId"] == "new_one"
+
+    def test_remove_existing_multiple_spaces(
+        self, toolkit_client: ToolkitClient, respx_mock: respx.MockRouter
+    ) -> None:
+        client = toolkit_client
+        config = client.config
+        stream_external_id = "unit_test_stream_spaces"
+        filter_url = config.create_api_url(f"/streams/{stream_external_id}/records/filter")
+        upload_url = config.create_api_url(f"/streams/{stream_external_id}/records")
+        # Filter is called once per space; return one existing item for "alpha", none for "beta"
+        filter_route = respx_mock.post(filter_url).mock(
+            side_effect=[
+                Response(status_code=200, json={"items": [{"space": "alpha", "externalId": "alpha_1"}]}),
+                Response(status_code=200, json={"items": []}),
+            ]
+        )
+        upload_route = respx_mock.post(upload_url).mock(return_value=Response(status_code=200, json={}))
+        mock_loader = MagicMock()
+        mock_loader.last_updated_time_windows.return_value = [None]
+        with patch(
+            "cognite_toolkit._cdf_tk.commands._migrate.migration_io.StreamCRUD.create_loader",
+            return_value=mock_loader,
+        ):
+            io = RecordsMigrationIO(client, stream_external_id, skip_existing=True)
+            io.logger = MagicMock()
+            items = [
+                DataItem(tracking_id="t_alpha1", item=_record_request_for_test("alpha", "alpha_1")),
+                DataItem(tracking_id="t_alpha2", item=_record_request_for_test("alpha", "alpha_2")),
+                DataItem(tracking_id="t_beta1", item=_record_request_for_test("beta", "beta_1")),
+            ]
+            with HTTPClient(config) as http_client:
+                io.upload_items(Page(worker_id="main", items=items), http_client)
+        assert filter_route.call_count == 2
+        io.logger.tracker.finalize_item.assert_called_once_with("t_alpha1", "skipped")
+        upload_body = json.loads(upload_route.calls[0].request.content)
+        uploaded_ids = {item["externalId"] for item in upload_body["items"]}
+        assert uploaded_ids == {"alpha_2", "beta_1"}
+
+    def test_remove_existing_batches_over_filter_limit(
+        self, toolkit_client: ToolkitClient, respx_mock: respx.MockRouter
+    ) -> None:
+        client = toolkit_client
+        config = client.config
+        stream_external_id = "unit_test_stream_batch"
+        filter_url = config.create_api_url(f"/streams/{stream_external_id}/records/filter")
+        upload_url = config.create_api_url(f"/streams/{stream_external_id}/records")
+        filter_route = respx_mock.post(filter_url).mock(
+            return_value=Response(status_code=200, json={"items": []})
+        )
+        respx_mock.post(upload_url).mock(return_value=Response(status_code=200, json={}))
+        mock_loader = MagicMock()
+        mock_loader.last_updated_time_windows.return_value = [None]
+        with patch(
+            "cognite_toolkit._cdf_tk.commands._migrate.migration_io.StreamCRUD.create_loader",
+            return_value=mock_loader,
+        ):
+            io = RecordsMigrationIO(client, stream_external_id, skip_existing=True)
+            io.logger = MagicMock()
+            # 150 items > FILTER_IN_MAX_VALUES (100) → requires 2 filter batches
+            items = [
+                DataItem(tracking_id=f"t{i}", item=_record_request_for_test("sp", f"evt_{i}"))
+                for i in range(150)
+            ]
+            with HTTPClient(config) as http_client:
+                io.upload_items(Page(worker_id="main", items=items), http_client)
+        assert filter_route.call_count == 2
+
+    def test_remove_existing_empty_chunk(self, toolkit_client: ToolkitClient, respx_mock: respx.MockRouter) -> None:
+        client = toolkit_client
+        config = client.config
+        stream_external_id = "unit_test_stream_empty"
+        filter_route = respx_mock.post(
+            config.create_api_url(f"/streams/{stream_external_id}/records/filter")
+        ).mock(return_value=Response(status_code=200, json={}))
+        mock_loader = MagicMock()
+        mock_loader.last_updated_time_windows.return_value = [None]
+        with patch(
+            "cognite_toolkit._cdf_tk.commands._migrate.migration_io.StreamCRUD.create_loader",
+            return_value=mock_loader,
+        ):
+            io = RecordsMigrationIO(client, stream_external_id, skip_existing=True)
+            with HTTPClient(config) as http_client:
+                io.upload_items(Page(worker_id="main", items=[]), http_client)
+        assert not filter_route.called