Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"MetricsConfig",
"ScheduleConfig",
"TimeIntervalConfig",
"WithDataSetId",
]


Expand Down Expand Up @@ -619,6 +620,32 @@ def _log_handler_default() -> list[LogHandlerConfig]:
return [LogConsoleHandlerConfig(type="console", level=LogLevel.INFO)]


class WithDataSetId(ConfigModel):
    """
    Mixin-style configuration model that adds an optional data set reference.

    Inherit from this model when a configuration section should let users
    point the extractor at a CDF data set by internal or external ID.
    """

    # Optional reference to a CDF data set, by internal or external ID.
    data_set: EitherIdConfig | None = None

    def get_data_set(self, cdf_client: CogniteClient) -> DataSet | None:
        """
        Look up the configured data set in CDF.

        Args:
            cdf_client: Client used to perform the lookup.

        Returns:
            The matching DataSet if ``data_set`` is configured, otherwise None.
        """
        ds_ref = self.data_set
        if ds_ref:
            either = ds_ref.either_id
            return cdf_client.data_sets.retrieve(
                id=either.internal_id,
                external_id=either.external_id,
            )
        return None


class FileSizeConfig:
"""
Configuration parameter for setting a file size.
Expand Down Expand Up @@ -868,31 +895,6 @@ class ExtractorConfig(ConfigModel):
state_store: StateStoreConfig | None = None
metrics: MetricsConfig | None = None
log_handlers: list[LogHandlerConfig] = Field(default_factory=_log_handler_default)
retry_startup: bool = True
upload_queue_size: int = 50_000
data_set: EitherIdConfig | None = None
data_set_external_id: str | None = None

def get_data_set(self, cdf_client: CogniteClient) -> DataSet | None:
    """
    Retrieve the DataSet object referenced by this configuration.

    The flat ``data_set_external_id`` field takes priority over the
    ``data_set`` either-id reference when both are set.

    Args:
        cdf_client: An instance of CogniteClient to use for retrieving the DataSet.

    Returns:
        DataSet object if data_set or data_set_external_id is provided; otherwise None.
    """
    # Flat external-id shorthand wins over the structured either-id reference.
    if self.data_set_external_id:
        return cdf_client.data_sets.retrieve(external_id=self.data_set_external_id)

    if not self.data_set:
        return None

    # EitherIdConfig guarantees exactly one of internal_id/external_id is set;
    # the other is passed through as None to the retrieve call.
    return cdf_client.data_sets.retrieve(
        id=self.data_set.either_id.internal_id,
        external_id=self.data_set.either_id.external_id,
    )


ConfigType = TypeVar("ConfigType", bound=ExtractorConfig)
Expand Down
1 change: 1 addition & 0 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class Extractor(Generic[ConfigType], CogniteLogger):

CONFIG_TYPE: type[ConfigType]

RETRY_STARTUP: bool = True
RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES
USE_DEFAULT_STATE_STORE: bool = True
_statestore_singleton: AbstractStateStore | None = None
Expand Down
3 changes: 1 addition & 2 deletions cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ def _extractor_process_entrypoint(
checkin_worker.active_revision = config.current_config_revision
checkin_worker.set_on_fatal_error_handler(lambda _: on_fatal_error(controls))
checkin_worker.set_on_revision_change_handler(lambda _: on_revision_changed(controls))
if config.application_config.retry_startup:
checkin_worker.set_retry_startup(config.application_config.retry_startup)
checkin_worker.set_retry_startup(extractor_class.RETRY_STARTUP)
if not metrics:
metrics = BaseMetrics(extractor_name=extractor_class.NAME, extractor_version=extractor_class.VERSION)
extractor = extractor_class._init_from_runtime(config, checkin_worker, metrics)
Expand Down
14 changes: 0 additions & 14 deletions tests/test_unstable/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from cognite.extractorutils.unstable.configuration.loaders import ConfigFormat, load_io
from cognite.extractorutils.unstable.configuration.models import (
ConnectionConfig,
ExtractorConfig,
LocalStateStoreConfig,
LogConsoleHandlerConfig,
LogFileHandlerConfig,
Expand Down Expand Up @@ -345,16 +344,3 @@ def test_pushgatewayconfig_none_credentials_from_yaml() -> None:
assert pusher.password is None
assert pusher.url == "http://localhost:9091"
assert pusher.job_name == "test-job"


def test_extractor_config_upload_queue_size_with_yaml() -> None:
    """Test upload_queue_size and retry_startup parsing from YAML configuration."""
    # Hyphenated YAML keys are expected to map onto snake_case model fields,
    # as demonstrated by the assertions below.
    config_yaml = """
upload-queue-size: 200000
retry-startup: false
"""
    stream = StringIO(config_yaml)
    config = load_io(stream, ConfigFormat.YAML, ExtractorConfig)

    assert config.upload_queue_size == 200_000
    assert config.retry_startup is False
36 changes: 7 additions & 29 deletions tests/test_unstable/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
ConfigModel,
ConnectionConfig,
EitherIdConfig,
ExtractorConfig,
FileSizeConfig,
LogLevel,
TimeIntervalConfig,
WithDataSetId,
_ClientCredentialsConfig,
)

Expand Down Expand Up @@ -310,61 +310,39 @@ def test_setting_log_level_from_any_case() -> None:


@pytest.mark.parametrize(
"data_set_external_id,data_set_config,expected_call,expected_result_attrs,should_return_none",
"data_set_config,expected_call,expected_result_attrs,should_return_none",
[
# Test with data_set_external_id provided
(
"test-dataset",
None,
{"external_id": "test-dataset"},
{"external_id": "test-dataset", "name": "Test Dataset"},
False,
),
# Test with data_set config using internal ID
(
None,
EitherIdConfig(id=12345),
{"id": 12345, "external_id": None},
{"id": 12345, "name": "Test Dataset"},
False,
),
# Test with data_set config using external ID
(
None,
EitherIdConfig(external_id="config-dataset"),
{"id": None, "external_id": "config-dataset"},
{"external_id": "config-dataset", "name": "Config Dataset"},
False,
),
# Test that data_set_external_id takes priority over data_set
# Test with data_set not provided
(
"priority-dataset",
EitherIdConfig(external_id="should-be-ignored"),
{"external_id": "priority-dataset"},
{"external_id": "priority-dataset", "name": "Priority Dataset"},
False,
),
# Test with neither data_set_external_id nor data_set provided
(
None,
None,
{},
{},
True,
),
],
)
def test_get_data_set_various_configurations(
data_set_external_id: str | None,
def test_with_data_set_id_various_configurations(
data_set_config: EitherIdConfig | None,
expected_call: dict | None,
expected_result_attrs: dict | None,
should_return_none: bool,
) -> None:
"""Test get_data_set method with various configuration scenarios."""
extractor_config = ExtractorConfig(
retry_startup=False,
data_set_external_id=data_set_external_id,
"""Test WithDataSetId.get_data_set method with various configuration scenarios."""
with_data_set_config = WithDataSetId(
data_set=data_set_config,
)

Expand All @@ -375,7 +353,7 @@ def test_get_data_set_various_configurations(
mock_dataset = DataSet(**expected_result_attrs)
mock_client.data_sets.retrieve.return_value = mock_dataset

result = extractor_config.get_data_set(mock_client)
result = with_data_set_config.get_data_set(mock_client)

if should_return_none:
assert result is None
Expand Down
Loading