diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index a40190898..14db59575 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -86,6 +86,12 @@ from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter from google.cloud.bigtable.data.row_filters import RowFilterChain from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + BigtableMetricsExporter, +) +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + GoogleCloudMetricsHandler, +) from google.cloud.bigtable.data._metrics import OperationType from google.cloud.bigtable.data._cross_sync import CrossSync @@ -242,6 +248,12 @@ def __init__( "configured the universe domain explicitly, `googleapis.com` " "is the default." ) + # create a metrics exporter using the same client configuration + self._gcp_metrics_exporter = BigtableMetricsExporter( + project_id=self.project, + credentials=credentials, + client_options=client_options, + ) self._is_closed = CrossSync.Event() self.transport = cast(TransportType, self._gapic_client.transport) # keep track of active instances to for warmup on channel refresh @@ -970,8 +982,17 @@ def __init__( self.default_retryable_errors: Sequence[type[Exception]] = ( default_retryable_errors or () ) - - self._metrics = BigtableClientSideMetricsController() + self._metrics = BigtableClientSideMetricsController( + handlers=[ + GoogleCloudMetricsHandler( + exporter=client._gcp_metrics_exporter, + instance_id=instance_id, + table_id=table_id, + app_profile_id=app_profile_id, + client_version=client._client_version(), + ) + ] + ) try: self._register_instance_future = CrossSync.create_task( diff --git a/google/cloud/bigtable/data/_metrics/__init__.py b/google/cloud/bigtable/data/_metrics/__init__.py index 20d36d4c8..9baf179bc 100644 --- a/google/cloud/bigtable/data/_metrics/__init__.py +++ b/google/cloud/bigtable/data/_metrics/__init__.py @@ -11,6 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( + OpenTelemetryMetricsHandler, +) +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + GoogleCloudMetricsHandler, +) +from google.cloud.bigtable.data._metrics.handlers._stdout import _StdoutMetricsHandler from google.cloud.bigtable.data._metrics.metrics_controller import ( BigtableClientSideMetricsController, ) @@ -23,6 +30,9 @@ __all__ = ( "BigtableClientSideMetricsController", + "OpenTelemetryMetricsHandler", + "GoogleCloudMetricsHandler", + "_StdoutMetricsHandler", "OperationType", "ActiveOperationMetric", "ActiveAttemptMetric", diff --git a/google/cloud/bigtable/data/_metrics/handlers/gcp_exporter.py b/google/cloud/bigtable/data/_metrics/handlers/gcp_exporter.py new file mode 100644 index 000000000..99e0dfc24 --- /dev/null +++ b/google/cloud/bigtable/data/_metrics/handlers/gcp_exporter.py @@ -0,0 +1,272 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import time
+
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics import view
+from opentelemetry.sdk.metrics.export import (
+    HistogramDataPoint,
+    MetricExporter,
+    MetricExportResult,
+    MetricsData,
+    NumberDataPoint,
+    PeriodicExportingMetricReader,
+)
+from google.protobuf.timestamp_pb2 import Timestamp
+from google.api.distribution_pb2 import Distribution
+from google.api.metric_pb2 import Metric as GMetric
+from google.api.monitored_resource_pb2 import MonitoredResource
+from google.api.metric_pb2 import MetricDescriptor
+from google.api_core import gapic_v1
+from google.cloud.monitoring_v3 import (
+    CreateTimeSeriesRequest,
+    MetricServiceClient,
+    Point,
+    TimeInterval,
+    TimeSeries,
+    TypedValue,
+)
+
+from google.cloud.bigtable.data._metrics.handlers.opentelemetry import (
+    OpenTelemetryMetricsHandler,
+)
+from google.cloud.bigtable.data._metrics.handlers.opentelemetry import (
+    _OpenTelemetryInstruments,
+)
+
+
+# create OpenTelemetry views for Bigtable metrics
+# avoid reformatting into individual lines
+# fmt: off
+MILLIS_AGGREGATION = view.ExplicitBucketHistogramAggregation(
+    [
+        0, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40,
+        50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650,
+        800, 1_000, 2_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+        200_000, 400_000, 800_000, 1_600_000, 3_200_000
+    ]
+)
+# fmt: on
+COUNT_AGGREGATION = view.SumAggregation()
+INSTRUMENT_NAMES = (
+    "operation_latencies",
+    "first_response_latencies",
+    "attempt_latencies",
+    "retry_count",
+    "server_latencies",
+    "connectivity_error_count",
+    "application_latencies",
+    "throttling_latencies",
+)
+VIEW_LIST = [
+    view.View(
+        instrument_name=n,
+        name=n,
+        aggregation=MILLIS_AGGREGATION
+        if n.endswith("latencies")
+        else COUNT_AGGREGATION,
+    )
+    for n in INSTRUMENT_NAMES
+]
+
+
+class GoogleCloudMetricsHandler(OpenTelemetryMetricsHandler):
+    """
+    Maintains an internal set of OpenTelemetry metrics for the Bigtable client library,
+    and periodically exports them to Google Cloud Monitoring.
+
+    The OpenTelemetry metrics that are tracked are as follows:
+      - operation_latencies: latency of each client method call, over all of its attempts.
+      - first_response_latencies: latency of receiving the first row in a ReadRows operation.
+      - attempt_latencies: latency of each client attempt RPC.
+      - retry_count: Number of additional RPCs sent after the initial attempt.
+      - server_latencies: latency recorded on the server side for each attempt.
+      - connectivity_error_count: number of attempts that failed to reach Google's network.
+      - application_latencies: the time spent waiting for the application to process the next response.
+      - throttling_latencies: latency introduced by waiting when there are too many outstanding requests in a bulk operation.
+
+    Args:
+        exporter: The exporter object used to write metrics to Cloud Monitoring.
+            Should correspond 1:1 with a bigtable client, and share auth configuration.
+        export_interval: The interval (in seconds) at which to export metrics to Cloud Monitoring.
+        *args: configuration positional arguments passed down to the superclass
+        **kwargs: configuration keyword arguments passed down to the superclass
+    """
+
+    def __init__(self, exporter, *args, export_interval=60, **kwargs):
+        # periodically executes exporter
+        gcp_reader = PeriodicExportingMetricReader(
+            exporter, export_interval_millis=export_interval * 1000
+        )
+        # use private meter provider to store instruments and views
+        self.meter_provider = MeterProvider(
+            metric_readers=[gcp_reader], views=VIEW_LIST
+        )
+        otel = _OpenTelemetryInstruments(meter_provider=self.meter_provider)
+        super().__init__(*args, instruments=otel, **kwargs)
+
+    def close(self):
+        self.meter_provider.shutdown()
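For orientation, here is a minimal usage sketch (not part of the diff; the project, instance, and table IDs are placeholders). It wires the handler up by hand the way BigtableDataClient does internally in the client.py changes above:

from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import (
    BigtableMetricsExporter,
    GoogleCloudMetricsHandler,
)

# the exporter resolves credentials like any other google-cloud client (ADC here)
exporter = BigtableMetricsExporter(project_id="my-project")
handler = GoogleCloudMetricsHandler(
    exporter,
    instance_id="my-instance",
    table_id="my-table",
    export_interval=60,  # push to Cloud Monitoring once per minute
)
# ... the client records completed operations and attempts through the handler ...
handler.close()  # shuts down the private MeterProvider and its exporting reader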
+
+
+class BigtableMetricsExporter(MetricExporter):
+    """
+    OpenTelemetry Exporter implementation for sending metrics to Google Cloud Monitoring.
+
+    We must use a custom exporter because the public one doesn't support writing to internal
+    metrics like `bigtable.googleapis.com/internal/client/`
+
+    Each GoogleCloudMetricsHandler will maintain its own exporter instance associated with the
+    project_id it is configured with.
+
+    Args:
+        project_id: GCP project id to associate metrics with
+    """
+
+    def __init__(self, project_id: str, *client_args, **client_kwargs):
+        super().__init__()
+        self.client = MetricServiceClient(*client_args, **client_kwargs)
+        self.prefix = "bigtable.googleapis.com/internal/client"
+        self.project_id = project_id
+
+    def export(
+        self, metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs
+    ) -> MetricExportResult:
+        """
+        Write a set of metrics to Cloud Monitoring.
+        This method is called by the OpenTelemetry SDK
+        """
+        deadline = time.time() + (timeout_millis / 1000)
+        metric_kind = MetricDescriptor.MetricKind.CUMULATIVE
+        all_series: list[TimeSeries] = []
+        # process each metric from OTel format into Cloud Monitoring format
+        for resource_metric in metrics_data.resource_metrics:
+            for scope_metric in resource_metric.scope_metrics:
+                for metric in scope_metric.metrics:
+                    # data points without attributes are skipped
+                    for data_point in [
+                        pt for pt in metric.data.data_points if pt.attributes
+                    ]:
+                        monitored_resource = MonitoredResource(
+                            type="bigtable_client_raw",
+                            labels={
+                                "project_id": self.project_id,
+                                "instance": data_point.attributes[
+                                    "resource_instance"
+                                ],
+                                "cluster": data_point.attributes[
+                                    "resource_cluster"
+                                ],
+                                "table": data_point.attributes["resource_table"],
+                                "zone": data_point.attributes["resource_zone"],
+                            },
+                        )
+                        point = self._to_point(data_point)
+                        series = TimeSeries(
+                            resource=monitored_resource,
+                            metric_kind=metric_kind,
+                            points=[point],
+                            metric=GMetric(
+                                type=f"{self.prefix}/{metric.name}",
+                                labels={
+                                    k: v
+                                    for k, v in data_point.attributes.items()
+                                    if not k.startswith("resource_")
+                                },
+                            ),
+                            unit=metric.unit,
+                        )
+                        all_series.append(series)
+        # send all metrics to Cloud Monitoring
+        try:
+            self._batch_write(all_series, deadline)
+            return MetricExportResult.SUCCESS
+        except Exception:
+            return MetricExportResult.FAILURE
+
+    def _batch_write(
+        self, series: list[TimeSeries], deadline=None, max_batch_size=200
+    ) -> None:
+        """
+        Adapted from CloudMonitoringMetricsExporter
+        https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
+
+        Args:
+            series: list of TimeSeries to write. Will be split into batches if necessary
+            deadline: designates the time.time() at which to stop writing.
+                If None, uses API default
+            max_batch_size: maximum number of time series to write at once.
+                Cloud Monitoring allows up to 200 per request
+        """
+        write_ind = 0
+        while write_ind < len(series):
+            # find time left for next batch
+            timeout = deadline - time.time() if deadline else gapic_v1.method.DEFAULT
+            # write next batch
+            self.client.create_service_time_series(
+                CreateTimeSeriesRequest(
+                    name=f"projects/{self.project_id}",
+                    time_series=series[write_ind : write_ind + max_batch_size],
+                ),
+                timeout=timeout,
+            )
+            write_ind += max_batch_size
+
+    @staticmethod
+    def _to_point(data_point: NumberDataPoint | HistogramDataPoint) -> Point:
+        """
+        Adapted from CloudMonitoringMetricsExporter
+        https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
+        """
+        if isinstance(data_point, HistogramDataPoint):
+            mean = data_point.sum / data_point.count if data_point.count else 0.0
+            point_value = TypedValue(
+                distribution_value=Distribution(
+                    count=data_point.count,
+                    mean=mean,
+                    bucket_counts=data_point.bucket_counts,
+                    bucket_options=Distribution.BucketOptions(
+                        explicit_buckets=Distribution.BucketOptions.Explicit(
+                            bounds=data_point.explicit_bounds,
+                        )
+                    ),
+                )
+            )
+        else:
+            if isinstance(data_point.value, int):
+                point_value = TypedValue(int64_value=data_point.value)
+            else:
+                point_value = TypedValue(double_value=data_point.value)
+        start_time = Timestamp()
+        start_time.FromNanoseconds(data_point.start_time_unix_nano)
+        end_time = Timestamp()
+        end_time.FromNanoseconds(data_point.time_unix_nano)
+        interval = TimeInterval(start_time=start_time, end_time=end_time)
+        return Point(interval=interval, value=point_value)
+
+    def shutdown(self, timeout_millis: float = 30_000, **kwargs):
+        """
+        Adapted from CloudMonitoringMetricsExporter
+        https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
+        """
+        # no-op: the PeriodicExportingMetricReader owns this exporter's lifecycle,
+        # and there is no background state to release here
+        pass
+
+    def force_flush(self, timeout_millis: float = 10_000):
+        """
+        Adapted from CloudMonitoringMetricsExporter
+        https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
+        """
+        return True
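To make the conversion concrete, a short sketch (not part of the diff; values are arbitrary) of what _to_point produces for a plain number sample. Histogram points map to Distribution values the same way:

from opentelemetry.sdk.metrics.export import NumberDataPoint

from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import (
    BigtableMetricsExporter,
)

dp = NumberDataPoint(
    attributes={},
    start_time_unix_nano=0,
    time_unix_nano=5_000_000_000,  # sample taken 5s after the epoch start
    value=7,
)
point = BigtableMetricsExporter._to_point(dp)
assert point.value.int64_value == 7  # ints map to int64_value, floats to double_value
assert point.interval.end_time.second == 5  # nanosecond interval preserved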
diff --git a/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py b/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py
new file mode 100644
index 000000000..b5cd92970
--- /dev/null
+++ b/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py
@@ -0,0 +1,240 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import os
+import socket
+import uuid
+
+from google.cloud.bigtable import __version__ as bigtable_version
+from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler
+from google.cloud.bigtable.data._metrics.data_model import OperationType
+from google.cloud.bigtable.data._metrics.data_model import DEFAULT_CLUSTER_ID
+from google.cloud.bigtable.data._metrics.data_model import DEFAULT_ZONE
+from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
+from google.cloud.bigtable.data._metrics.data_model import CompletedAttemptMetric
+from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric
+
+# conversion factor for converting from nanoseconds to milliseconds
+NS_TO_MS = 1e6
+
+
+class _OpenTelemetryInstruments:
+    """
+    class that holds OpenTelemetry instrument objects
+    """
+
+    def __init__(self, meter_provider=None):
+        if meter_provider is None:
+            # use global meter provider
+            from opentelemetry import metrics
+
+            meter_provider = metrics
+        # grab meter for this module
+        meter = meter_provider.get_meter("bigtable.googleapis.com")
+        # create instruments
+        self.operation_latencies = meter.create_histogram(
+            name="operation_latencies",
+            description="""
+            The total end-to-end latency across all RPC attempts associated with a Bigtable operation.
+            This metric measures an operation's round trip from the client to Bigtable and back to the client and includes all retries.
+
+            For ReadRows requests, the operation latencies include the application processing time for each returned message.
+            """,
+            unit="ms",
+        )
+        self.first_response_latencies = meter.create_histogram(
+            name="first_response_latencies",
+            description="Latencies from when a client sends a request and receives the first row of the response.",
+            unit="ms",
+        )
+        self.attempt_latencies = meter.create_histogram(
+            name="attempt_latencies",
+            description="""
+            The latencies of a client RPC attempt.
+
+            Under normal circumstances, this value is identical to operation_latencies.
+            If the client receives transient errors, however, then operation_latencies is the sum of all attempt_latencies and the exponential delays.
+            """,
+            unit="ms",
+        )
+        self.retry_count = meter.create_counter(
+            name="retry_count",
+            description="""
+            A counter that records the number of attempts that an operation required to complete.
+            Under normal circumstances, this value is empty.
+            """,
+        )
+        self.server_latencies = meter.create_histogram(
+            name="server_latencies",
+            description="Latencies between the time when the Google frontend receives an RPC and when it sends the first byte of the response.",
+            unit="ms",
+        )
+        self.connectivity_error_count = meter.create_counter(
+            name="connectivity_error_count",
+            description="""
+            The number of requests that failed to reach Google's network.
+            In normal cases, this number is 0. When the number is not 0, it can indicate connectivity issues between the application and the Google network.
+            """,
+        )
+        self.application_latencies = meter.create_histogram(
+            name="application_latencies",
+            description="""
+            The time from when the client receives the response to a request until the application reads the response.
+            This metric is most relevant for ReadRows requests.
+            The start and stop times for this metric depend on the way that you send the read request; see Application blocking latencies timer examples for details.
+            """,
+            unit="ms",
+        )
+        self.throttling_latencies = meter.create_histogram(
+            name="throttling_latencies",
+            description="Latencies introduced when the client blocks the sending of more requests to the server because of too many pending requests in a bulk operation.",
+            unit="ms",
+        )
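A quick sketch (not part of the diff) of exercising these instruments against an in-memory reader; the attribute set here is illustrative, real labels come from the handler below:

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

reader = InMemoryMetricReader()
provider = MeterProvider(metric_readers=[reader])
instruments = _OpenTelemetryInstruments(meter_provider=provider)

# record one attempt latency of 12.5 ms, then pull the accumulated data
instruments.attempt_latencies.record(12.5, {"method": "ReadRows"})
data = reader.get_metrics_data()
assert data.resource_metrics  # contains the recorded histogram point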
+ """, + unit="ms", + ) + self.throttling_latencies = meter.create_histogram( + name="throttling_latencies", + description="Latencies introduced when the client blocks the sending of more requests to the server because of too many pending requests in a bulk operation.", + unit="ms", + ) + + +class OpenTelemetryMetricsHandler(MetricsHandler): + """ + Maintains a set of OpenTelemetry metrics for the Bigtable client library, + and updates them with each completed operation and attempt. + + The OpenTelemetry metrics that are tracked are as follows: + - operation_latencies: latency of each client method call, over all of it's attempts. + - first_response_latencies: latency of receiving the first row in a ReadRows operation. + - attempt_latencies: latency of each client attempt RPC. + - retry_count: Number of additional RPCs sent after the initial attempt. + - server_latencies: latency recorded on the server side for each attempt. + - connectivity_error_count: number of attempts that failed to reach Google's network. + - application_latencies: the time spent waiting for the application to process the next response. + - throttling_latencies: latency introduced by waiting when there are too many outstanding requests in a bulk operation. + """ + + def __init__( + self, + *, + instance_id: str, + table_id: str, + app_profile_id: str | None = None, + client_uid: str | None = None, + client_version: str | None = None, + instruments: _OpenTelemetryInstruments = _OpenTelemetryInstruments(), + ): + super().__init__() + self.otel = instruments + client_version = client_version or bigtable_version + # fixed labels sent with each metric update + self.shared_labels = { + "client_name": f"python-bigtable/{client_version}", + "client_uid": client_uid or self._generate_client_uid(), + "resource_instance": instance_id, + "resource_table": table_id, + "app_profile": app_profile_id or "default", + } + + @staticmethod + def _generate_client_uid(): + """ + client_uid will take the format `python-@` where uuid is a + random value, pid is the process id, and hostname is the hostname of the machine. + + If not found, localhost will be used in place of hostname, and a random number + will be used in place of pid. 
+ """ + try: + hostname = socket.gethostname() or "localhost" + except Exception: + hostname = "localhost" + try: + pid = os.getpid() or "" + except Exception: + pid = "" + return f"python-{uuid.uuid4()}-{pid}@{hostname}" + + def on_operation_complete(self, op: CompletedOperationMetric) -> None: + """ + Update the metrics associated with a completed operation: + - operation_latencies + - retry_count + - first_response_latencies + """ + labels = { + "method": op.op_type.value, + "status": op.final_status.name, + "resource_zone": op.zone, + "resource_cluster": op.cluster_id, + **self.shared_labels, + } + is_streaming = str(op.is_streaming) + + self.otel.operation_latencies.record( + op.duration_ns / NS_TO_MS, {"streaming": is_streaming, **labels} + ) + if ( + op.op_type == OperationType.READ_ROWS + and op.first_response_latency_ns is not None + ): + self.otel.first_response_latencies.record( + op.first_response_latency_ns / NS_TO_MS, labels + ) + # only record completed attempts if there were retries + if op.completed_attempts: + self.otel.retry_count.add(len(op.completed_attempts) - 1, labels) + + def on_attempt_complete( + self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric + ): + """ + Update the metrics associated with a completed attempt: + - attempt_latencies + - server_latencies + - connectivity_error_count + - application_latencies + - throttling_latencies + """ + labels = { + "method": op.op_type.value, + "resource_zone": op.zone or DEFAULT_ZONE, # fallback to default if unset + "resource_cluster": op.cluster_id or DEFAULT_CLUSTER_ID, + **self.shared_labels, + } + status = attempt.end_status.name + is_streaming = str(op.is_streaming) + + self.otel.attempt_latencies.record( + attempt.duration_ns / NS_TO_MS, + {"streaming": is_streaming, "status": status, **labels}, + ) + combined_throttling = attempt.grpc_throttling_time_ns / NS_TO_MS + if not op.completed_attempts: + # add flow control latency to first attempt's throttling latency + combined_throttling += ( + op.flow_throttling_time_ns / NS_TO_MS + if op.flow_throttling_time_ns + else 0 + ) + self.otel.throttling_latencies.record(combined_throttling, labels) + self.otel.application_latencies.record( + (attempt.application_blocking_time_ns + attempt.backoff_before_attempt_ns) + / NS_TO_MS, + labels, + ) + if attempt.gfe_latency_ns is not None: + self.otel.server_latencies.record( + attempt.gfe_latency_ns / NS_TO_MS, + {"streaming": is_streaming, "status": status, **labels}, + ) + else: + # gfe headers not attached. Record a connectivity error. 
+ # TODO: this should not be recorded as an error when direct path is enabled + self.otel.connectivity_error_count.add(1, {"status": status, **labels}) diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index b1c3cec47..02dc8fb1e 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -73,6 +73,12 @@ from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter from google.cloud.bigtable.data.row_filters import RowFilterChain from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + BigtableMetricsExporter, +) +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + GoogleCloudMetricsHandler, +) from google.cloud.bigtable.data._metrics import OperationType from google.cloud.bigtable.data._cross_sync import CrossSync from typing import Iterable @@ -174,6 +180,11 @@ def __init__( raise ValueError( f"The configured universe domain ({self.universe_domain}) does not match the universe domain found in the credentials ({self._credentials.universe_domain}). If you haven't configured the universe domain explicitly, `googleapis.com` is the default." ) + self._gcp_metrics_exporter = BigtableMetricsExporter( + project_id=self.project, + credentials=credentials, + client_options=client_options, + ) self._is_closed = CrossSync._Sync_Impl.Event() self.transport = cast(TransportType, self._gapic_client.transport) self._active_instances: Set[_WarmedInstanceKey] = set() @@ -758,7 +769,17 @@ def __init__( self.default_retryable_errors: Sequence[type[Exception]] = ( default_retryable_errors or () ) - self._metrics = BigtableClientSideMetricsController() + self._metrics = BigtableClientSideMetricsController( + handlers=[ + GoogleCloudMetricsHandler( + exporter=client._gcp_metrics_exporter, + instance_id=instance_id, + table_id=table_id, + app_profile_id=app_profile_id, + client_version=client._client_version(), + ) + ] + ) try: self._register_instance_future = CrossSync._Sync_Impl.create_task( self.client._register_instance, diff --git a/noxfile.py b/noxfile.py index 548bfd0ec..51e412b93 100644 --- a/noxfile.py +++ b/noxfile.py @@ -66,6 +66,7 @@ ] SYSTEM_TEST_EXTERNAL_DEPENDENCIES: List[str] = [ "pytest-asyncio==0.21.2", + "pytest-order==1.3.0", BLACK_VERSION, "pyyaml==6.0.2", ] diff --git a/setup.py b/setup.py index 3cb9d465d..91c038b83 100644 --- a/setup.py +++ b/setup.py @@ -39,12 +39,16 @@ dependencies = [ "google-api-core[grpc] >= 2.17.0, <3.0.0", "google-cloud-core >= 1.4.4, <3.0.0", + "google-cloud-monitoring >= 2.0.0, <3.0.0dev", "google-auth >= 2.23.0, <3.0.0,!=2.24.0,!=2.25.0", "grpc-google-iam-v1 >= 0.12.4, <1.0.0", "proto-plus >= 1.22.3, <2.0.0", "proto-plus >= 1.25.0, <2.0.0; python_version>='3.13'", "protobuf>=3.20.2,<7.0.0,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", "google-crc32c>=1.5.0, <2.0.0dev", + "googleapis-common-protos[grpc] >= 1.57.0, <2.0.0dev", + "opentelemetry-api >= 1.0.0, <2.0.0dev", + "opentelemetry-sdk >= 1.0.0, <2.0.0dev", ] extras = {"libcst": "libcst >= 0.2.5"} diff --git a/tests/system/data/__init__.py b/tests/system/data/__init__.py index 6f836fb96..4d5fce5d9 100644 --- a/tests/system/data/__init__.py +++ b/tests/system/data/__init__.py @@ -16,6 +16,7 @@ import pytest import os import uuid +import datetime TEST_FAMILY = "test-family" TEST_FAMILY_2 = "test-family-2" @@ -34,9 +35,12 @@ class 
SystemTestRunner: """ @pytest.fixture(scope="session") - def init_table_id(self): + def init_table_id(self, start_timestamp): """ The table_id to use when creating a new test table + + Args + start_timestamp: accessed when building table to ensure timestamp data is loaded before tests are run """ return f"test-table-{uuid.uuid4().hex}" @@ -74,6 +78,13 @@ def column_family_config(self): ), } + @pytest.fixture(scope="session") + def start_timestamp(self): + """ + A timestamp taken before any tests are run. Used to fetch back metrics relevant to the tests + """ + return datetime.datetime.now(datetime.timezone.utc) + @pytest.fixture(scope="session") def admin_client(self): """ diff --git a/tests/system/data/test_metrics_async.py b/tests/system/data/test_metrics_async.py index ee339965d..0d84777a5 100644 --- a/tests/system/data/test_metrics_async.py +++ b/tests/system/data/test_metrics_async.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -135,14 +135,6 @@ def __getattr__(self, name): @CrossSync.convert_class(sync_name="TestMetrics") class TestMetricsAsync(SystemTestRunner): - @CrossSync.drop - @pytest.fixture(scope="session") - def event_loop(self): - loop = asyncio.get_event_loop() - yield loop - loop.stop() - loop.close() - def _make_client(self): project = os.getenv("GOOGLE_CLOUD_PROJECT") or None return CrossSync.DataClient(project=project) @@ -218,7 +210,8 @@ async def temp_rows(self, table): @CrossSync.pytest_fixture(scope="session") async def table(self, client, table_id, instance_id, handler): async with client.get_table(instance_id, table_id) as table: - table._metrics.add_handler(handler) + # override handlers with custom test object + table._metrics.handlers = [handler] yield table @CrossSync.convert diff --git a/tests/system/data/test_metrics_autogen.py b/tests/system/data/test_metrics_autogen.py index 54a9f2256..5228c0cda 100644 --- a/tests/system/data/test_metrics_autogen.py +++ b/tests/system/data/test_metrics_autogen.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -171,7 +171,7 @@ def temp_rows(self, table): @pytest.fixture(scope="session") def table(self, client, table_id, instance_id, handler): with client.get_table(instance_id, table_id) as table: - table._metrics.add_handler(handler) + table._metrics.handlers = [handler] yield table @pytest.fixture(scope="session") diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index beea316bb..100de9e3d 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -18,10 +18,11 @@ import uuid import os from google.api_core import retry -from google.api_core.exceptions import ClientError, PermissionDenied +from google.api_core.exceptions import ClientError, PermissionDenied, ServerError from google.cloud.bigtable.data.execute_query.metadata import SqlType from google.cloud.bigtable.data.read_modify_write_rules import _MAX_INCREMENT_VALUE +from google.cloud.bigtable.data._metrics import OperationType from google.cloud.environment_vars import BIGTABLE_EMULATOR from google.type import date_pb2 @@ -29,6 +30,7 @@ from . 
import TEST_FAMILY, TEST_FAMILY_2, TEST_AGGREGATE_FAMILY, SystemTestRunner + if CrossSync.is_async: from google.cloud.bigtable_v2.services.bigtable.transports.grpc_asyncio import ( _LoggingClientAIOInterceptor as GapicInterceptor, @@ -154,14 +156,6 @@ async def create_row_and_mutation( @CrossSync.convert_class(sync_name="TestSystem") class TestSystemAsync(SystemTestRunner): - @CrossSync.drop - @pytest.fixture(scope="session") - def event_loop(self): - loop = asyncio.get_event_loop() - yield loop - loop.stop() - loop.close() - def _make_client(self): project = os.getenv("GOOGLE_CLOUD_PROJECT") or None return CrossSync.DataClient(project=project) @@ -1324,3 +1318,46 @@ async def test_execute_metadata_on_empty_response( assert md[TEST_AGGREGATE_FAMILY].column_type == SqlType.Map( SqlType.Bytes(), SqlType.Int64() ) + + @pytest.fixture(scope="session") + def metrics_client(self, client): + yield client._gcp_metrics_exporter.client + + @pytest.mark.order("last") + @pytest.mark.parametrize( + "metric,methods", + [ + ("attempt_latencies", [m.value for m in OperationType]), + ("operation_latencies", [m.value for m in OperationType]), + ("retry_count", [m.value for m in OperationType]), + ("first_response_latencies", [OperationType.READ_ROWS]), + ("server_latencies", [m.value for m in OperationType]), + ("connectivity_error_count", [m.value for m in OperationType]), + ("application_blocking_latencies", [OperationType.READ_ROWS]), + ], + ) + @retry.Retry(predicate=retry.if_exception_type(AssertionError, ServerError)) + def test_metric_existence( + self, client, table_id, metrics_client, start_timestamp, metric, methods + ): + """ + Checks to make sure metrics were exported by tests + + Runs at the end of test suite, to let other tests write metrics + """ + end_timestamp = datetime.datetime.now(datetime.timezone.utc) + for m in methods: + metric_filter = ( + f'metric.type = "bigtable.googleapis.com/client/{metric}" ' + + f'AND metric.labels.client_name = "python-bigtable/{client._client_version()}" ' + + f'AND resource.labels.table = "{table_id}" ' + ) + results = list( + metrics_client.list_time_series( + name=f"projects/{client.project}", + filter=metric_filter, + interval={"start_time": start_timestamp, "end_time": end_timestamp}, + view=0, + ) + ) + assert len(results) > 0, f"No data found for {metric} {m}" diff --git a/tests/system/data/test_system_autogen.py b/tests/system/data/test_system_autogen.py index 66ca27a66..127e401be 100644 --- a/tests/system/data/test_system_autogen.py +++ b/tests/system/data/test_system_autogen.py @@ -20,9 +20,10 @@ import uuid import os from google.api_core import retry -from google.api_core.exceptions import ClientError, PermissionDenied +from google.api_core.exceptions import ClientError, PermissionDenied, ServerError from google.cloud.bigtable.data.execute_query.metadata import SqlType from google.cloud.bigtable.data.read_modify_write_rules import _MAX_INCREMENT_VALUE +from google.cloud.bigtable.data._metrics import OperationType from google.cloud.environment_vars import BIGTABLE_EMULATOR from google.type import date_pb2 from google.cloud.bigtable.data._cross_sync import CrossSync @@ -1073,3 +1074,44 @@ def test_execute_metadata_on_empty_response( assert md[TEST_AGGREGATE_FAMILY].column_type == SqlType.Map( SqlType.Bytes(), SqlType.Int64() ) + + @pytest.fixture(scope="session") + def metrics_client(self, client): + yield client._gcp_metrics_exporter.client + + @pytest.mark.order("last") + @pytest.mark.parametrize( + "metric,methods", + [ + ("attempt_latencies", 
[m.value for m in OperationType]), + ("operation_latencies", [m.value for m in OperationType]), + ("retry_count", [m.value for m in OperationType]), + ("first_response_latencies", [OperationType.READ_ROWS]), + ("server_latencies", [m.value for m in OperationType]), + ("connectivity_error_count", [m.value for m in OperationType]), + ("application_blocking_latencies", [OperationType.READ_ROWS]), + ], + ) + @retry.Retry(predicate=retry.if_exception_type(AssertionError, ServerError)) + def test_metric_existence( + self, client, table_id, metrics_client, start_timestamp, metric, methods + ): + """Checks to make sure metrics were exported by tests + + Runs at the end of test suite, to let other tests write metrics""" + end_timestamp = datetime.datetime.now(datetime.timezone.utc) + for m in methods: + metric_filter = ( + f'metric.type = "bigtable.googleapis.com/client/{metric}" ' + + f'AND metric.labels.client_name = "python-bigtable/{client._client_version()}" ' + + f'AND resource.labels.table = "{table_id}" ' + ) + results = list( + metrics_client.list_time_series( + name=f"projects/{client.project}", + filter=metric_filter, + interval={"start_time": start_timestamp, "end_time": end_timestamp}, + view=0, + ) + ) + assert len(results) > 0, f"No data found for {metric} {m}" diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index b24f1f47e..90a674e61 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -110,6 +110,10 @@ def _make_client(cls, *args, use_emulator=True, **kwargs): @CrossSync.pytest async def test_ctor(self): + from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + BigtableMetricsExporter, + ) + expected_project = "project-id" expected_credentials = AnonymousCredentials() client = self._make_client( @@ -123,6 +127,8 @@ async def test_ctor(self): assert client._channel_refresh_task is not None assert client.transport._credentials == expected_credentials assert isinstance(client._metrics_interceptor, CrossSync.MetricsInterceptor) + assert client._gcp_metrics_exporter is not None + assert isinstance(client._gcp_metrics_exporter, BigtableMetricsExporter) await client.close() @CrossSync.pytest @@ -189,6 +195,33 @@ async def test_ctor_dict_options(self): start_background_refresh.assert_called_once() await client.close() + @CrossSync.pytest + async def test_metrics_exporter_init_shares_arguments(self): + expected_credentials = AnonymousCredentials() + expected_project = "custom_project" + expected_options = client_options.ClientOptions() + expected_options.credentials_file = None + expected_options.quota_project_id = None + with mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter.BigtableMetricsExporter.__init__", + return_value=None, + ) as exporter_mock: + async with self._make_client( + project=expected_project, + credentials=expected_credentials, + client_options=expected_options, + ): + exporter_mock.assert_called_once_with( + project_id=expected_project, + credentials=expected_credentials, + client_options=expected_options, + ) + + @CrossSync.pytest + async def test_metrics_exporter_init_implicit_project(self): + async with self._make_client() as client: + assert client._gcp_metrics_exporter.project_id == client.project + @CrossSync.pytest async def test_veneer_grpc_headers(self): client_component = "data-async" if CrossSync.is_async else "data" @@ -1163,6 +1196,7 @@ async def test_ctor(self): from google.cloud.bigtable.data._metrics import ( 
BigtableClientSideMetricsController, ) + from google.cloud.bigtable.data._metrics import GoogleCloudMetricsHandler expected_table_id = "table-id" expected_instance_id = "instance-id" @@ -1205,6 +1239,8 @@ async def test_ctor(self): assert instance_key in client._active_instances assert client._instance_owners[instance_key] == {id(table)} assert isinstance(table._metrics, BigtableClientSideMetricsController) + assert len(table._metrics.handlers) == 1 + assert isinstance(table._metrics.handlers[0], GoogleCloudMetricsHandler) assert table.default_operation_timeout == expected_operation_timeout assert table.default_attempt_timeout == expected_attempt_timeout assert ( @@ -1510,6 +1546,7 @@ async def test_ctor(self): from google.cloud.bigtable.data._helpers import _WarmedInstanceKey from google.cloud.bigtable.data._metrics import ( BigtableClientSideMetricsController, + GoogleCloudMetricsHandler, ) expected_table_id = "table-id" @@ -1560,6 +1597,8 @@ async def test_ctor(self): assert instance_key in client._active_instances assert client._instance_owners[instance_key] == {id(view)} assert isinstance(view._metrics, BigtableClientSideMetricsController) + assert len(view._metrics.handlers) == 1 + assert isinstance(view._metrics.handlers[0], GoogleCloudMetricsHandler) assert view.default_operation_timeout == expected_operation_timeout assert view.default_attempt_timeout == expected_attempt_timeout assert ( diff --git a/tests/unit/data/_metrics/test_gcp_exporter_handler.py b/tests/unit/data/_metrics/test_gcp_exporter_handler.py new file mode 100644 index 000000000..bcee9cf03 --- /dev/null +++ b/tests/unit/data/_metrics/test_gcp_exporter_handler.py @@ -0,0 +1,436 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import pytest +import mock + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + NumberDataPoint, + HistogramDataPoint, + MetricExportResult, + MetricsData, + ResourceMetrics, + ScopeMetrics, + Metric, + Sum, + AggregationTemporality, +) +from google.cloud.monitoring_v3 import ( + Point, + TimeSeries, +) +from google.api.distribution_pb2 import Distribution + + +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + BigtableMetricsExporter, +) +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + GoogleCloudMetricsHandler, +) +from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( + _OpenTelemetryInstruments, +) + + +class TestGoogleCloudMetricsHandler: + def _make_one(self, *args, **kwargs): + return GoogleCloudMetricsHandler(*args, **kwargs) + + def test_ctor_defaults(self): + from google.cloud.bigtable import __version__ as CLIENT_VERSION + + expected_instance = "my_instance" + expected_table = "my_table" + expected_exporter = BigtableMetricsExporter("project") + with mock.patch.object( + GoogleCloudMetricsHandler, "_generate_client_uid" + ) as uid_mock: + handler = self._make_one( + expected_exporter, + instance_id=expected_instance, + table_id=expected_table, + ) + assert isinstance(handler.meter_provider, MeterProvider) + assert isinstance(handler.otel, _OpenTelemetryInstruments) + assert handler.shared_labels["resource_instance"] == expected_instance + assert handler.shared_labels["resource_table"] == expected_table + assert handler.shared_labels["app_profile"] == "default" + assert ( + handler.shared_labels["client_name"] == f"python-bigtable/{CLIENT_VERSION}" + ) + assert handler.shared_labels["client_uid"] == uid_mock() + + def test_ctor_explicit(self): + expected_instance = "my_instance" + expected_table = "my_table" + expected_version = "my_version" + expected_uid = "my_uid" + expected_app_profile = "my_profile" + expected_exporter = BigtableMetricsExporter("project") + handler = self._make_one( + expected_exporter, + instance_id=expected_instance, + table_id=expected_table, + app_profile_id=expected_app_profile, + client_uid=expected_uid, + client_version=expected_version, + ) + assert handler.shared_labels["resource_instance"] == expected_instance + assert handler.shared_labels["resource_table"] == expected_table + assert handler.shared_labels["app_profile"] == expected_app_profile + assert ( + handler.shared_labels["client_name"] + == f"python-bigtable/{expected_version}" + ) + assert handler.shared_labels["client_uid"] == expected_uid + + @mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter.PeriodicExportingMetricReader" + ) + @mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter.MeterProvider" + ) + @mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter._OpenTelemetryInstruments" + ) + @mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter.OpenTelemetryMetricsHandler.__init__" + ) + def test_ctor_with_mocks( + self, mock_super_init, mock_otel_instruments, mock_meter_provider, mock_reader + ): + from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + VIEW_LIST, + ) + + exporter = mock.Mock() + export_interval = 90 + kwargs = {"instance_id": "test_instance", "table_id": "test_table"} + handler = self._make_one(exporter, export_interval=export_interval, **kwargs) + # check PeriodicExportingMetricReader + mock_reader.assert_called_once_with( + exporter, 
export_interval_millis=export_interval * 1000 + ) + # check MeterProvider + mock_meter_provider.assert_called_once_with( + metric_readers=[mock_reader.return_value], views=VIEW_LIST + ) + # check _OpenTelemetryInstruments + mock_otel_instruments.assert_called_once_with( + meter_provider=mock_meter_provider.return_value + ) + # check super().__init__ call + mock_super_init.assert_called_once_with( + instruments=mock_otel_instruments.return_value, **kwargs + ) + assert handler.meter_provider == mock_meter_provider.return_value + + def test_close(self): + mock_instance = mock.Mock() + assert mock_instance.meter_provider.shutdown.call_count == 0 + GoogleCloudMetricsHandler.close(mock_instance) + assert mock_instance.meter_provider.shutdown.call_count == 1 + + +class TestBigtableMetricsExporter: + def _make_one(self, *args, **kwargs): + return BigtableMetricsExporter(*args, **kwargs) + + def test_ctor_defaults(self): + from google.cloud.monitoring_v3 import MetricServiceClient + + expected_project = "custom" + instance = self._make_one(expected_project) + assert instance.project_id == expected_project + assert instance.prefix == "bigtable.googleapis.com/internal/client" + assert isinstance(instance.client, MetricServiceClient) + + def test_ctor_mocks(self): + expected_project = "custom" + with mock.patch( + "google.cloud.monitoring_v3.MetricServiceClient.__init__", + return_value=None, + ) as mock_client: + args = [mock.Mock(), object()] + kwargs = {"a": "b"} + instance = self._make_one(expected_project, *args, **kwargs) + assert instance.project_id == expected_project + assert instance.prefix == "bigtable.googleapis.com/internal/client" + mock_client.assert_called_once_with(*args, **kwargs) + + @pytest.mark.parametrize( + "value,expected_field", + [ + (123, "int64_value"), + (123.456, "double_value"), + ], + ) + def test__to_point_w_number(self, value, expected_field): + """Test that NumberDataPoint is converted to a Point correctly.""" + instance = self._make_one("project") + expected_start_time_nanos = 100 + expected_end_time_nanos = 200 + dp = NumberDataPoint( + attributes={}, + start_time_unix_nano=expected_start_time_nanos, + time_unix_nano=expected_end_time_nanos, + value=value, + ) + point = instance._to_point(dp) + assert isinstance(point, Point) + assert getattr(point.value, expected_field) == value + assert ( + point.interval.start_time.second * 10**9 + ) + point.interval.start_time.nanosecond == expected_start_time_nanos + assert ( + point.interval.end_time.second * 10**9 + ) + point.interval.end_time.nanosecond == expected_end_time_nanos + + def test__to_point_w_histogram(self): + """Test that HistogramDataPoint is converted to a Point correctly.""" + instance = self._make_one("project") + expected_start_time_nanos = 100 + expected_end_time_nanos = 200 + expected_count = 10 + expected_sum = 100.0 + expected_bucket_counts = [1, 2, 7] + expected_explicit_bounds = [10, 20] + dp = HistogramDataPoint( + attributes={}, + start_time_unix_nano=expected_start_time_nanos, + time_unix_nano=expected_end_time_nanos, + count=expected_count, + sum=expected_sum, + bucket_counts=expected_bucket_counts, + explicit_bounds=expected_explicit_bounds, + min=0, + max=50, + ) + point = instance._to_point(dp) + assert isinstance(point, Point) + dist = point.value.distribution_value + assert isinstance(dist, Distribution) + assert dist.count == expected_count + assert dist.mean == expected_sum / expected_count + assert list(dist.bucket_counts) == expected_bucket_counts + assert ( + 
list(dist.bucket_options.explicit_buckets.bounds) + == expected_explicit_bounds + ) + assert ( + point.interval.start_time.second * 10**9 + ) + point.interval.start_time.nanosecond == expected_start_time_nanos + assert ( + point.interval.end_time.second * 10**9 + ) + point.interval.end_time.nanosecond == expected_end_time_nanos + + def test__to_point_w_histogram_zero_count(self): + """Test that HistogramDataPoint with zero count is converted to a Point correctly.""" + instance = self._make_one("project") + dp = HistogramDataPoint( + attributes={}, + start_time_unix_nano=100, + time_unix_nano=200, + count=0, + sum=0, + bucket_counts=[], + explicit_bounds=[], + min=0, + max=0, + ) + point = instance._to_point(dp) + assert isinstance(point, Point) + dist = point.value.distribution_value + assert isinstance(dist, Distribution) + assert dist.count == 0 + assert dist.mean == 0.0 + + @pytest.mark.parametrize( + "num_series, batch_size, expected_calls, expected_batch_sizes", + [ + (10, 200, 1, [10]), + (200, 200, 1, [200]), + (500, 200, 3, [200, 200, 100]), + (0, 200, 0, []), + ], + ) + def test__batch_write( + self, num_series, batch_size, expected_calls, expected_batch_sizes + ): + """Test that _batch_write splits series into batches correctly.""" + instance = self._make_one("project") + instance.client = mock.Mock() + series = [TimeSeries() for _ in range(num_series)] + instance._batch_write(series, max_batch_size=batch_size) + assert instance.client.create_service_time_series.call_count == expected_calls + for i, call in enumerate( + instance.client.create_service_time_series.call_args_list + ): + call_args, _ = call + assert len(call_args[0].time_series) == expected_batch_sizes[i] + + def test__batch_write_with_deadline(self): + """Test that _batch_write passes deadlines to gapic correctly.""" + import time + from google.api_core import gapic_v1 + + instance = self._make_one("project") + instance.client = mock.Mock() + series = [TimeSeries() for _ in range(10)] + # test with deadline + deadline = time.time() + 10 + instance._batch_write(series, deadline=deadline) + ( + call_args, + call_kwargs, + ) = instance.client.create_service_time_series.call_args_list[0] + assert "timeout" in call_kwargs + assert 9 < call_kwargs["timeout"] < 10 + # test without deadline + instance.client.create_service_time_series.reset_mock() + instance._batch_write(series, deadline=None) + ( + call_args, + call_kwargs, + ) = instance.client.create_service_time_series.call_args_list[0] + assert "timeout" in call_kwargs + assert call_kwargs["timeout"] == gapic_v1.method.DEFAULT + + def test_export(self): + """Test that export correctly converts metrics and calls _batch_write.""" + project_id = "project" + instance = self._make_one(project_id) + instance._batch_write = mock.Mock() + # create mock metrics data + expected_value = 123 + attributes = { + "resource_instance": "instance1", + "resource_cluster": "cluster1", + "resource_table": "table1", + "resource_zone": "zone1", + "method": "ReadRows", + } + data_point = NumberDataPoint( + attributes=attributes, + start_time_unix_nano=100, + time_unix_nano=200, + value=expected_value, + ) + metric = Metric( + name="operation_latencies", + description="", + unit="ms", + data=Sum( + data_points=[data_point], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + ) + scope_metric = ScopeMetrics( + scope=mock.Mock(), metrics=[metric], schema_url=None + ) + resource_metric = ResourceMetrics( + resource=mock.Mock(), scope_metrics=[scope_metric], 
schema_url=None + ) + metrics_data = MetricsData(resource_metrics=[resource_metric]) + result = instance.export(metrics_data) + assert result == MetricExportResult.SUCCESS + instance._batch_write.assert_called_once() + # check the TimeSeries passed to _batch_write + call_args, call_kwargs = instance._batch_write.call_args_list[0] + series_list = call_args[0] + assert len(series_list) == 1 + series = series_list[0] + assert series.metric.type == f"{instance.prefix}/operation_latencies" + assert series.metric.labels["method"] == "ReadRows" + assert "resource_instance" not in series.metric.labels + assert series.resource.type == "bigtable_client_raw" + assert series.resource.labels["project_id"] == project_id + assert series.resource.labels["instance"] == "instance1" + assert series.resource.labels["cluster"] == "cluster1" + assert series.resource.labels["table"] == "table1" + assert series.resource.labels["zone"] == "zone1" + assert len(series.points) == 1 + point = series.points[0] + assert point.value.int64_value == expected_value + + def test_export_no_attributes(self): + """Test that export skips data points with no attributes.""" + instance = self._make_one("project") + instance._batch_write = mock.Mock() + data_point = NumberDataPoint( + attributes={}, start_time_unix_nano=100, time_unix_nano=200, value=123 + ) + metric = Metric( + name="operation_latencies", + description="", + unit="ms", + data=Sum( + data_points=[data_point], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + ) + scope_metric = ScopeMetrics( + scope=mock.Mock(), metrics=[metric], schema_url=None + ) + resource_metric = ResourceMetrics( + resource=mock.Mock(), scope_metrics=[scope_metric], schema_url=None + ) + metrics_data = MetricsData(resource_metrics=[resource_metric]) + result = instance.export(metrics_data) + assert result == MetricExportResult.SUCCESS + instance._batch_write.assert_called_once() + series_list = instance._batch_write.call_args[0][0] + assert len(series_list) == 0 + + def test_exception_in_export(self): + """ + make sure exceptions don't raise + """ + instance = self._make_one("project") + instance._batch_write = mock.Mock(side_effect=Exception("test")) + # create mock metrics data with one valid data point + attributes = { + "resource_instance": "instance1", + "resource_cluster": "cluster1", + "resource_table": "table1", + "resource_zone": "zone1", + } + data_point = NumberDataPoint( + attributes=attributes, + start_time_unix_nano=100, + time_unix_nano=200, + value=123, + ) + metric = Metric( + name="operation_latencies", + description="", + unit="ms", + data=Sum( + data_points=[data_point], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + ) + scope_metric = ScopeMetrics( + scope=mock.Mock(), metrics=[metric], schema_url=None + ) + resource_metric = ResourceMetrics( + resource=mock.Mock(), scope_metrics=[scope_metric], schema_url=None + ) + metrics_data = MetricsData(resource_metrics=[resource_metric]) + result = instance.export(metrics_data) + assert result == MetricExportResult.FAILURE diff --git a/tests/unit/data/_metrics/test_opentelemetry_handler.py b/tests/unit/data/_metrics/test_opentelemetry_handler.py new file mode 100644 index 000000000..593947282 --- /dev/null +++ b/tests/unit/data/_metrics/test_opentelemetry_handler.py @@ -0,0 +1,401 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest +import mock + +from grpc import StatusCode + +from google.cloud.bigtable.data._metrics.data_model import ( + ActiveOperationMetric, + CompletedAttemptMetric, + CompletedOperationMetric, + OperationType, +) +from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( + _OpenTelemetryInstruments, + OpenTelemetryMetricsHandler, +) + + +class TestOpentelemetryInstruments: + EXPECTED_METRICS = [ + "operation_latencies", + "first_response_latencies", + "attempt_latencies", + "server_latencies", + "application_latencies", + "throttling_latencies", + "retry_count", + "connectivity_error_count", + ] + + def _make_one(self, meter_provider=None): + return _OpenTelemetryInstruments(meter_provider) + + def test_meter_name(self): + expected_name = "bigtable.googleapis.com" + mock_meter_provider = mock.Mock() + self._make_one(mock_meter_provider) + mock_meter_provider.get_meter.assert_called_once_with(expected_name) + + @pytest.mark.parametrize( + "metric_name", [m for m in EXPECTED_METRICS if "latencies" in m] + ) + def test_histogram_creation(self, metric_name): + mock_meter_provider = mock.Mock() + instruments = self._make_one(mock_meter_provider) + mock_meter = mock_meter_provider.get_meter() + assert any( + [ + call.kwargs["name"] == metric_name + for call in mock_meter.create_histogram.call_args_list + ] + ) + assert all( + [ + call.kwargs["unit"] == "ms" + for call in mock_meter.create_histogram.call_args_list + ] + ) + assert all( + [ + call.kwargs["description"] is not None + for call in mock_meter.create_histogram.call_args_list + ] + ) + assert getattr(instruments, metric_name) is not None + + @pytest.mark.parametrize( + "metric_name", [m for m in EXPECTED_METRICS if "count" in m] + ) + def test_counter_creation(self, metric_name): + mock_meter_provider = mock.Mock() + instruments = self._make_one(mock_meter_provider) + mock_meter = mock_meter_provider.get_meter() + assert any( + [ + call.kwargs["name"] == metric_name + for call in mock_meter.create_counter.call_args_list + ] + ) + assert all( + [ + call.kwargs["description"] is not None + for call in mock_meter.create_histogram.call_args_list + ] + ) + assert getattr(instruments, metric_name) is not None + + def test_global_provider(self): + instruments = self._make_one() + # wait to import otel until after creating instance + import opentelemetry + + for metric_name in self.EXPECTED_METRICS: + metric = getattr(instruments, metric_name) + assert metric is not None + if "latencies" in metric_name: + assert isinstance(metric, opentelemetry.metrics.Histogram) + else: + assert isinstance(metric, opentelemetry.metrics.Counter) + + +class TestOpentelemetryMetricsHandler: + def _make_one(self, **kwargs): + return OpenTelemetryMetricsHandler(**kwargs) + + def test_ctor_defaults(self): + from google.cloud.bigtable import __version__ as CLIENT_VERSION + + expected_instance = "my_instance" + expected_table = "my_table" + with mock.patch.object( + OpenTelemetryMetricsHandler, "_generate_client_uid" + ) as uid_mock: + handler = self._make_one( + instance_id=expected_instance, table_id=expected_table + ) + assert 
isinstance(handler.otel, _OpenTelemetryInstruments)
+        assert handler.shared_labels["resource_instance"] == expected_instance
+        assert handler.shared_labels["resource_table"] == expected_table
+        assert handler.shared_labels["app_profile"] == "default"
+        assert (
+            handler.shared_labels["client_name"] == f"python-bigtable/{CLIENT_VERSION}"
+        )
+        assert handler.shared_labels["client_uid"] == uid_mock()
+
+    def test_ctor_explicit(self):
+        expected_instance = "my_instance"
+        expected_table = "my_table"
+        expected_version = "my_version"
+        expected_uid = "my_uid"
+        expected_app_profile = "my_profile"
+        expected_instruments = object()
+        handler = self._make_one(
+            instance_id=expected_instance,
+            table_id=expected_table,
+            app_profile_id=expected_app_profile,
+            client_uid=expected_uid,
+            client_version=expected_version,
+            instruments=expected_instruments,
+        )
+        assert handler.otel == expected_instruments
+        assert handler.shared_labels["resource_instance"] == expected_instance
+        assert handler.shared_labels["resource_table"] == expected_table
+        assert handler.shared_labels["app_profile"] == expected_app_profile
+        assert (
+            handler.shared_labels["client_name"]
+            == f"python-bigtable/{expected_version}"
+        )
+        assert handler.shared_labels["client_uid"] == expected_uid
+
+    @mock.patch("socket.gethostname", return_value="hostname")
+    @mock.patch("os.getpid", return_value="pid")
+    @mock.patch("uuid.uuid4", return_value="uid")
+    def test_generate_client_uid_mock(self, uuid_mock, os_mock, socket_mock):
+        # stacked mock.patch decorators are applied bottom-up, so the innermost
+        # patch (uuid4) supplies the first mock argument
+        uid = OpenTelemetryMetricsHandler._generate_client_uid()
+        assert uid == "python-uid-pid@hostname"
+
+    @mock.patch("socket.gethostname", side_effect=[ValueError("fail")])
+    @mock.patch("os.getpid", side_effect=[ValueError("fail")])
+    @mock.patch("uuid.uuid4", return_value="uid")
+    def test_generate_client_uid_mock_with_exceptions(
+        self, uuid_mock, os_mock, socket_mock
+    ):
+        uid = OpenTelemetryMetricsHandler._generate_client_uid()
+        assert uid == "python-uid-@localhost"
+
+    def test_generate_client_uid(self):
+        import re
+
+        uid = OpenTelemetryMetricsHandler._generate_client_uid()
+        # The expected pattern is python-<uuid>-<pid>@<hostname>
+        expected_pattern = (
+            r"python-[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}-\d+@.+"
+        )
+        assert re.match(expected_pattern, uid)
+
+    def test_on_operation_complete_operation_latencies(self):
+        mock_instruments = mock.Mock(operation_latencies=mock.Mock())
+        handler = self._make_one(
+            instance_id="inst", table_id="table", instruments=mock_instruments
+        )
+        op = CompletedOperationMetric(
+            op_type=OperationType.READ_ROWS,
+            uuid="test-uuid",
+            duration_ns=1234567,
+            completed_attempts=[],
+            final_status=StatusCode.OK,
+            cluster_id="cluster",
+            zone="zone",
+            is_streaming=True,
+        )
+        handler.on_operation_complete(op)
+        expected_labels = {
+            "method": op.op_type.value,
+            "status": op.final_status.name,
+            "resource_zone": op.zone,
+            "resource_cluster": op.cluster_id,
+            **handler.shared_labels,
+        }
+        mock_instruments.operation_latencies.record.assert_called_once_with(
+            op.duration_ns / 1e6,
+            {"streaming": str(op.is_streaming), **expected_labels},
+        )
+
+    @pytest.mark.parametrize(
+        "op_type,first_response_latency_ns,should_record",
+        [
+            (OperationType.READ_ROWS, 12345, True),
+            (OperationType.READ_ROWS, None, False),
+            (OperationType.MUTATE_ROW, 12345, False),
+        ],
+    )
+    def test_on_operation_complete_first_response_latencies(
+        self, op_type, first_response_latency_ns, should_record
+    ):
+        mock_instruments = mock.Mock(first_response_latencies=mock.Mock())
+        handler =
+
+    def test_on_operation_complete_operation_latencies(self):
+        mock_instruments = mock.Mock(operation_latencies=mock.Mock())
+        handler = self._make_one(
+            instance_id="inst", table_id="table", instruments=mock_instruments
+        )
+        op = CompletedOperationMetric(
+            op_type=OperationType.READ_ROWS,
+            uuid="test-uuid",
+            duration_ns=1234567,
+            completed_attempts=[],
+            final_status=StatusCode.OK,
+            cluster_id="cluster",
+            zone="zone",
+            is_streaming=True,
+        )
+        handler.on_operation_complete(op)
+        expected_labels = {
+            "method": op.op_type.value,
+            "status": op.final_status.name,
+            "resource_zone": op.zone,
+            "resource_cluster": op.cluster_id,
+            **handler.shared_labels,
+        }
+        mock_instruments.operation_latencies.record.assert_called_once_with(
+            op.duration_ns / 1e6,
+            {"streaming": str(op.is_streaming), **expected_labels},
+        )
+
+    @pytest.mark.parametrize(
+        "op_type,first_response_latency_ns,should_record",
+        [
+            (OperationType.READ_ROWS, 12345, True),
+            (OperationType.READ_ROWS, None, False),
+            (OperationType.MUTATE_ROW, 12345, False),
+        ],
+    )
+    def test_on_operation_complete_first_response_latencies(
+        self, op_type, first_response_latency_ns, should_record
+    ):
+        mock_instruments = mock.Mock(first_response_latencies=mock.Mock())
+        handler = self._make_one(
+            instance_id="inst", table_id="table", instruments=mock_instruments
+        )
+        op = CompletedOperationMetric(
+            op_type=op_type,
+            uuid="test-uuid",
+            duration_ns=1234567,
+            completed_attempts=[],
+            final_status=StatusCode.OK,
+            cluster_id="cluster",
+            zone="zone",
+            is_streaming=True,
+            first_response_latency_ns=first_response_latency_ns,
+        )
+        handler.on_operation_complete(op)
+        if should_record:
+            expected_labels = {
+                "method": op.op_type.value,
+                "status": op.final_status.name,
+                "resource_zone": op.zone,
+                "resource_cluster": op.cluster_id,
+                **handler.shared_labels,
+            }
+            mock_instruments.first_response_latencies.record.assert_called_once_with(
+                first_response_latency_ns / 1e6, expected_labels
+            )
+        else:
+            mock_instruments.first_response_latencies.record.assert_not_called()
+
+    @pytest.mark.parametrize("attempts_count", [0, 1, 5])
+    def test_on_operation_complete_retry_count(self, attempts_count):
+        mock_instruments = mock.Mock(retry_count=mock.Mock())
+        handler = self._make_one(
+            instance_id="inst", table_id="table", instruments=mock_instruments
+        )
+        attempts = [mock.Mock()] * attempts_count
+        op = CompletedOperationMetric(
+            op_type=OperationType.READ_ROWS,
+            uuid="test-uuid",
+            duration_ns=1234567,
+            completed_attempts=attempts,
+            final_status=StatusCode.OK,
+            cluster_id="cluster",
+            zone="zone",
+            is_streaming=True,
+        )
+        handler.on_operation_complete(op)
+        if attempts:
+            expected_labels = {
+                "method": op.op_type.value,
+                "status": op.final_status.name,
+                "resource_zone": op.zone,
+                "resource_cluster": op.cluster_id,
+                **handler.shared_labels,
+            }
+            mock_instruments.retry_count.add.assert_called_once_with(
+                len(attempts) - 1, expected_labels
+            )
+        else:
+            mock_instruments.retry_count.add.assert_not_called()
+
+    def test_on_attempt_complete_attempt_latencies(self):
+        mock_instruments = mock.Mock(attempt_latencies=mock.Mock())
+        handler = self._make_one(
+            instance_id="inst", table_id="table", instruments=mock_instruments
+        )
+        attempt = CompletedAttemptMetric(duration_ns=1234567, end_status=StatusCode.OK)
+        op = ActiveOperationMetric(
+            op_type=OperationType.READ_ROWS,
+            zone="zone",
+            cluster_id="cluster",
+            is_streaming=True,
+        )
+        handler.on_attempt_complete(attempt, op)
+        expected_labels = {
+            "method": op.op_type.value,
+            "resource_zone": op.zone,
+            "resource_cluster": op.cluster_id,
+            **handler.shared_labels,
+        }
+        mock_instruments.attempt_latencies.record.assert_called_once_with(
+            attempt.duration_ns / 1e6,
+            {
+                "streaming": str(op.is_streaming),
+                "status": attempt.end_status.name,
+                **expected_labels,
+            },
+        )
+
+    @pytest.mark.parametrize(
+        "is_first_attempt,flow_throttling_ns",
+        [(True, 54321), (False, 0), (True, 0)],
+    )
+    def test_on_attempt_complete_throttling_latencies(
+        self, is_first_attempt, flow_throttling_ns
+    ):
+        mock_instruments = mock.Mock(throttling_latencies=mock.Mock())
+        handler = self._make_one(
+            instance_id="inst", table_id="table", instruments=mock_instruments
+        )
+        attempt = CompletedAttemptMetric(
+            duration_ns=1234567,
+            end_status=StatusCode.OK,
+            grpc_throttling_time_ns=456789,
+        )
+        op = ActiveOperationMetric(
+            op_type=OperationType.READ_ROWS,
+            flow_throttling_time_ns=flow_throttling_ns,
+        )
+        if not is_first_attempt:
+            op.completed_attempts.append(mock.Mock())
+        handler.on_attempt_complete(attempt, op)
+        expected_throttling = attempt.grpc_throttling_time_ns / 1e6
+        if is_first_attempt:
+            expected_throttling += flow_throttling_ns / 1e6
+        mock_instruments.throttling_latencies.record.assert_called_once_with(
+            pytest.approx(expected_throttling), mock.ANY
+        )
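The throttling test encodes an attribution rule worth spelling out: gRPC-level throttling is charged to every attempt, while client-side flow-control throttling is charged to the first attempt only. A self-contained restatement of that arithmetic (names mirror the metric-model fields; this is the test's expectation, not the handler implementation itself):

```python
def expected_throttling_ms(
    grpc_throttling_time_ns: int,
    flow_throttling_time_ns: int,
    is_first_attempt: bool,
) -> float:
    # gRPC throttling counts on every attempt; flow-control throttling
    # is attributed to the first attempt only
    total_ns = grpc_throttling_time_ns
    if is_first_attempt:
        total_ns += flow_throttling_time_ns
    return total_ns / 1e6  # instruments record milliseconds


assert expected_throttling_ms(456789, 54321, True) == (456789 + 54321) / 1e6
assert expected_throttling_ms(456789, 54321, False) == 456789 / 1e6
```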
+
+    def test_on_attempt_complete_application_latencies(self):
+        mock_instruments = mock.Mock(application_latencies=mock.Mock())
+        handler = self._make_one(
+            instance_id="inst", table_id="table", instruments=mock_instruments
+        )
+        attempt = CompletedAttemptMetric(
+            duration_ns=1234567,
+            end_status=StatusCode.OK,
+            application_blocking_time_ns=234567,
+            backoff_before_attempt_ns=345678,
+        )
+        op = ActiveOperationMetric(op_type=OperationType.READ_ROWS)
+        handler.on_attempt_complete(attempt, op)
+        mock_instruments.application_latencies.record.assert_called_once_with(
+            (attempt.application_blocking_time_ns + attempt.backoff_before_attempt_ns)
+            / 1e6,
+            mock.ANY,
+        )
+
+    @pytest.mark.parametrize(
+        "gfe_latency_ns,should_record_server_latency",
+        [(12345, True), (None, False), (0, True)],
+    )
+    def test_on_attempt_complete_server_latencies_and_connectivity_error(
+        self, gfe_latency_ns, should_record_server_latency
+    ):
+        mock_instruments = mock.Mock(
+            server_latencies=mock.Mock(), connectivity_error_count=mock.Mock()
+        )
+        handler = self._make_one(
+            instance_id="inst", table_id="table", instruments=mock_instruments
+        )
+        attempt = CompletedAttemptMetric(
+            duration_ns=1234567,
+            end_status=StatusCode.OK,
+            gfe_latency_ns=gfe_latency_ns,
+        )
+        op = ActiveOperationMetric(
+            op_type=OperationType.READ_ROWS,
+            zone="zone",
+            cluster_id="cluster",
+            is_streaming=True,
+        )
+        handler.on_attempt_complete(attempt, op)
+        if should_record_server_latency:
+            mock_instruments.server_latencies.record.assert_called_once_with(
+                gfe_latency_ns / 1e6, mock.ANY
+            )
+            mock_instruments.connectivity_error_count.add.assert_not_called()
+        else:
+            mock_instruments.server_latencies.record.assert_not_called()
+            mock_instruments.connectivity_error_count.add.assert_called_once_with(
+                1, mock.ANY
+            )
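The last test pins down the connectivity heuristic: a reported GFE latency (even 0) proves the request reached Google's network, while a missing value is counted as a connectivity error instead. A small sketch of that branching, using the field name from the metric model (again a restatement of the expected behavior, not the handler's actual code):

```python
from typing import Optional


def gfe_metric_to_record(gfe_latency_ns: Optional[int]):
    # a GFE latency of 0 still counts as "reached Google's network"
    if gfe_latency_ns is not None:
        return ("server_latencies", gfe_latency_ns / 1e6)
    # no GFE measurement: record a connectivity error instead
    return ("connectivity_error_count", 1)


assert gfe_metric_to_record(0) == ("server_latencies", 0.0)
assert gfe_metric_to_record(None) == ("connectivity_error_count", 1)
```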
diff --git a/tests/unit/data/_sync_autogen/test_client.py b/tests/unit/data/_sync_autogen/test_client.py
index ab624130c..bb73a0e43 100644
--- a/tests/unit/data/_sync_autogen/test_client.py
+++ b/tests/unit/data/_sync_autogen/test_client.py
@@ -79,6 +79,10 @@ def _make_client(cls, *args, use_emulator=True, **kwargs):
         return cls._get_target_class()(*args, **kwargs)
 
     def test_ctor(self):
+        from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import (
+            BigtableMetricsExporter,
+        )
+
         expected_project = "project-id"
         expected_credentials = AnonymousCredentials()
         client = self._make_client(
@@ -92,6 +96,8 @@ def test_ctor(self):
         assert isinstance(
             client._metrics_interceptor, CrossSync._Sync_Impl.MetricsInterceptor
         )
+        assert client._gcp_metrics_exporter is not None
+        assert isinstance(client._gcp_metrics_exporter, BigtableMetricsExporter)
         client.close()
 
     def test_ctor_super_inits(self):
@@ -154,6 +160,31 @@ def test_ctor_dict_options(self):
             start_background_refresh.assert_called_once()
         client.close()
 
+    def test_metrics_exporter_init_shares_arguments(self):
+        expected_credentials = AnonymousCredentials()
+        expected_project = "custom_project"
+        expected_options = client_options.ClientOptions()
+        expected_options.credentials_file = None
+        expected_options.quota_project_id = None
+        with mock.patch(
+            "google.cloud.bigtable.data._metrics.handlers.gcp_exporter.BigtableMetricsExporter.__init__",
+            return_value=None,
+        ) as exporter_mock:
+            with self._make_client(
+                project=expected_project,
+                credentials=expected_credentials,
+                client_options=expected_options,
+            ):
+                exporter_mock.assert_called_once_with(
+                    project_id=expected_project,
+                    credentials=expected_credentials,
+                    client_options=expected_options,
+                )
+
+    def test_metrics_exporter_init_implicit_project(self):
+        with self._make_client() as client:
+            assert client._gcp_metrics_exporter.project_id == client.project
+
     def test_veneer_grpc_headers(self):
         client_component = "data-async" if CrossSync._Sync_Impl.is_async else "data"
         VENEER_HEADER_REGEX = re.compile(
@@ -941,6 +972,7 @@ def test_ctor(self):
         from google.cloud.bigtable.data._metrics import (
             BigtableClientSideMetricsController,
         )
+        from google.cloud.bigtable.data._metrics import GoogleCloudMetricsHandler
 
         expected_table_id = "table-id"
         expected_instance_id = "instance-id"
@@ -982,6 +1014,8 @@ def test_ctor(self):
         assert instance_key in client._active_instances
         assert client._instance_owners[instance_key] == {id(table)}
         assert isinstance(table._metrics, BigtableClientSideMetricsController)
+        assert len(table._metrics.handlers) == 1
+        assert isinstance(table._metrics.handlers[0], GoogleCloudMetricsHandler)
         assert table.default_operation_timeout == expected_operation_timeout
         assert table.default_attempt_timeout == expected_attempt_timeout
         assert (
@@ -1214,6 +1248,7 @@ def test_ctor(self):
         from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
         from google.cloud.bigtable.data._metrics import (
             BigtableClientSideMetricsController,
+            GoogleCloudMetricsHandler,
         )
 
         expected_table_id = "table-id"
@@ -1263,6 +1298,8 @@ def test_ctor(self):
         assert instance_key in client._active_instances
         assert client._instance_owners[instance_key] == {id(view)}
         assert isinstance(view._metrics, BigtableClientSideMetricsController)
+        assert len(view._metrics.handlers) == 1
+        assert isinstance(view._metrics.handlers[0], GoogleCloudMetricsHandler)
         assert view.default_operation_timeout == expected_operation_timeout
         assert view.default_attempt_timeout == expected_attempt_timeout
         assert (
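Taken together, these client tests pin down the wiring this PR adds: each client builds one `BigtableMetricsExporter` sharing its auth configuration, and each table or view gets a metrics controller whose single handler is a `GoogleCloudMetricsHandler` bound to that exporter. A hedged sketch of that pairing with purely illustrative values (the keyword parameters are the ones asserted above; `client_version` is a placeholder, and passing `credentials=None` is assumed to fall back to ambient credentials):

```python
from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import (
    BigtableMetricsExporter,
    GoogleCloudMetricsHandler,
)

# one exporter per client, sharing the client's auth configuration
exporter = BigtableMetricsExporter(
    project_id="my-project",
    credentials=None,  # assumption: None means ambient/default credentials
    client_options=None,
)

# one handler per table/view, carrying the resource labels for that table
handler = GoogleCloudMetricsHandler(
    exporter=exporter,
    instance_id="my-instance",
    table_id="my-table",
    app_profile_id=None,
    client_version="0.0.0",  # placeholder version string
)
```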