107 commits
d2175f1
use replaceable channel wrapper
daniel-sanche Jul 25, 2025
5e107fc
got unit tests working
daniel-sanche Jul 26, 2025
c4a97e1
put back in cache invalidation
daniel-sanche Jul 26, 2025
e71b1d5
added wrapped multicallables to avoid cache invalidation
daniel-sanche Jul 26, 2025
b81a9be
added crosssync, moved close logic back to client
daniel-sanche Jul 29, 2025
a1dffb5
generated sync code
daniel-sanche Jul 29, 2025
e3ec02b
got tests running
daniel-sanche Jul 29, 2025
4e13783
fixed tests
daniel-sanche Jul 29, 2025
7d90a04
remove extra wrapper; added invalidate_stubs helper
daniel-sanche Jul 29, 2025
26cd601
fixed lint
daniel-sanche Jul 29, 2025
375332f
fixed lint
daniel-sanche Jul 29, 2025
428d75a
renamed replaceablechannel to swappablechannel
daniel-sanche Jul 29, 2025
4b39bc5
added tests
daniel-sanche Jul 29, 2025
3f090c2
added docstrings
daniel-sanche Jul 29, 2025
883ceab
Merge branch 'main' into refactor_refresh
daniel-sanche Jul 29, 2025
04c762a
initial commit
daniel-sanche Jul 29, 2025
29dff4d
added back interceptor
daniel-sanche Jul 29, 2025
e4f8238
added metrics to client
daniel-sanche Jul 29, 2025
fcb062e
fixed lint
daniel-sanche Aug 1, 2025
ac8dbe4
Merge branch 'refactor_refresh' into csm_1_data_model
daniel-sanche Aug 1, 2025
d155f8a
set up channel interceptions
daniel-sanche Aug 2, 2025
9fece96
added TrackedBackoffGenerator
daniel-sanche Aug 2, 2025
aec2577
fixed lint
daniel-sanche Aug 2, 2025
ec4e847
fixed import
daniel-sanche Aug 2, 2025
8f99e4e
added operation.cancel
daniel-sanche Aug 6, 2025
f8e6603
added operation cancelled to interceptor
daniel-sanche Aug 6, 2025
f5e057e
gave each operation a uuid
daniel-sanche Aug 6, 2025
8c397bb
return attempt metric on new attempt
daniel-sanche Aug 7, 2025
2c34198
use standard context manager
daniel-sanche Aug 7, 2025
9bd1e07
use default backoff generator
daniel-sanche Aug 7, 2025
96d1355
require backoff; refactor check
daniel-sanche Aug 7, 2025
de5d07b
fixed context manager naming; lint
daniel-sanche Aug 7, 2025
d73f379
moved first_response_latency to operation
daniel-sanche Aug 7, 2025
a2070f9
Merge branch 'main' into refactor_refresh
daniel-sanche Aug 8, 2025
4a4f80a
fixed import
daniel-sanche Aug 8, 2025
5ea9f0e
Merge branch 'main' into csm_1_data_model
daniel-sanche Aug 11, 2025
708a35a
fixed broken unit tests
daniel-sanche Aug 11, 2025
67c08fd
Merge branch 'refactor_refresh' into csm_1_data_model
daniel-sanche Aug 11, 2025
5ae7acc
added set_next to TrackedBackoffGenerator
daniel-sanche Aug 11, 2025
cb32296
added assertions to test_client
daniel-sanche Aug 12, 2025
f07e765
added new test metrics interceptor file
daniel-sanche Aug 26, 2025
a34c01e
first round of tests
daniel-sanche Aug 26, 2025
84f61ee
added metadata capture for failed rpcs
daniel-sanche Aug 26, 2025
d4ae637
added test for starting attempts
daniel-sanche Aug 26, 2025
1fbcadd
added sync tests
daniel-sanche Aug 26, 2025
edacd04
got tests passing
daniel-sanche Aug 26, 2025
c628d21
removed helper class
daniel-sanche Aug 26, 2025
6d585ec
refactoring
daniel-sanche Aug 26, 2025
01e6b36
refactored interceptor
daniel-sanche Aug 26, 2025
05fe577
added unit tests for interceptor
daniel-sanche Aug 26, 2025
4871abd
Merge branch 'main' into csm_1_data_model
daniel-sanche Aug 26, 2025
b6eac6c
fixed lint
daniel-sanche Aug 26, 2025
07b3295
feat: added metrics interceptor
daniel-sanche Sep 9, 2025
e3ac131
pulled out operation logic
daniel-sanche Sep 9, 2025
557b54a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 9, 2025
3306ad7
fixed missing convert
daniel-sanche Sep 9, 2025
65f15de
updated system test
daniel-sanche Sep 9, 2025
16c5b6a
fixed lint
daniel-sanche Sep 9, 2025
00cc52f
fixed annotation
daniel-sanche Sep 9, 2025
019a8c2
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 9, 2025
4ccfdab
removed duplicate import
daniel-sanche Sep 9, 2025
bd9ab70
added more tests
daniel-sanche Sep 9, 2025
50b3e48
remove operation metadata key
daniel-sanche Sep 9, 2025
2b35127
assign metadata directly
daniel-sanche Sep 9, 2025
9cbda99
added test
daniel-sanche Sep 9, 2025
73f4b3c
replace details mocks with real type
daniel-sanche Sep 9, 2025
d5e012d
strip operation id from metadata before request
daniel-sanche Sep 9, 2025
486068b
added try; generated sync
daniel-sanche Sep 9, 2025
bb00b8b
re-generated sync classes; removed test
daniel-sanche Sep 15, 2025
c89f6d4
loosen test
daniel-sanche Sep 15, 2025
bebeb70
use contextvars
daniel-sanche Sep 30, 2025
5ba2bbe
pulled in improvements to data model
daniel-sanche Sep 30, 2025
4098fd9
removed cancel from spec
daniel-sanche Sep 30, 2025
144d75e
broke out streaming wrapper into static function
daniel-sanche Sep 30, 2025
e8785ac
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 30, 2025
c1cc24d
fixed tests
daniel-sanche Sep 30, 2025
c433f3c
fixed lint
daniel-sanche Sep 30, 2025
85d4cf0
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 30, 2025
ed9d3cf
fixed lint
daniel-sanche Oct 1, 2025
bc6036e
added close to metric spec
daniel-sanche Oct 2, 2025
18ec330
fixed lint
daniel-sanche Oct 2, 2025
f1be54a
added test
daniel-sanche Oct 3, 2025
9a33d86
Merge branch 'main' into csm_1_data_model
daniel-sanche Nov 22, 2025
e94e143
renamed methods
daniel-sanche Nov 22, 2025
596e54e
improved from_context
daniel-sanche Nov 22, 2025
16c8ed8
changed buffer time
daniel-sanche Nov 26, 2025
4887830
ran blacken
daniel-sanche Nov 26, 2025
5c80c79
use time mocks
daniel-sanche Nov 26, 2025
8de4875
fixed lint
daniel-sanche Nov 26, 2025
6a4d742
loosened test tolerances
daniel-sanche Nov 26, 2025
a474560
removed metrics superclass from interceptor
daniel-sanche Nov 26, 2025
5390813
fixed lint
daniel-sanche Nov 26, 2025
3e0d134
improved comments
daniel-sanche Nov 26, 2025
6b48242
moved interceptor into _metrics
daniel-sanche Nov 26, 2025
dcf3d0a
pulled out tracking into new file
daniel-sanche Nov 26, 2025
c94e4ff
simplified wrapper method
daniel-sanche Nov 26, 2025
3a87a35
Revert "moved interceptor into _metrics"
daniel-sanche Nov 26, 2025
b5c361b
moved tracked retry out of autogen folder
daniel-sanche Nov 26, 2025
1b0b857
fixed typing
daniel-sanche Nov 26, 2025
ac315d0
added tests
daniel-sanche Nov 27, 2025
87e78d1
removed unneeded imports
daniel-sanche Nov 27, 2025
54b7208
ran blacken
daniel-sanche Nov 27, 2025
819e1ae
Moved retry trackers into own file
daniel-sanche Nov 27, 2025
22eb2e1
added docstring
daniel-sanche Nov 27, 2025
0ec8d14
fixed type
daniel-sanche Nov 27, 2025
fa25c2b
import annotations
daniel-sanche Dec 3, 2025
f9ac548
Update google/cloud/bigtable/data/_metrics/data_model.py
daniel-sanche Dec 19, 2025
4 changes: 4 additions & 0 deletions google/cloud/bigtable/data/_async/client.py
@@ -87,6 +87,7 @@
from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter
from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
from google.cloud.bigtable.data.row_filters import RowFilterChain
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController

from google.cloud.bigtable.data._cross_sync import CrossSync

@@ -1038,6 +1039,8 @@ def __init__(
default_retryable_errors or ()
)

self._metrics = BigtableClientSideMetricsController()

try:
self._register_instance_future = CrossSync.create_task(
self.client._register_instance,
@@ -1752,6 +1755,7 @@ async def close(self):
"""
Called to close the Table instance and release any resources held by it.
"""
self._metrics.close()
if self._register_instance_future:
self._register_instance_future.cancel()
self.client._remove_instance_registration(
104 changes: 99 additions & 5 deletions google/cloud/bigtable/data/_async/metrics_interceptor.py
@@ -13,11 +13,21 @@
# limitations under the License.
from __future__ import annotations

from typing import Sequence

import time
from functools import wraps

from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
from google.cloud.bigtable.data._metrics.data_model import OperationState
from google.cloud.bigtable.data._metrics.data_model import OperationType

from google.cloud.bigtable.data._cross_sync import CrossSync

if CrossSync.is_async:
from grpc.aio import UnaryUnaryClientInterceptor
from grpc.aio import UnaryStreamClientInterceptor
from grpc.aio import AioRpcError
else:
from grpc import UnaryUnaryClientInterceptor
from grpc import UnaryStreamClientInterceptor
@@ -26,6 +36,57 @@
__CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.metrics_interceptor"


def _with_active_operation(func):
"""
Decorator for interceptor methods to extract the active operation associated with the
in-scope contextvars, and pass it to the decorated function.
"""

@wraps(func)
def wrapper(self, continuation, client_call_details, request):
operation: ActiveOperationMetric | None = ActiveOperationMetric.from_context()

if operation:
# start a new attempt if not started
if (
operation.state == OperationState.CREATED
or operation.state == OperationState.BETWEEN_ATTEMPTS
):
operation.start_attempt()
# wrap continuation in logic to process the operation
return func(self, operation, continuation, client_call_details, request)
else:
# if operation not found, return unwrapped continuation
return continuation(client_call_details, request)

return wrapper


@CrossSync.convert
async def _get_metadata(source) -> dict[str, str | bytes] | None:
"""Helper to extract metadata from a call or RpcError"""
try:
metadata: Sequence[tuple[str, str | bytes]]
if CrossSync.is_async:
# grpc.aio returns metadata in Metadata objects
if isinstance(source, AioRpcError):
metadata = list(source.trailing_metadata()) + list(
source.initial_metadata()
)
else:
metadata = list(await source.trailing_metadata()) + list(
await source.initial_metadata()
)
else:
# sync grpc returns metadata as a sequence of tuples
metadata = source.trailing_metadata() + source.initial_metadata()
# convert metadata to dict format
return {k: v for (k, v) in metadata}
except Exception:
# ignore errors while fetching metadata
return None


@CrossSync.convert_class(sync_name="BigtableMetricsInterceptor")
class AsyncBigtableMetricsInterceptor(
UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor
@@ -35,21 +96,33 @@ class AsyncBigtableMetricsInterceptor(
"""

@CrossSync.convert
async def intercept_unary_unary(self, continuation, client_call_details, request):
@_with_active_operation
Reviewer comment (Contributor):
where is this called? Can you point me to the code location? It feels a bit weird that starting an attempt is called from an interceptor

Reply (Contributor Author):
The implementation of this is here. The wrapper is executed before each intercept_unary_unary call.

The main purpose of this is to add the operation argument to this method, since that's not part of the regular intercept_unary_unary signature.

But yeah, there is a line to start an attempt if the associated rpc wasn't started previously. I can't remember if that was actually needed, or if I added that as a safeguard. I'll look into it

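The handoff described in the reply (the caller stores the active operation in context-local state, and the decorator retrieves it before each intercepted call) can be sketched with stdlib contextvars. All names below are illustrative stand-ins, not the actual ActiveOperationMetric API:

```python
import contextvars
from functools import wraps

# Illustrative stand-in for the contextvar the real client would set.
_current_op: contextvars.ContextVar = contextvars.ContextVar(
    "current_operation", default=None
)


class Operation:
    """Toy stand-in for ActiveOperationMetric."""

    def __init__(self):
        self.attempts = 0

    def start_attempt(self):
        self.attempts += 1


def with_active_operation(func):
    """Look up the context-scoped operation and pass it to the wrapped call."""

    @wraps(func)
    def wrapper(continuation, request):
        op = _current_op.get()
        if op is None:
            # no operation in scope: call through untracked
            return continuation(request)
        op.start_attempt()
        return func(op, continuation, request)

    return wrapper


@with_active_operation
def intercept(op, continuation, request):
    return continuation(request)


op = Operation()
_current_op.set(op)
result = intercept(lambda r: r.upper(), "ping")
```

Because the lookup goes through a contextvar rather than an explicit argument, the interceptor can recover the operation without changing grpc's fixed interceptor signature.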
async def intercept_unary_unary(
self, operation, continuation, client_call_details, request
):
"""
Interceptor for unary rpcs:
- MutateRow
- CheckAndMutateRow
- ReadModifyWriteRow
"""
metadata = None
try:
call = await continuation(client_call_details, request)
metadata = await _get_metadata(call)
return call
except Exception as rpc_error:
metadata = await _get_metadata(rpc_error)
raise rpc_error
finally:
if metadata is not None:
operation.add_response_metadata(metadata)

@CrossSync.convert
async def intercept_unary_stream(self, continuation, client_call_details, request):
@_with_active_operation
async def intercept_unary_stream(
self, operation, continuation, client_call_details, request
):
"""
Interceptor for streaming rpcs:
- ReadRows
@@ -58,21 +131,42 @@ async def intercept_unary_stream(self, continuation, client_call_details, request)
"""
try:
return self._streaming_generator_wrapper(
await continuation(client_call_details, request)
operation, await continuation(client_call_details, request)
)
except Exception as rpc_error:
# handle errors while initializing stream
metadata = await _get_metadata(rpc_error)
if metadata is not None:
operation.add_response_metadata(metadata)
raise rpc_error

@staticmethod
@CrossSync.convert
async def _streaming_generator_wrapper(call):
async def _streaming_generator_wrapper(operation, call):
"""
Wrapped generator to be returned by intercept_unary_stream.
"""
# only track has_first_response for READ_ROWS
has_first_response = (
operation.first_response_latency_ns is not None
or operation.op_type != OperationType.READ_ROWS
)
encountered_exc = None
try:
async for response in call:
# record time to first response. Currently only used for READ_ROWS
if not has_first_response:
operation.first_response_latency_ns = (
time.monotonic_ns() - operation.start_time_ns
)
has_first_response = True
yield response
except Exception as e:
# handle errors while processing stream
raise e
encountered_exc = e
raise
finally:
if call is not None:
metadata = await _get_metadata(encountered_exc or call)
if metadata is not None:
operation.add_response_metadata(metadata)
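The latency bookkeeping in the wrapper above reduces to timestamping the first yielded item. A minimal stand-alone sketch of that pattern (the function and dict names here are illustrative, not the library's API):

```python
import time


def timed_stream(stream, metrics: dict):
    """Yield items from stream, recording time to first item in metrics."""
    start_ns = time.monotonic_ns()
    first_seen = False
    for item in stream:
        if not first_seen:
            # measured once, on the first yielded response
            metrics["first_response_latency_ns"] = time.monotonic_ns() - start_ns
            first_seen = True
        yield item


metrics = {}
items = list(timed_stream(iter(["a", "b", "c"]), metrics))
```

As in the real wrapper, the timestamp is taken lazily inside the generator body, so it reflects when the first response actually arrives rather than when the wrapper was created.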
59 changes: 59 additions & 0 deletions google/cloud/bigtable/data/_helpers.py
@@ -23,6 +23,7 @@
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery

from google.api_core import exceptions as core_exceptions
from google.api_core.retry import exponential_sleep_generator
from google.api_core.retry import RetryFailureReason
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup

@@ -248,3 +249,61 @@ def _get_retryable_errors(
call_codes = table.default_mutate_rows_retryable_errors

return [_get_error_type(e) for e in call_codes]


class TrackedBackoffGenerator:
"""
Generator class for exponential backoff sleep times.
This implementation builds on top of api_core.retries.exponential_sleep_generator,
adding the ability to retrieve previous values using get_attempt_backoff(idx).
This is used by the Metrics class to track the sleep times used for each attempt.
"""

def __init__(self, initial=0.01, maximum=60, multiplier=2):
self.history = []
self.subgenerator = exponential_sleep_generator(
initial=initial, maximum=maximum, multiplier=multiplier
)
self._next_override: float | None = None

def __iter__(self):
return self

def set_next(self, next_value: float):
"""
Set the next backoff value, instead of generating one from subgenerator.
After the value is yielded, it will go back to using self.subgenerator.

If set_next is called twice before next() is called, only the latest
value will be used and the others discarded

Args:
next_value: the upcoming value to yield when next() is called
Raises:
ValueError: if next_value is negative
"""
if next_value < 0:
raise ValueError("backoff value cannot be less than 0")
self._next_override = next_value

def __next__(self) -> float:
if self._next_override is not None:
next_backoff = self._next_override
self._next_override = None
else:
next_backoff = next(self.subgenerator)
self.history.append(next_backoff)
return next_backoff

def get_attempt_backoff(self, attempt_idx) -> float:
"""
Returns the backoff time for a specific attempt index, starting at 0.

Args:
attempt_idx: the index of the attempt to return backoff for
Raises:
IndexError: if attempt_idx is negative, or not in history
"""
if attempt_idx < 0:
raise IndexError("received negative attempt number")
return self.history[attempt_idx]
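The history/set_next behavior of TrackedBackoffGenerator can be exercised with a small self-contained stand-in. The class below is illustrative: it uses a deterministic exponential sequence, whereas the real class delegates to api_core's exponential_sleep_generator, which adds jitter:

```python
class TrackedBackoff:
    """Simplified, deterministic stand-in for TrackedBackoffGenerator."""

    def __init__(self, initial=0.01, maximum=60, multiplier=2):
        self.history = []
        self._next_override = None

        def _exponential():
            delay = initial
            while True:
                yield delay
                delay = min(delay * multiplier, maximum)

        self._sub = _exponential()

    def set_next(self, next_value: float):
        """Override the next yielded value (e.g. to make a test deterministic)."""
        if next_value < 0:
            raise ValueError("backoff value cannot be less than 0")
        self._next_override = next_value

    def __iter__(self):
        return self

    def __next__(self) -> float:
        if self._next_override is not None:
            val, self._next_override = self._next_override, None
        else:
            val = next(self._sub)
        # every yielded value is recorded, overridden or not
        self.history.append(val)
        return val

    def get_attempt_backoff(self, attempt_idx: int) -> float:
        if attempt_idx < 0:
            raise IndexError("received negative attempt number")
        return self.history[attempt_idx]


backoff = TrackedBackoff(initial=1, multiplier=2)
first, second = next(backoff), next(backoff)  # 1, 2
backoff.set_next(0.5)  # force a known value for the next attempt
third = next(backoff)  # 0.5
fourth = next(backoff)  # back to the exponential sequence: 4
```

Recording overridden values in history alongside generated ones is what lets the metrics layer report the backoff actually used per attempt, regardless of where the value came from.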
35 changes: 35 additions & 0 deletions google/cloud/bigtable/data/_metrics/__init__.py
@@ -0,0 +1,35 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud.bigtable.data._metrics.metrics_controller import (
BigtableClientSideMetricsController,
)

from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric
from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric
from google.cloud.bigtable.data._metrics.data_model import CompletedAttemptMetric
from google.cloud.bigtable.data._metrics.data_model import OperationState
from google.cloud.bigtable.data._metrics.data_model import OperationType
from google.cloud.bigtable.data._metrics.tracked_retry import tracked_retry

__all__ = (
"BigtableClientSideMetricsController",
"OperationType",
"OperationState",
"ActiveOperationMetric",
"ActiveAttemptMetric",
"CompletedOperationMetric",
"CompletedAttemptMetric",
"tracked_retry",
)