diff --git a/cognite/extractorutils/unstable/configuration/models.py b/cognite/extractorutils/unstable/configuration/models.py index ad593752..4691f399 100644 --- a/cognite/extractorutils/unstable/configuration/models.py +++ b/cognite/extractorutils/unstable/configuration/models.py @@ -51,6 +51,7 @@ "MetricsConfig", "ScheduleConfig", "TimeIntervalConfig", + "WithDataSetId", ] @@ -619,6 +620,32 @@ def _log_handler_default() -> list[LogHandlerConfig]: return [LogConsoleHandlerConfig(type="console", level=LogLevel.INFO)] +class WithDataSetId(ConfigModel): + """ + Base class for configuration models that include a data set reference. + """ + + data_set: EitherIdConfig | None = None + + def get_data_set(self, cdf_client: CogniteClient) -> DataSet | None: + """ + Retrieves the DataSet object based on the configuration. + + Args: + cdf_client: An instance of CogniteClient to use for retrieving the DataSet. + + Returns: + DataSet object if data_set is provided; otherwise None. + """ + if not self.data_set: + return None + + return cdf_client.data_sets.retrieve( + id=self.data_set.either_id.internal_id, + external_id=self.data_set.either_id.external_id, + ) + + class FileSizeConfig: """ Configuration parameter for setting a file size. @@ -868,31 +895,6 @@ class ExtractorConfig(ConfigModel): state_store: StateStoreConfig | None = None metrics: MetricsConfig | None = None log_handlers: list[LogHandlerConfig] = Field(default_factory=_log_handler_default) - retry_startup: bool = True - upload_queue_size: int = 50_000 - data_set: EitherIdConfig | None = None - data_set_external_id: str | None = None - - def get_data_set(self, cdf_client: CogniteClient) -> DataSet | None: - """ - Retrieves the DataSet object based on the configuration. - - Args: - cdf_client: An instance of CogniteClient to use for retrieving the DataSet. - - Returns: - DataSet object if data_set, data_set_id, or data_set_external_id is provided; otherwise None. - """ - if self.data_set_external_id: - return cdf_client.data_sets.retrieve(external_id=self.data_set_external_id) - - if not self.data_set: - return None - - return cdf_client.data_sets.retrieve( - id=self.data_set.either_id.internal_id, - external_id=self.data_set.either_id.external_id, - ) ConfigType = TypeVar("ConfigType", bound=ExtractorConfig) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 78a7b4a6..6b4af39e 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -142,6 +142,7 @@ class Extractor(Generic[ConfigType], CogniteLogger): CONFIG_TYPE: type[ConfigType] + RETRY_STARTUP: bool = True RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES USE_DEFAULT_STATE_STORE: bool = True _statestore_singleton: AbstractStateStore | None = None diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index d545b3de..78ec701b 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -85,8 +85,7 @@ def _extractor_process_entrypoint( checkin_worker.active_revision = config.current_config_revision checkin_worker.set_on_fatal_error_handler(lambda _: on_fatal_error(controls)) checkin_worker.set_on_revision_change_handler(lambda _: on_revision_changed(controls)) - if config.application_config.retry_startup: - checkin_worker.set_retry_startup(config.application_config.retry_startup) + checkin_worker.set_retry_startup(extractor_class.RETRY_STARTUP) if not metrics: metrics = BaseMetrics(extractor_name=extractor_class.NAME, extractor_version=extractor_class.VERSION) extractor = extractor_class._init_from_runtime(config, checkin_worker, metrics) diff --git a/tests/test_unstable/test_base.py b/tests/test_unstable/test_base.py index 9d512272..1f8527d8 100644 --- a/tests/test_unstable/test_base.py +++ b/tests/test_unstable/test_base.py @@ -15,7 +15,6 @@ from cognite.extractorutils.unstable.configuration.loaders import ConfigFormat, load_io from cognite.extractorutils.unstable.configuration.models import ( ConnectionConfig, - ExtractorConfig, LocalStateStoreConfig, LogConsoleHandlerConfig, LogFileHandlerConfig, @@ -345,16 +344,3 @@ def test_pushgatewayconfig_none_credentials_from_yaml() -> None: assert pusher.password is None assert pusher.url == "http://localhost:9091" assert pusher.job_name == "test-job" - - -def test_extractor_config_upload_queue_size_with_yaml() -> None: - """Test upload_queue_size parsing from YAML configuration.""" - config_yaml = """ -upload-queue-size: 200000 -retry-startup: false -""" - stream = StringIO(config_yaml) - config = load_io(stream, ConfigFormat.YAML, ExtractorConfig) - - assert config.upload_queue_size == 200_000 - assert config.retry_startup is False diff --git a/tests/test_unstable/test_configuration.py b/tests/test_unstable/test_configuration.py index a4cefb00..43a7588a 100644 --- a/tests/test_unstable/test_configuration.py +++ b/tests/test_unstable/test_configuration.py @@ -13,10 +13,10 @@ ConfigModel, ConnectionConfig, EitherIdConfig, - ExtractorConfig, FileSizeConfig, LogLevel, TimeIntervalConfig, + WithDataSetId, _ClientCredentialsConfig, ) @@ -310,19 +310,10 @@ def test_setting_log_level_from_any_case() -> None: @pytest.mark.parametrize( - "data_set_external_id,data_set_config,expected_call,expected_result_attrs,should_return_none", + "data_set_config,expected_call,expected_result_attrs,should_return_none", [ - # Test with data_set_external_id provided - ( - "test-dataset", - None, - {"external_id": "test-dataset"}, - {"external_id": "test-dataset", "name": "Test Dataset"}, - False, - ), # Test with data_set config using internal ID ( - None, EitherIdConfig(id=12345), {"id": 12345, "external_id": None}, {"id": 12345, "name": "Test Dataset"}, @@ -330,23 +321,13 @@ def test_setting_log_level_from_any_case() -> None: ), # Test with data_set config using external ID ( - None, EitherIdConfig(external_id="config-dataset"), {"id": None, "external_id": "config-dataset"}, {"external_id": "config-dataset", "name": "Config Dataset"}, False, ), - # Test that data_set_external_id takes priority over data_set + # Test with data_set not provided ( - "priority-dataset", - EitherIdConfig(external_id="should-be-ignored"), - {"external_id": "priority-dataset"}, - {"external_id": "priority-dataset", "name": "Priority Dataset"}, - False, - ), - # Test with neither data_set_external_id nor data_set provided - ( - None, None, {}, {}, @@ -354,17 +335,14 @@ def test_setting_log_level_from_any_case() -> None: ), ], ) -def test_get_data_set_various_configurations( - data_set_external_id: str | None, +def test_with_data_set_id_various_configurations( data_set_config: EitherIdConfig | None, expected_call: dict | None, expected_result_attrs: dict | None, should_return_none: bool, ) -> None: - """Test get_data_set method with various configuration scenarios.""" - extractor_config = ExtractorConfig( - retry_startup=False, - data_set_external_id=data_set_external_id, + """Test WithDataSetId.get_data_set method with various configuration scenarios.""" + with_data_set_config = WithDataSetId( data_set=data_set_config, ) @@ -375,7 +353,7 @@ def test_get_data_set_various_configurations( mock_dataset = DataSet(**expected_result_attrs) mock_client.data_sets.retrieve.return_value = mock_dataset - result = extractor_config.get_data_set(mock_client) + result = with_data_set_config.get_data_set(mock_client) if should_return_none: assert result is None