Conversation

@daniel-sanche (Contributor) commented Aug 11, 2025

Blocked on #1206

This PR revives #923, which was de-prioritized to work on the sync client. This PR brings it back, working with both the async and sync clients. It also adds a grpc interceptor as an improved way to capture metadata across both clients.


Design

The main architecture looks like this:

(architecture diagram)

Most of the work is done by the ActiveOperationMetric class, which is instantiated with each rpc call and updated through the lifecycle of the call. When the rpc is complete, it calls on_operation_complete and on_attempt_complete on the MetricsHandler, which can then log the completed data to OpenTelemetry (or, in theory, other destinations if needed).
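A minimal sketch of this lifecycle (the class and method names follow the PR; all fields and bodies are illustrative, not the actual implementation):

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class CompletedOperationMetric:
    # illustrative snapshot; the real class carries many more fields
    op_type: str
    duration_ns: int
    final_status: str


class MetricsHandler:
    """Destination for finalized metrics; an OpenTelemetry-backed
    subclass would export instead of storing in memory."""

    def __init__(self):
        self.operations = []

    def on_operation_complete(self, op: CompletedOperationMetric) -> None:
        self.operations.append(op)


class ActiveOperationMetric:
    """Mutable tracker instantiated once per rpc call."""

    def __init__(self, op_type: str, handlers: list):
        self.op_type = op_type
        self.handlers = handlers
        self.start_time_ns = time.monotonic_ns()

    def end_with_status(self, status: str) -> None:
        # freeze the collected data and hand it to each handler
        completed = CompletedOperationMetric(
            op_type=self.op_type,
            duration_ns=time.monotonic_ns() - self.start_time_ns,
            final_status=status,
        )
        for handler in self.handlers:
            handler.on_operation_complete(completed)
```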

Note that there are separate classes for active vs completed metrics (ActiveOperationMetric, ActiveAttemptMetric, CompletedOperationMetric, CompletedAttemptMetric). This is so that fields can stay mutable and optional while the request is ongoing, while static, immutable copies are passed down once the attempt is completed and no new data is coming.
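The mutable/immutable split maps naturally onto dataclasses; a sketch (field names are illustrative):

```python
from dataclasses import dataclass, FrozenInstanceError
from typing import Optional


@dataclass
class ActiveAttemptMetric:
    # mutable while the attempt is in flight; most fields start unknown
    start_time_ns: int
    gfe_latency_ns: Optional[int] = None
    end_status: Optional[str] = None


@dataclass(frozen=True)
class CompletedAttemptMetric:
    # immutable snapshot handed to handlers after the attempt finishes;
    # every field is required, so incomplete data can't leak downstream
    start_time_ns: int
    duration_ns: int
    end_status: str
```

With `frozen=True`, any attempt to mutate a completed metric raises `dataclasses.FrozenInstanceError`, which enforces the "no new data is coming" contract at runtime.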

@daniel-sanche daniel-sanche marked this pull request as ready for review November 27, 2025 00:57
@daniel-sanche daniel-sanche requested review from a team as code owners November 27, 2025 00:57
@daniel-sanche (Contributor Author):

Before merging, we should re-run the benchmarking code to make sure we are satisfied with the performance

@vermas2012 vermas2012 assigned mutianf and unassigned vermas2012 Dec 5, 2025
DEFAULT_CLUSTER_ID = "unspecified"

# keys for parsing metadata blobs
BIGTABLE_METADATA_KEY = "x-goog-ext-425905942-bin"
Contributor:

nit: maybe a more descriptive name like BIGTABLE_LOCATION_METADATA_KEY?

Contributor Author:

sounds good, I'll change this

class OperationType(Enum):
"""Enum for the type of operation being performed."""

READ_ROWS = "ReadRows"
Contributor:

there should also be a READ_ROW so we know if it's a point read or a scan

Contributor Author:

We had this discussion when I did the first draft of this, and we landed on just keeping READ_ROWS. But I'm totally fine either way, if you feel differently now

MUTATE_ROW = "MutateRow"
CHECK_AND_MUTATE = "CheckAndMutateRow"
READ_MODIFY_WRITE = "ReadModifyWriteRow"

Contributor:

how about BulkMutateRows for write batcher?

Contributor Author:

There is one called BULK_MUTATE_ROWS (although it maps to MutateRows). Do we need another one?

backoff_before_attempt_ns: int = 0
# time waiting on grpc channel, in nanoseconds
# TODO: capture grpc_throttling_time
grpc_throttling_time_ns: int = 0
Contributor:

fyi: we realized that in Java this metric also doesn't capture the time a request is queued on the channel. So if it's hard to get in Python, we can skip it.

Contributor Author:

sounds good, maybe I'll remove this then

op_type=self.op_type,
uuid=self.uuid,
completed_attempts=self.completed_attempts,
duration_ns=time.monotonic_ns() - self.start_time_ns,
Contributor:

same here, can we add a sanity check to make sure it's >= 0 (or, if it's negative, use 0) so that if there's a bug in the code, CSM won't break the client.
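The suggested guard could be as simple as clamping the computed duration at zero (a sketch; `safe_duration_ns` is a hypothetical helper name, not from the PR):

```python
import time
from typing import Optional


def safe_duration_ns(start_time_ns: int, now_ns: Optional[int] = None) -> int:
    """Return elapsed nanoseconds, clamped at zero so a bookkeeping bug
    can never emit a negative duration that breaks the metrics export."""
    if now_ns is None:
        now_ns = time.monotonic_ns()
    return max(0, now_ns - start_time_ns)
```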


@CrossSync.convert
async def intercept_unary_unary(self, continuation, client_call_details, request):
@_with_active_operation
Contributor:

where is this called? Can you point me to the code location? It feels a bit weird that starting an attempt is called from an interceptor

Contributor Author:

The implementation of this is here. The wrapper is executed before each intercept_unary_unary call.

The main purpose of this is to add the operation argument to this method, since that's not part of the regular intercept_unary_unary signature.

But yeah, there is a line that starts an attempt if the associated rpc wasn't started previously. I can't remember whether that was actually needed or whether I added it as a safeguard. I'll look into it

def __init__(self, **kwargs):
pass

def on_operation_complete(self, op: CompletedOperationMetric) -> None:
Contributor:

how is on_operation_complete called vs end_with_status in ActiveOperationMetric?

Contributor Author:

This represents the split between recording metric data and exporting it somewhere else:

  • ActiveOperationMetric is the class that tracks an ongoing operation. It's passed around through an rpc's lifecycle, and is finalized by calling operation.end_with_status()
    • this will be called throughout the library codebase
  • MetricHandler represents a destination for the metrics, like OTel. The implementation of this is mostly in a separate PR. The ActiveOperationMetric will call metric_handler.on_operation_complete() when it's time to export a new metric
    • this is really internal to the _metrics implementation, and won't be used in other places

# record metadata from failed rpc
if isinstance(exc, GoogleAPICallError) and exc.errors:
rpc_error = exc.errors[-1]
metadata = list(rpc_error.trailing_metadata()) + list(
Contributor:

should this call metrics_interceptor._get_metadata()?

Contributor Author:

I don't think so

The metrics_interceptor and tracked_retry are two separate methods for capturing metadata in different contexts, but the source of truth is always operation.add_response_metadata(), which they both write to
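The shared write path can be pictured like this (the method name comes from the discussion; the body and the metadata keys shown are illustrative):

```python
class ActiveOperationMetric:
    """Sketch: both capture paths funnel into one method,
    so the operation holds the single source of truth."""

    def __init__(self):
        self.response_metadata = {}

    def add_response_metadata(self, metadata):
        # metadata arrives as (key, value) pairs from grpc
        for key, value in metadata:
            self.response_metadata[key] = value


op = ActiveOperationMetric()
# interceptor path: trailing metadata from a successful rpc
op.add_response_metadata([("x-goog-ext-425905942-bin", b"\x01")])
# tracked_retry path: metadata recovered from a failed rpc's last error
op.add_response_metadata([("grpc-status-details-bin", b"\x02")])
```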

# record ending attempt for timeout failures
attempt_exc = exc_list[-1]
_track_retryable_error(operation)(attempt_exc)
operation.end_with_status(source_exc)
Contributor:

where is end_with_success called?

Contributor Author (Dec 19, 2025):

The tracked_retry object won't be notified when the stream ends successfully, so I think we'll have to instrument those manually


Stepping back a bit, I found rpcs behave pretty differently depending on whether they're unary vs streaming, sync vs async, or retryable vs single-attempt. I found I had to provide a couple of different instrumentation methods to capture everything, and use different strategies in the instrumentation code instead of capturing it all the same way. I documented some of this in go/bigtable-csm-python

If you're finding it too confusing, let me know. Maybe I can improve the documentation around this, or find ways to streamline it a bit more

Contributor:

will take a look at the doc!

Co-authored-by: Mattie Fu <mattiefu@google.com>

Labels

api: bigtable Issues related to the googleapis/python-bigtable API. size: xl Pull request size is extra large.
