From 7d8cc9332368250fc2ab8de64b491dcfed6d3f47 Mon Sep 17 00:00:00 2001 From: Hemant Mishra Date: Mon, 8 Dec 2025 15:29:54 +0530 Subject: [PATCH 1/3] Fix metrics pickling issue while passing through runtime --- cognite/extractorutils/unstable/core/base.py | 51 ++++++-- .../extractorutils/unstable/core/runtime.py | 12 +- tests/test_unstable/conftest.py | 27 ++++- tests/test_unstable/test_base.py | 5 +- tests/test_unstable/test_runtime.py | 112 ++++++++++++++++-- 5 files changed, 179 insertions(+), 28 deletions(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 6b4af39e..771561f1 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -59,7 +59,7 @@ def my_task_function(self, task_context: TaskContext) -> None: from typing_extensions import Self, assert_never from cognite.extractorutils._inner_util import _resolve_log_level -from cognite.extractorutils.metrics import BaseMetrics +from cognite.extractorutils.metrics import BaseMetrics, safe_get from cognite.extractorutils.statestore import ( AbstractStateStore, LocalStateStore, @@ -117,11 +117,13 @@ def __init__( application_config: _T, current_config_revision: ConfigRevision, log_level_override: str | None = None, + metrics_class: type[BaseMetrics] | None = None, ) -> None: self.connection_config = connection_config self.application_config = application_config self.current_config_revision: ConfigRevision = current_config_revision self.log_level_override = log_level_override + self.metrics_class = metrics_class class Extractor(Generic[ConfigType], CogniteLogger): @@ -146,12 +148,11 @@ class Extractor(Generic[ConfigType], CogniteLogger): RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES USE_DEFAULT_STATE_STORE: bool = True _statestore_singleton: AbstractStateStore | None = None + _metrics_singleton: BaseMetrics cancellation_token: CancellationToken - def __init__( - self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics | None = None - ) -> None: + def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> None: self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main") self._checkin_worker = checkin_worker @@ -175,7 +176,8 @@ def __init__( self._tasks: list[Task] = [] self._start_time: datetime - self._metrics: BaseMetrics | None = metrics + + self.metrics: BaseMetrics = self._load_metrics(config.metrics_class) self.metrics_push_manager = ( self.metrics_config.create_manager(self.cognite_client, cancellation_token=self.cancellation_token) @@ -262,6 +264,39 @@ def _setup_logging(self) -> None: "Defaulted to console logging." ) + def _load_metrics(self, metrics_class: type[BaseMetrics] | None = None) -> BaseMetrics: + """ + Loads metrics based on the provided metrics class. + + Reuses existing singleton if available to avoid Prometheus registry conflicts. + """ + if Extractor._metrics_singleton is not None: + self.metrics = Extractor._metrics_singleton + return self.metrics + + if metrics_class: + self.metrics = safe_get(metrics_class) + else: + self.metrics = BaseMetrics(extractor_name=self.EXTERNAL_ID, extractor_version=self.VERSION) + + Extractor._metrics_singleton = self.metrics + return self.metrics + + @classmethod + def get_current_metrics(cls) -> BaseMetrics: + """ + Get the current metrics singleton. + + Returns: + The current metrics singleton + + Raises: + ValueError: If no metrics singleton has been created, meaning no metrics have been initialized. + """ + if Extractor._metrics_singleton is None: + raise ValueError("No metrics singleton created. Have metrics been initialized?") + return Extractor._metrics_singleton + def _load_state_store(self) -> None: """ Searches through the config object for a StateStoreConfig. @@ -379,10 +414,8 @@ def restart(self) -> None: self.cancellation_token.cancel() @classmethod - def _init_from_runtime( - cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics - ) -> Self: - return cls(config, checkin_worker, metrics) + def _init_from_runtime(cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> Self: + return cls(config, checkin_worker) def add_task(self, task: Task) -> None: """ diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index 78ec701b..38ccf85b 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -79,16 +79,13 @@ def _extractor_process_entrypoint( controls: _RuntimeControls, config: FullConfig, checkin_worker: CheckinWorker, - metrics: BaseMetrics | None = None, ) -> None: logger = logging.getLogger(f"{extractor_class.EXTERNAL_ID}.runtime") checkin_worker.active_revision = config.current_config_revision checkin_worker.set_on_fatal_error_handler(lambda _: on_fatal_error(controls)) checkin_worker.set_on_revision_change_handler(lambda _: on_revision_changed(controls)) checkin_worker.set_retry_startup(extractor_class.RETRY_STARTUP) - if not metrics: - metrics = BaseMetrics(extractor_name=extractor_class.NAME, extractor_version=extractor_class.VERSION) - extractor = extractor_class._init_from_runtime(config, checkin_worker, metrics) + extractor = extractor_class._init_from_runtime(config, checkin_worker) extractor._attach_runtime_controls( cancel_event=controls.cancel_event, message_queue=controls.message_queue, @@ -138,13 +135,13 @@ class Runtime(Generic[ExtractorType]): def __init__( self, extractor: type[ExtractorType], - metrics: BaseMetrics | None = None, + metrics: type[BaseMetrics] | None = None, ) -> None: self._extractor_class = extractor self._cancellation_token = CancellationToken() self._cancellation_token.cancel_on_interrupt() self._message_queue: Queue[RuntimeMessage] = Queue() - self._metrics = metrics + self._metrics_class = metrics self.logger = logging.getLogger(f"{self._extractor_class.EXTERNAL_ID}.runtime") self._setup_logging() self._cancel_event: MpEvent | None = None @@ -273,7 +270,7 @@ def _spawn_extractor( process = Process( target=_extractor_process_entrypoint, - args=(self._extractor_class, controls, config, checkin_worker, self._metrics), + args=(self._extractor_class, controls, config, checkin_worker), ) process.start() @@ -507,6 +504,7 @@ def _main_runtime(self, args: Namespace) -> None: application_config=application_config, current_config_revision=current_config_revision, log_level_override=args.log_level, + metrics_class=self._metrics_class, ), checkin_worker, ) diff --git a/tests/test_unstable/conftest.py b/tests/test_unstable/conftest.py index 4ebf88b7..f75f1801 100644 --- a/tests/test_unstable/conftest.py +++ b/tests/test_unstable/conftest.py @@ -1,7 +1,7 @@ +import contextlib import gzip import json import os -from collections import Counter from collections.abc import Callable, Generator, Iterator from threading import RLock from time import sleep, time @@ -10,6 +10,8 @@ import pytest import requests_mock +from prometheus_client import REGISTRY +from prometheus_client.core import Counter from cognite.client import CogniteClient from cognite.client.config import ClientConfig @@ -30,12 +32,31 @@ @pytest.fixture(autouse=True) def reset_singleton() -> Iterator[None]: """ - This fixture ensures that the _statestore_singleton class - variable is reset, providing test isolation. + This fixture ensures that the _statestore_singleton and _metrics_singleton + class variables are reset, and Prometheus collectors are unregistered, + providing test isolation. """ + # Clean up before test Extractor._statestore_singleton = None + Extractor._metrics_singleton = None + + # Unregister all collectors to prevent "Duplicated timeseries" errors + collectors = list(REGISTRY._collector_to_names.keys()) + for collector in collectors: + with contextlib.suppress(Exception): + REGISTRY.unregister(collector) + yield + + # Clean up after test Extractor._statestore_singleton = None + Extractor._metrics_singleton = None + + # Unregister all collectors again + collectors = list(REGISTRY._collector_to_names.keys()) + for collector in collectors: + with contextlib.suppress(Exception): + REGISTRY.unregister(collector) @pytest.fixture(autouse=True) diff --git a/tests/test_unstable/test_base.py b/tests/test_unstable/test_base.py index 1f8527d8..68348f67 100644 --- a/tests/test_unstable/test_base.py +++ b/tests/test_unstable/test_base.py @@ -303,10 +303,11 @@ def counting_push(self: CognitePusher) -> None: application_config=app_config, current_config_revision=1, log_level_override=override_level, + metrics_class=TestMetrics, ) worker = get_checkin_worker(connection_config) - extractor = TestExtractor(full_config, worker, metrics=TestMetrics) - assert isinstance(extractor._metrics, TestMetrics) or extractor._metrics == TestMetrics + extractor = TestExtractor(full_config, worker) + assert isinstance(extractor.metrics, TestMetrics) with contextlib.ExitStack() as stack: stack.enter_context(contextlib.suppress(Exception)) diff --git a/tests/test_unstable/test_runtime.py b/tests/test_unstable/test_runtime.py index 5536916a..fc42044a 100644 --- a/tests/test_unstable/test_runtime.py +++ b/tests/test_unstable/test_runtime.py @@ -20,9 +20,35 @@ from cognite.extractorutils.unstable.core.base import ConfigRevision, FullConfig from cognite.extractorutils.unstable.core.checkin_worker import CheckinWorker from cognite.extractorutils.unstable.core.runtime import Runtime +from cognite.extractorutils.unstable.core.tasks import StartupTask, TaskContext from test_unstable.conftest import TestConfig, TestExtractor, TestMetrics +class MetricsTestExtractor(SimpleExtractor): + """Custom extractor for testing metrics in multiprocessing context.""" + + def __init_tasks__(self) -> None: + super().__init_tasks__() + + def test_metrics_task(context: TaskContext) -> None: + # Increment counter twice + self.metrics.a_counter.inc() + self.metrics.a_counter.inc() + + # Log the counter value so we can verify it in output + counter_value = self.metrics.a_counter._value.get() + context.info(f"METRICS_TEST: Counter value is {counter_value}") + + # Add startup task to test metrics + self.add_task( + StartupTask( + name="test-metrics", + description="Test metrics increment", + target=test_metrics_task, + ) + ) + + @pytest.fixture def local_config_file() -> Generator[Path, None, None]: file = Path(__file__).parent.parent.parent / f"test-{randint(0, 1000000)}.yaml" @@ -396,11 +422,83 @@ def test_logging_on_windows_with_import_error( assert mock_root_logger.addHandler.call_count == 1 -def test_extractor_with_metrics() -> None: - runtime = Runtime(TestExtractor, metrics=TestMetrics) - assert isinstance(runtime._metrics, TestMetrics) or runtime._metrics == TestMetrics +def test_extractor_with_metrics( + connection_config: ConnectionConfig, tmp_path: Path, monkeypatch: MonkeyPatch, capfd: pytest.CaptureFixture[str] +) -> None: + """ + Test metrics_class is properly passed through Runtime to child process. + This test verifies multiprocessing integration with metrics and counter increments. + """ + cfg_dir = Path("cognite/examples/unstable/extractors/simple_extractor/config") + base_conn = cfg_dir / "connection_config.yaml" + base_app = cfg_dir / "config.yaml" + + conn_file = tmp_path / f"test-{randint(0, 1000000)}-connection_config.yaml" + _write_conn_from_fixture(base_conn, conn_file, connection_config) + + app_file = tmp_path / f"test-{randint(0, 1000000)}-config.yaml" + app_file.write_text(base_app.read_text(encoding="utf-8")) + + argv = [ + "simple-extractor", + "--cwd", + str(tmp_path), + "-c", + conn_file.name, + "-f", + app_file.name, + "--skip-init-checks", + "-l", + "info", + ] - # The metrics instance should be a singleton - another_runtime = Runtime(TestExtractor, metrics=TestMetrics) - assert another_runtime._metrics is runtime._metrics - assert isinstance(another_runtime._metrics, TestMetrics) or another_runtime._metrics == TestMetrics + monkeypatch.setattr(sys, "argv", argv) + + runtime = Runtime(MetricsTestExtractor, metrics=TestMetrics) + + # Verify runtime stores metrics class + assert runtime._metrics_class is TestMetrics, "Runtime should store TestMetrics class" + + child_holder = {} + original_spawn = Runtime._spawn_extractor + + def spy_spawn(self: Self, config: FullConfig, checkin_worker: CheckinWorker) -> Process: + assert config.metrics_class is TestMetrics, "FullConfig should carry TestMetrics class" + + p = original_spawn( + self, + config, + checkin_worker, + ) + child_holder["proc"] = p + return p + + monkeypatch.setattr(Runtime, "_spawn_extractor", spy_spawn, raising=True) + + t = Thread(target=runtime.run, name="RuntimeMain") + t.start() + + start = time.time() + while "proc" not in child_holder and time.time() - start < 10: + time.sleep(0.05) + + assert "proc" in child_holder, "Extractor process was not spawned in time." + proc = child_holder["proc"] + + time.sleep(1.5) # Give more time for the startup task to run + + runtime._cancellation_token.cancel() + + t.join(timeout=30) + assert not t.is_alive(), "Runtime did not shut down within timeout after cancellation." + + proc.join(timeout=0) + assert not proc.is_alive(), "Extractor process is still alive" + + out, err = capfd.readouterr() + combined = (out or "") + (err or "") + + # Verify metrics counter was incremented + assert "METRICS_TEST: Counter value is 2" in combined, ( + f"Expected metrics counter to be 2 in child process.\nCaptured output:\n{combined}" + ) From 44b90c00b04e887d5f671024a659f40c63ed84e8 Mon Sep 17 00:00:00 2001 From: Hemant Mishra Date: Mon, 8 Dec 2025 18:07:42 +0530 Subject: [PATCH 2/3] Fix minor issues --- cognite/extractorutils/unstable/core/base.py | 27 +++++++++++++------- tests/test_unstable/conftest.py | 3 +++ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 771561f1..e5383100 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -22,6 +22,11 @@ class MyConfig(ExtractorConfig): another_parameter: int schedule: ScheduleConfig + class MyMetrics(BaseMetrics): + def __init__(self, extractor_name: str, extractor_version: str): + super().__init__(extractor_name, extractor_version) + self.custom_counter = Counter("custom_counter", "A custom counter") + class MyExtractor(Extractor[MyConfig]): NAME = "My Extractor" EXTERNAL_ID = "my-extractor" @@ -30,6 +35,9 @@ class MyExtractor(Extractor[MyConfig]): CONFIG_TYPE = MyConfig + # Override metrics type annotation for IDE support + metrics: MyMetrics + def __init_tasks__(self) -> None: self.add_task( ScheduledTask( @@ -42,6 +50,8 @@ def __init_tasks__(self) -> None: def my_task_function(self, task_context: TaskContext) -> None: task_context.logger.info("Running my task") + # IDE will now autocomplete custom_counter + self.metrics.custom_counter.inc() """ import logging @@ -123,7 +133,7 @@ def __init__( self.application_config = application_config self.current_config_revision: ConfigRevision = current_config_revision self.log_level_override = log_level_override - self.metrics_class = metrics_class + self.metrics_class: type[BaseMetrics] | None = metrics_class class Extractor(Generic[ConfigType], CogniteLogger): @@ -148,7 +158,7 @@ class Extractor(Generic[ConfigType], CogniteLogger): RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES USE_DEFAULT_STATE_STORE: bool = True _statestore_singleton: AbstractStateStore | None = None - _metrics_singleton: BaseMetrics + _metrics_singleton: BaseMetrics | None = None cancellation_token: CancellationToken @@ -271,16 +281,15 @@ def _load_metrics(self, metrics_class: type[BaseMetrics] | None = None) -> BaseM Reuses existing singleton if available to avoid Prometheus registry conflicts. """ if Extractor._metrics_singleton is not None: - self.metrics = Extractor._metrics_singleton - return self.metrics + return Extractor._metrics_singleton - if metrics_class: - self.metrics = safe_get(metrics_class) + if metrics_class and issubclass(metrics_class, BaseMetrics): + metrics_instance = safe_get(metrics_class) else: - self.metrics = BaseMetrics(extractor_name=self.EXTERNAL_ID, extractor_version=self.VERSION) + metrics_instance = safe_get(BaseMetrics, extractor_name=self.EXTERNAL_ID, extractor_version=self.VERSION) - Extractor._metrics_singleton = self.metrics - return self.metrics + Extractor._metrics_singleton = metrics_instance + return metrics_instance @classmethod def get_current_metrics(cls) -> BaseMetrics: diff --git a/tests/test_unstable/conftest.py b/tests/test_unstable/conftest.py index f75f1801..817dc42f 100644 --- a/tests/test_unstable/conftest.py +++ b/tests/test_unstable/conftest.py @@ -16,6 +16,7 @@ from cognite.client import CogniteClient from cognite.client.config import ClientConfig from cognite.client.credentials import OAuthClientCredentials +from cognite.extractorutils import metrics from cognite.extractorutils.metrics import BaseMetrics from cognite.extractorutils.unstable.configuration.models import ( ConnectionConfig, @@ -39,6 +40,7 @@ class variables are reset, and Prometheus collectors are unregistered, # Clean up before test Extractor._statestore_singleton = None Extractor._metrics_singleton = None + metrics._metrics_singularities.clear() # Unregister all collectors to prevent "Duplicated timeseries" errors collectors = list(REGISTRY._collector_to_names.keys()) @@ -51,6 +53,7 @@ class variables are reset, and Prometheus collectors are unregistered, # Clean up after test Extractor._statestore_singleton = None Extractor._metrics_singleton = None + metrics._metrics_singularities.clear() # Unregister all collectors again collectors = list(REGISTRY._collector_to_names.keys()) From f203f39ce01d684725269308a24d6179798504a4 Mon Sep 17 00:00:00 2001 From: Hemant Mishra Date: Mon, 8 Dec 2025 18:58:34 +0530 Subject: [PATCH 3/3] Test Fixes --- cognite/extractorutils/unstable/core/base.py | 20 ---------- tests/tests_unit/test_metrics.py | 42 +++++++++++++++++++- 2 files changed, 41 insertions(+), 21 deletions(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index e5383100..c7aac912 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -280,32 +280,12 @@ def _load_metrics(self, metrics_class: type[BaseMetrics] | None = None) -> BaseM Reuses existing singleton if available to avoid Prometheus registry conflicts. """ - if Extractor._metrics_singleton is not None: - return Extractor._metrics_singleton - if metrics_class and issubclass(metrics_class, BaseMetrics): metrics_instance = safe_get(metrics_class) else: metrics_instance = safe_get(BaseMetrics, extractor_name=self.EXTERNAL_ID, extractor_version=self.VERSION) - - Extractor._metrics_singleton = metrics_instance return metrics_instance - @classmethod - def get_current_metrics(cls) -> BaseMetrics: - """ - Get the current metrics singleton. - - Returns: - The current metrics singleton - - Raises: - ValueError: If no metrics singleton has been created, meaning no metrics have been initialized. - """ - if Extractor._metrics_singleton is None: - raise ValueError("No metrics singleton created. Have metrics been initialized?") - return Extractor._metrics_singleton - def _load_state_store(self) -> None: """ Searches through the config object for a StateStoreConfig. diff --git a/tests/tests_unit/test_metrics.py b/tests/tests_unit/test_metrics.py index d94439df..192b8ffe 100644 --- a/tests/tests_unit/test_metrics.py +++ b/tests/tests_unit/test_metrics.py @@ -12,13 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import contextlib import time +from collections.abc import Generator from types import ModuleType from unittest.mock import Mock, patch import arrow import pytest from prometheus_client import Gauge +from prometheus_client.core import REGISTRY from cognite.client import CogniteClient from cognite.client.data_classes import Asset @@ -36,6 +39,43 @@ def altered_metrics() -> ModuleType: return altered_metrics +@pytest.fixture(autouse=True, scope="function") +def cleanup_registry() -> Generator[None, None, None]: + """ + Cleanup fixture that removes extractor-specific metrics from REGISTRY after each test. + + This prevents metric pollution between tests. We keep Python default metrics + (python_gc_*, python_info) and the test gauge, but remove metrics from BaseMetrics + and upload queue metrics that pollute the registry. + """ + yield + + from prometheus_client import Metric + + metrics_to_remove = [] + for collector in list(REGISTRY._collector_to_names.keys()): + with contextlib.suppress(Exception): + for metric in collector.collect(): + if (isinstance(metric, Metric) and metric.name.startswith("cognite_")) or ( + ( + metric.name.endswith("_start_time") + or metric.name.endswith("_finish_time") + or metric.name.endswith("_num_threads") + or metric.name.endswith("_memory_bytes") + or metric.name.endswith("_memory_bytes_available") + or metric.name.endswith("_cpu_percent") + or metric.name.endswith("_info") + ) + and not metric.name.startswith("python_") + ): + metrics_to_remove.append(collector) + break + + for collector in metrics_to_remove: + with contextlib.suppress(Exception): + REGISTRY.unregister(collector) + + # For testing CognitePusher class GaugeSetUp: gauge = Gauge("gauge", "Test gauge") @@ -188,7 +228,7 @@ def test_push(MockCogniteClient: Mock) -> None: client.time_series.data.insert_multiple.assert_called_once() for time_series in client.time_series.data.insert_multiple.call_args_list[0][0][0]: - if time_series["externalId"] == "pre_gauge": + if time_series.get("externalId") == "pre_gauge": ts = time_series break