diff --git a/cognite/extractorutils/metrics.py b/cognite/extractorutils/metrics.py index 217f6e1e..2313a11c 100644 --- a/cognite/extractorutils/metrics.py +++ b/cognite/extractorutils/metrics.py @@ -53,14 +53,12 @@ def __init__(self): from prometheus_client.exposition import basic_auth_handler, delete_from_gateway, pushadd_to_gateway from cognite.client import CogniteClient -from cognite.client.data_classes import Asset, Datapoints, DatapointsArray, TimeSeries -from cognite.client.data_classes.data_modeling import NodeId +from cognite.client.data_classes import Asset, TimeSeries from cognite.client.exceptions import CogniteDuplicatedError from cognite.extractorutils.threading import CancellationToken +from cognite.extractorutils.uploader.time_series import DataPointList, TimeSeriesUploadQueue from cognite.extractorutils.util import EitherId -from .util import ensure_time_series - _metrics_singularities = {} @@ -359,70 +357,104 @@ def __init__( self.asset = asset self.external_id_prefix = external_id_prefix self.data_set = data_set + self._asset_id: int | None = None + self._data_set_id: int | None = None self._init_cdf() + self.upload_queue = TimeSeriesUploadQueue( + cdf_client=cdf_client, + create_missing=self._create_missing_timeseries_factory, + data_set_id=self._data_set_id, + cancellation_token=cancellation_token, + ) + self._cdf_project = cdf_client.config.project def _init_cdf(self) -> None: """ - Initialize the CDF tenant with the necessary time series and asset. - """ - time_series: list[TimeSeries] = [] + Initialize the CDF tenant with the necessary asset and dataset. + Timeseries are created automatically by TimeSeriesUploadQueue when datapoints are pushed. + """ if self.asset is not None: - # Ensure that asset exist, and retrieve internal ID + # Ensure that asset exists, and retrieve internal ID asset: Asset | None try: asset = self.cdf_client.assets.create(self.asset) except CogniteDuplicatedError: asset = self.cdf_client.assets.retrieve(external_id=self.asset.external_id) - asset_id = asset.id if asset is not None else None - - else: - asset_id = None + self._asset_id = asset.id if asset is not None else None - data_set_id = None if self.data_set: dataset = self.cdf_client.data_sets.retrieve( id=self.data_set.internal_id, external_id=self.data_set.external_id ) if dataset: - data_set_id = dataset.id + self._data_set_id = dataset.id - for metric in REGISTRY.collect(): - if type(metric) is Metric and metric.type in ["gauge", "counter"]: - external_id = self.external_id_prefix + metric.name + def _create_missing_timeseries_factory(self, external_id: str, datapoints: DataPointList) -> TimeSeries: + """ + Factory function to create missing timeseries. - time_series.append( - TimeSeries( - external_id=external_id, - name=metric.name, - legacy_name=external_id, - description=metric.documentation, - asset_id=asset_id, - data_set_id=data_set_id, - ) - ) + Args: + external_id: External ID of the timeseries to create + datapoints: List of datapoints that triggered the creation + + Returns: + A TimeSeries object + """ + metric_name = external_id[len(self.external_id_prefix) :] - ensure_time_series(self.cdf_client, time_series) + metric_description = "" + for metric in REGISTRY.collect(): + if isinstance(metric, Metric) and metric.name == metric_name: + metric_description = metric.documentation + break + + is_string = ( + isinstance(datapoints[0].get("value"), str) + if isinstance(datapoints[0], dict) + else isinstance(datapoints[0][1], str) + ) + + return TimeSeries( + external_id=external_id, + name=metric_name, + legacy_name=external_id, + description=metric_description, + asset_id=self._asset_id, + data_set_id=self._data_set_id, + is_string=is_string, + ) def _push_to_server(self) -> None: """ - Create datapoints an push them to their respective time series. + Create datapoints and push them to their respective time series using TimeSeriesUploadQueue. + + The queue will automatically create missing timeseries for late-registered metrics. """ timestamp = int(arrow.get().float_timestamp * 1000) - datapoints: list[dict[str, str | int | list[Any] | Datapoints | DatapointsArray | NodeId]] = [] - for metric in REGISTRY.collect(): if isinstance(metric, Metric) and metric.type in ["gauge", "counter"]: if len(metric.samples) == 0: continue external_id = self.external_id_prefix + metric.name - datapoints.append({"externalId": external_id, "datapoints": [(timestamp, metric.samples[0].value)]}) - self.cdf_client.time_series.data.insert_multiple(datapoints) + self.upload_queue.add_to_upload_queue( + external_id=external_id, datapoints=[(timestamp, metric.samples[0].value)] + ) + + self.upload_queue.upload() self.logger.debug("Pushed metrics to CDF tenant '%s'", self._cdf_project) + + def stop(self) -> None: + """ + Stop the push loop and ensure all metrics are uploaded. + """ + self._push_to_server() + self.upload_queue.stop() + self.cancellation_token.cancel() diff --git a/tests/tests_integration/test_metrics_integration.py b/tests/tests_integration/test_metrics_integration.py new file mode 100644 index 00000000..3f97967a --- /dev/null +++ b/tests/tests_integration/test_metrics_integration.py @@ -0,0 +1,304 @@ +""" +Integration tests for CognitePusher with late-registered metrics. + +This test verifies that CognitePusher correctly handles metrics that are registered +after initialization (like python_gc_* and python_info metrics from Prometheus). +""" + +import logging +import time +import uuid +from collections.abc import Callable, Generator +from typing import Any + +import pytest +from prometheus_client import Counter, Gauge +from prometheus_client.core import REGISTRY + +from cognite.client import CogniteClient +from cognite.client.exceptions import CogniteNotFoundError +from cognite.extractorutils.metrics import CognitePusher + +logger = logging.getLogger(__name__) + + +def poll_for_condition(condition: Callable[[], bool], timeout: int = 10, interval: float = 0.5) -> None: + """ + Poll a condition function until it returns True or timeout is reached. + + Args: + condition: A callable that returns True when the condition is met + timeout: Maximum time to wait in seconds + interval: Time to wait between checks in seconds + """ + start_time = time.time() + while time.time() - start_time < timeout: + if condition(): + return + time.sleep(interval) + pytest.fail(f"Condition not met within {timeout} seconds.") + + +def timeseries_exist(client: CogniteClient, external_ids: list[str]) -> bool: + """ + Check if all specified timeseries exist in CDF. + + Args: + client: CogniteClient instance + external_ids: List of external IDs to check + + Returns: + True if all timeseries exist, False otherwise + """ + try: + client.time_series.retrieve_multiple(external_ids=external_ids, ignore_unknown_ids=False) + return True + except CogniteNotFoundError: + return False + + +def assert_timeseries_exists( + client: CogniteClient, + external_id: str, + expected_name: str | None = None, + expected_description: str | None = None, +) -> None: + """ + Assert that a timeseries exists in CDF with the expected properties. + + Args: + client: CogniteClient instance + external_id: External ID of the timeseries + expected_name: Expected name of the timeseries (optional) + expected_description: Expected description of the timeseries (optional) + """ + ts = client.time_series.retrieve(external_id=external_id) + assert ts is not None, f"Timeseries {external_id} was not created" + if expected_name is not None: + assert ts.name == expected_name, f"Expected name '{expected_name}', got '{ts.name}'" + if expected_description is not None: + assert ts.description == expected_description, ( + f"Expected description '{expected_description}', got '{ts.description}'" + ) + + +def assert_datapoint_value( + client: CogniteClient, + external_id: str, + expected_value: float, +) -> None: + """ + Assert that a timeseries has datapoints with the expected value. + + Args: + client: CogniteClient instance + external_id: External ID of the timeseries + expected_value: Expected value of the first datapoint + """ + datapoints = client.time_series.data.retrieve(external_id=external_id, start="1h-ago", end="now", limit=10) + assert len(datapoints) > 0, f"No datapoints found for timeseries {external_id}" + assert datapoints.value[0] == pytest.approx(expected_value), ( + f"Expected value {expected_value}, got {datapoints.value[0]}" + ) + + +@pytest.fixture +def test_prefix() -> str: + """Generate a unique prefix for this test run to avoid conflicts.""" + test_id = uuid.uuid4().hex[:8] + return f"integration_test_{test_id}_" + + +@pytest.fixture +def metrics_registry() -> Generator[Callable[[Any], Any], None, None]: + """ + Fixture that tracks and cleans up Prometheus metrics. + + Ensures metrics are unregistered even if the test fails. + """ + metrics_to_unregister: list[Any] = [] + + def _register(metric: Any) -> Any: + metrics_to_unregister.append(metric) + return metric + + yield _register + + for metric in metrics_to_unregister: + try: + REGISTRY.unregister(metric) + except KeyError: + logger.debug("Metric %s was already unregistered", metric) + + +@pytest.fixture +def cognite_pusher_test( + set_client: CogniteClient, test_prefix: str +) -> Generator[tuple[CogniteClient, str, list[str]], None, None]: + """ + Fixture that sets up and tears down a CognitePusher test. + + Yields: + Tuple of (client, test_prefix, list of created timeseries external_ids) + """ + client = set_client + created_external_ids: list[str] = [] + + yield client, test_prefix, created_external_ids + + if created_external_ids: + try: + client.time_series.delete(external_id=created_external_ids, ignore_unknown_ids=True) + except Exception as e: + logger.warning("Failed to cleanup timeseries: %s", e) + + +def test_cognite_pusher_with_late_registered_metrics( + cognite_pusher_test: tuple[CogniteClient, str, list[str]], + metrics_registry: Callable[[Any], Any], +) -> None: + """ + Test that CognitePusher handles both early and late-registered metrics. + + This simulates the real-world scenario where: + 1. Some metrics (like extractor-specific metrics) are registered before CognitePusher init + 2. Other metrics (like python_gc_*, python_info) are registered after initialization + 3. All metrics should be uploaded correctly during push + """ + client, test_prefix, created_external_ids = cognite_pusher_test + + early_gauge_name = f"early_gauge_{uuid.uuid4().hex[:8]}" + early_gauge = metrics_registry(Gauge(early_gauge_name, "A metric registered before CognitePusher init")) + early_gauge.set(42.0) + + early_external_id = test_prefix + early_gauge_name + created_external_ids.append(early_external_id) + + pusher = CognitePusher( + cdf_client=client, + external_id_prefix=test_prefix, + push_interval=60, + ) + + late_gauge_name = f"late_gauge_{uuid.uuid4().hex[:8]}" + late_gauge = metrics_registry( + Gauge(late_gauge_name, "A metric registered AFTER CognitePusher init (like python_gc)") + ) + late_gauge.set(99.0) + + late_counter_name = f"late_counter_{uuid.uuid4().hex[:8]}" + late_counter = metrics_registry( + Counter(late_counter_name, "A counter registered AFTER CognitePusher init (like python_info)") + ) + late_counter.inc(5) + + late_gauge_external_id = test_prefix + late_gauge_name + late_counter_external_id = test_prefix + late_counter_name + created_external_ids.append(late_gauge_external_id) + created_external_ids.append(late_counter_external_id) + + # This should create timeseries for ALL metrics (early + late) + pusher._push_to_server() + + poll_for_condition( + lambda: timeseries_exist(client, [early_external_id, late_gauge_external_id, late_counter_external_id]) + ) + + assert_timeseries_exists( + client, early_external_id, early_gauge_name, "A metric registered before CognitePusher init" + ) + assert_timeseries_exists( + client, late_gauge_external_id, late_gauge_name, "A metric registered AFTER CognitePusher init (like python_gc)" + ) + assert_timeseries_exists(client, late_counter_external_id, late_counter_name) + + assert_datapoint_value(client, early_external_id, 42.0) + assert_datapoint_value(client, late_gauge_external_id, 99.0) + assert_datapoint_value(client, late_counter_external_id, 5.0) + + pusher.stop() + + +def test_cognite_pusher_stop_uploads_late_metrics( + cognite_pusher_test: tuple[CogniteClient, str, list[str]], + metrics_registry: Callable[[Any], Any], +) -> None: + """ + Test that stop() correctly uploads all metrics including late-registered ones. + + This is the scenario where: + 1. CognitePusher is initialized + 2. Metrics are registered after + 3. stop() is called during shutdown + 4. All metrics (including late ones) should be uploaded + """ + client, test_prefix, created_external_ids = cognite_pusher_test + + pusher = CognitePusher( + cdf_client=client, + external_id_prefix=test_prefix, + push_interval=60, + ) + + late_metric_name = f"shutdown_metric_{uuid.uuid4().hex[:8]}" + late_metric = metrics_registry(Gauge(late_metric_name, "A metric registered after init, uploaded during shutdown")) + late_metric.set(123.0) + + late_external_id = test_prefix + late_metric_name + created_external_ids.append(late_external_id) + + pusher.stop() + + poll_for_condition(lambda: timeseries_exist(client, [late_external_id])) + + assert_timeseries_exists(client, late_external_id) + assert_datapoint_value(client, late_external_id, 123.0) + + +def test_cognite_pusher_multiple_pushes_with_late_metrics( + cognite_pusher_test: tuple[CogniteClient, str, list[str]], + metrics_registry: Callable[[Any], Any], +) -> None: + """ + Test that multiple pushes work correctly with late-registered metrics. + + Scenario: + 1. Push with some metrics + 2. Register new metrics + 3. Push again - new metrics should be created and uploaded + """ + client, test_prefix, created_external_ids = cognite_pusher_test + + initial_metric_name = f"initial_{uuid.uuid4().hex[:8]}" + initial_metric = metrics_registry(Gauge(initial_metric_name, "Initial metric")) + initial_metric.set(10.0) + + initial_external_id = test_prefix + initial_metric_name + created_external_ids.append(initial_external_id) + + pusher = CognitePusher( + cdf_client=client, + external_id_prefix=test_prefix, + push_interval=60, + ) + + pusher._push_to_server() + poll_for_condition(lambda: timeseries_exist(client, [initial_external_id])) + + assert_timeseries_exists(client, initial_external_id) + + late_metric_name = f"later_{uuid.uuid4().hex[:8]}" + late_metric = metrics_registry(Gauge(late_metric_name, "Late metric added between pushes")) + late_metric.set(20.0) + + late_external_id = test_prefix + late_metric_name + created_external_ids.append(late_external_id) + + initial_metric.set(11.0) + pusher._push_to_server() + poll_for_condition(lambda: timeseries_exist(client, [late_external_id])) + + assert_timeseries_exists(client, late_external_id) + assert_datapoint_value(client, late_external_id, 20.0) + + pusher.stop() diff --git a/tests/tests_unit/test_metrics.py b/tests/tests_unit/test_metrics.py index 362d188e..d94439df 100644 --- a/tests/tests_unit/test_metrics.py +++ b/tests/tests_unit/test_metrics.py @@ -21,7 +21,7 @@ from prometheus_client import Gauge from cognite.client import CogniteClient -from cognite.client.data_classes import Asset, TimeSeries +from cognite.client.data_classes import Asset from cognite.client.exceptions import CogniteDuplicatedError, CogniteNotFoundError from cognite.extractorutils import metrics from cognite.extractorutils.metrics import CognitePusher, safe_get @@ -121,9 +121,9 @@ def test_clear(altered_metrics: ModuleType) -> None: # CognitePusher test @patch("cognite.client.CogniteClient") def test_init_empty_cdf(MockCogniteClient: Mock) -> None: + """Test that initialization sets up asset but doesn't create timeseries (created on-demand).""" init_gauge() client = MockCogniteClient() - client.time_series.retrieve_multiple = Mock(side_effect=CogniteNotFoundError([{"externalId": "pre_gauge"}])) return_asset = Asset(id=123, external_id="asset", name="asset") new_asset = Asset(external_id="asset", name="asset") @@ -132,26 +132,16 @@ def test_init_empty_cdf(MockCogniteClient: Mock) -> None: pusher = CognitePusher(client, external_id_prefix="pre_", asset=new_asset, push_interval=1) - # Assert time series created - # Hacky assert_called_once_with as the TimeSeries object is not the same obj, just equal content - client.time_series.create.assert_called_once() - print(client.time_series.create.call_args_list) - assert ( - client.time_series.create.call_args_list[0][0][0][0].dump() - == TimeSeries( - external_id="pre_gauge", name="gauge", legacy_name="pre_gauge", description="Test gauge", asset_id=123 - ).dump() - ) - - # Assert asset created + # Assert asset created and asset_id was set client.assets.create.assert_called_once_with(new_asset) + assert pusher._asset_id == 123 @patch("cognite.client.CogniteClient") def test_init_existing_asset(MockCogniteClient: Mock) -> None: + """Test that initialization retrieves existing asset.""" init_gauge() client = MockCogniteClient() - client.time_series.retrieve_multiple = Mock(side_effect=CogniteNotFoundError([{"externalId": "pre_gauge"}])) return_asset = Asset(id=123, external_id="assetid", name="asset") new_asset = Asset(external_id="assetid", name="asset") @@ -161,23 +151,10 @@ def test_init_existing_asset(MockCogniteClient: Mock) -> None: pusher = CognitePusher(client, external_id_prefix="pre_", asset=new_asset, push_interval=1) - # Assert time series created - # Hacky assert_called_once_with as the TimeSeries object is not the same obj, just equal content - client.time_series.create.assert_called_once() - assert ( - client.time_series.create.call_args_list[0][0][0][0].dump() - == TimeSeries( - external_id="pre_gauge", - name="gauge", - legacy_name="pre_gauge", - description="Test gauge", - asset_id=123, - ).dump() - ) - - # Assert asset created + # Assert asset retrieved client.assets.create.assert_called_once_with(new_asset) client.assets.retrieve.assert_called_once_with(external_id="assetid") + assert pusher._asset_id == 123 @patch("cognite.client.CogniteClient") @@ -221,6 +198,40 @@ def test_push(MockCogniteClient: Mock) -> None: assert ts["datapoints"][0][1] == pytest.approx(5) +@patch("cognite.client.CogniteClient") +def test_push_creates_missing_timeseries(MockCogniteClient: Mock) -> None: + """Test that push logic creates missing time series when enabled.""" + init_gauge() + client: CogniteClient = MockCogniteClient() + + # Create a mock CogniteNotFoundError with not_found and failed attributes + not_found_error = CogniteNotFoundError([{"externalId": "pre_gauge"}]) + not_found_error.not_found = [{"externalId": "pre_gauge"}] + not_found_error.failed = [] + + # Simulate CogniteNotFoundError on first push, then success on retry + client.time_series.data.insert_multiple.side_effect = [ + not_found_error, + None, # Success on retry + ] + + pusher = CognitePusher(client, "pre_", push_interval=1) + + GaugeSetUp.gauge.set(5) + pusher._push_to_server() + + # Assert that we tried to create the timeseries + client.time_series.create.assert_called_once() + created_ts_list = client.time_series.create.call_args[0][0] + assert len(created_ts_list) == 1 + assert created_ts_list[0].external_id == "pre_gauge" + assert created_ts_list[0].name == "gauge" + assert created_ts_list[0].description == "Test gauge" + + # Assert that insert_multiple was called twice (initial attempt + retry) + assert client.time_series.data.insert_multiple.call_count == 2 + + # MetricsUtils test @pytest.fixture def init_counter() -> None: