Commit ebe4b07

Authored by samikshya-db and Claude
feat: Implement host-level telemetry batching to reduce rate limiting (#718)
* feat: Implement host-level telemetry batching to reduce rate limiting

  Changes the telemetry client architecture from per-session to per-host batching, matching the JDBC driver implementation. This reduces the number of HTTP requests to the telemetry endpoint and prevents rate limiting in test environments.

  Key changes:
  - Add _TelemetryClientHolder with reference counting for shared clients
  - Change TelemetryClientFactory to key clients by host_url instead of session_id
  - Add getHostUrlSafely() helper for defensive null handling
  - Update all callers (client.py, exc.py, latency_logger.py) to pass host_url

  Before: 100 connections to the same host = 100 separate TelemetryClients
  After: 100 connections to the same host = 1 shared TelemetryClient (refcount=100)

  This fixes rate-limiting issues seen in e2e tests, where 300+ parallel connections were overwhelming the telemetry endpoint with 429 errors.

* chore: Change all telemetry logging to DEBUG level

  Reduces log noise by changing all telemetry-related log statements (info, warning, error) to debug level. Telemetry operations are background tasks and should not clutter logs with operational messages.

  Changes:
  - Circuit breaker state changes: info/warning -> debug
  - Telemetry send failures: error -> debug
  - All telemetry operations now consistently use debug level

* chore: Fix remaining telemetry warning log to debug

  Changes the remaining logger.warning in telemetry_push_client.py to debug level for consistency with other telemetry logging.

* fix: Update tests to use host_url instead of session_id_hex

  - Update circuit breaker test to check logger.debug instead of logger.info
  - Replace all session_id_hex test parameters with host_url
  - Apply Black formatting to exc.py and telemetry_client.py

  This fixes test failures caused by the signature change from session_id_hex to host_url in the Error class and TelemetryClientFactory.
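The per-host, reference-counted client lifecycle described above can be sketched as follows. This is a minimal illustration: the class and method names (`_TelemetryClientHolder`, `TelemetryClientFactory`, `get_telemetry_client`, `close`) are modeled on the commit message, not copied from the driver's actual code.

```python
import threading


class TelemetryClient:
    """Stand-in for the real client; one instance per host under this scheme."""

    def __init__(self, host_url):
        self.host_url = host_url


class _TelemetryClientHolder:
    """Pairs a shared client with a reference count of the sessions using it."""

    def __init__(self, client):
        self.client = client
        self.ref_count = 0


class TelemetryClientFactory:
    # Clients are keyed by host_url, so N connections to one host share one client.
    _clients = {}
    _lock = threading.Lock()

    @classmethod
    def get_telemetry_client(cls, host_url):
        with cls._lock:
            holder = cls._clients.get(host_url)
            if holder is None:
                holder = _TelemetryClientHolder(TelemetryClient(host_url))
                cls._clients[host_url] = holder
            holder.ref_count += 1
            return holder.client

    @classmethod
    def close(cls, host_url):
        with cls._lock:
            holder = cls._clients.get(host_url)
            if holder is None:
                return
            holder.ref_count -= 1
            # The client is torn down only when its last session closes.
            if holder.ref_count == 0:
                del cls._clients[host_url]
```

With this shape, 100 connections to the same host bump one holder's refcount to 100 rather than creating 100 clients, which is what eliminates the 429s.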
* fix: Revert session_id_hex in tests for functions that still use it

  Only Error classes changed from session_id_hex to host_url. Other classes (TelemetryClient, ResultSetDownloadHandler, etc.) still use session_id_hex.

  Reverted:
  - test_telemetry.py: TelemetryClient and initialize_telemetry_client
  - test_downloader.py: ResultSetDownloadHandler
  - test_download_manager.py: ResultFileDownloadManager

  Kept as host_url:
  - test_client.py: Error class instantiation

* fix: Update all Error raises and test calls to use host_url

  Changes:
  1. client.py: Changed all error raises from session_id_hex to host_url
     - Connection class: session_id_hex=self.get_session_id_hex() -> host_url=self.session.host
     - Cursor class: session_id_hex=self.connection.get_session_id_hex() -> host_url=self.connection.session.host
  2. test_telemetry.py: Updated get_telemetry_client() and close() calls
     - get_telemetry_client(session_id) -> get_telemetry_client(host_url)
     - close(session_id) -> close(host_url=host_url)
  3. test_telemetry_push_client.py: Changed logger.warning to logger.debug
     - Updated the test assertion to match the debug logging level

  These changes complete the migration from session-level to host-level telemetry client management.

* fix: Update thrift_backend.py to use host_url instead of session_id_hex

  Changes:
  1. Added a self._host attribute to store server_hostname
  2. Updated all error raises to use host_url=self._host
  3. Changed method signatures from session_id_hex to host_url:
     - _check_response_for_error
     - _hive_schema_to_arrow_schema
     - _col_to_description
     - _hive_schema_to_description
     - _check_direct_results_for_error
  4. Updated all method calls to pass self._host instead of self._session_id_hex

  This completes the migration from session-level to host-level error reporting.

* Fix Black formatting by adjusting fmt directive placement

  Moved the `# fmt: on` directive to the except-block level, instead of inside the if statement, to resolve Black's parsing confusion.
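The fmt-directive fix in the last commit above can be illustrated with a small standalone example. The function and its logic are hypothetical; only the directive placement mirrors the change. Black skips formatting between `# fmt: off` and `# fmt: on`, and pairing the two directives at the same block level keeps the skipped region well-defined.

```python
import errno


def classify_retryable(err_no):
    """Toy example of fmt directive placement (hypothetical logic)."""
    try:
        raise OSError(err_no, "simulated network error")
    except OSError as err:
        # fmt: off
        # A hand-aligned table that Black must leave alone.
        info_errs = [
            errno.ECONNRESET,  # | 104 | 54 |
            errno.ETIMEDOUT,   # | 110 | 60 |
        ]
        # fmt: on
        # "# fmt: on" sits here at the except-block level, matching the
        # indentation of "# fmt: off" above. Placing it deeper, inside the
        # if block below, left Black confused about where the region ended.
        if err.errno in info_errs:
            return "retry-info"
        return "retry-warn"
```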
* Fix telemetry feature flag tests to set mock session host

  The tests were failing because they called get_telemetry_client("test") but the mock session didn't have .host set, so the telemetry client was registered under a different key (likely None or a MagicMock). This caused the factory to return NoopTelemetryClient instead of the expected client. Fixed by setting mock_session_instance.host = "test" in all three tests.

* Add teardown_method to clear telemetry factory state between tests

  Without this cleanup, tests were sharing telemetry clients because they all used the same host key ("test"), causing test pollution. The first test would create an enabled client, and subsequent tests would reuse it even when they expected a disabled client.

* Clear feature flag context cache in teardown to fix test pollution

  The FeatureFlagsContextFactory caches feature flag contexts per session, causing tests to share the same feature flag state. This resulted in the first test creating a context with telemetry enabled, and subsequent tests incorrectly reusing that enabled state even when they expected disabled.

* fix: Access actual client from holder in flush worker

  The flush worker was calling _flush() on _TelemetryClientHolder objects instead of the actual TelemetryClient. Fixed by accessing holder.client before calling _flush(). Fixes the AttributeError in e2e tests: '_TelemetryClientHolder' object has no attribute '_flush'.

* Clear telemetry client cache in e2e test teardown

  Added _clients.clear() to the teardown fixture to prevent telemetry clients from persisting across e2e tests, which was causing session ID pollution in test_concurrent_queries_sends_telemetry.

* Pass session_id parameter to telemetry export methods

  With host-level telemetry batching, multiple connections share one TelemetryClient. Each client stores the session_id_hex of the first connection that created it. This caused all subsequent connections' telemetry events to use the wrong session ID.
  Changes:
  - Modified telemetry export method signatures to accept an optional session_id
  - Updated Connection.export_initial_telemetry_log() to pass session_id
  - Updated latency_logger.py export_latency_log() to pass session_id
  - Updated Error.__init__() to accept an optional session_id_hex and pass it
  - Updated all error raises in Connection and Cursor to pass session_id_hex

  🤖 Generated with [Claude Code](https://claude.com/claude-code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* Fix Black formatting in telemetry_client.py

* Use 'test-host' instead of 'test' for mock host in telemetry tests

* Replace test-session-id with test-host in test_client.py

* Fix telemetry client lookup to use test-host in tests

* Make session_id_hex keyword-only parameter in Error.__init__

---------

Co-authored-by: Claude <noreply@anthropic.com>
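The session-id fix in the final commits can be sketched as follows. The field and method names here are assumptions modeled on the commit message; only the idea, that a shared client must be told the caller's session id on each export rather than reusing the creator's, comes from the change itself.

```python
class TelemetryClient:
    """One instance is shared by every connection to the same host."""

    def __init__(self, host_url, session_id_hex=None):
        self.host_url = host_url
        # Captured at creation: correct only for the first connection.
        self._session_id_hex = session_id_hex
        self.events = []

    def export_initial_telemetry_log(self, driver_params, session_id_hex=None):
        # Prefer the caller's session id over the one captured at creation,
        # so a shared client tags each event with the right session.
        sid = session_id_hex or self._session_id_hex
        self.events.append({"session_id": sid, "params": driver_params})


# Two connections sharing one client, each passing its own session id:
client = TelemetryClient("https://example.cloud.databricks.com", "aaaa-1111")
client.export_initial_telemetry_log({"driver": "pysql"}, session_id_hex="aaaa-1111")
client.export_initial_telemetry_log({"driver": "pysql"}, session_id_hex="bbbb-2222")
```

Without the explicit parameter, the second export would have been attributed to session "aaaa-1111", which is exactly the pollution the e2e tests caught.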
1 parent d524f0e commit ebe4b07

File tree

12 files changed: +262 −108 lines


src/databricks/sql/backend/thrift_backend.py

34 additions, 38 deletions
@@ -163,6 +163,7 @@ def __init__(
         else:
             raise ValueError("No valid connection settings.")
 
+        self._host = server_hostname
         self._initialize_retry_args(kwargs)
         self._use_arrow_native_complex_types = kwargs.get(
             "_use_arrow_native_complex_types", True
@@ -279,14 +280,14 @@ def _initialize_retry_args(self, kwargs):
         )
 
     @staticmethod
-    def _check_response_for_error(response, session_id_hex=None):
+    def _check_response_for_error(response, host_url=None):
         if response.status and response.status.statusCode in [
             ttypes.TStatusCode.ERROR_STATUS,
             ttypes.TStatusCode.INVALID_HANDLE_STATUS,
         ]:
             raise DatabaseError(
                 response.status.errorMessage,
-                session_id_hex=session_id_hex,
+                host_url=host_url,
             )
 
     @staticmethod
@@ -340,7 +341,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):
         network_request_error = RequestError(
             user_friendly_error_message,
             full_error_info_context,
-            self._session_id_hex,
+            self._host,
             error_info.error,
         )
         logger.info(network_request_error.message_with_context())
@@ -461,13 +462,12 @@ def attempt_request(attempt):
                 errno.ECONNRESET,  # | 104 | 54 |
                 errno.ETIMEDOUT,  # | 110 | 60 |
             ]
+            # fmt: on
 
             gos_name = TCLIServiceClient.GetOperationStatus.__name__
             # retry on timeout. Happens a lot in Azure and it is safe as data has not been sent to server yet
             if method.__name__ == gos_name or err.errno == errno.ETIMEDOUT:
                 retry_delay = bound_retry_delay(attempt, self._retry_delay_default)
-
-                # fmt: on
                 log_string = f"{gos_name} failed with code {err.errno} and will attempt to retry"
                 if err.errno in info_errs:
                     logger.info(log_string)
@@ -516,9 +516,7 @@ def attempt_request(attempt):
         if not isinstance(response_or_error_info, RequestErrorInfo):
             # log nothing here, presume that main request logging covers
             response = response_or_error_info
-            ThriftDatabricksClient._check_response_for_error(
-                response, self._session_id_hex
-            )
+            ThriftDatabricksClient._check_response_for_error(response, self._host)
             return response
 
         error_info = response_or_error_info
@@ -533,7 +531,7 @@ def _check_protocol_version(self, t_open_session_resp):
                 "Error: expected server to use a protocol version >= "
                 "SPARK_CLI_SERVICE_PROTOCOL_V2, "
                 "instead got: {}".format(protocol_version),
-                session_id_hex=self._session_id_hex,
+                host_url=self._host,
             )
 
     def _check_initial_namespace(self, catalog, schema, response):
@@ -547,15 +545,15 @@ def _check_initial_namespace(self, catalog, schema, response):
             raise InvalidServerResponseError(
                 "Setting initial namespace not supported by the DBR version, "
                 "Please use a Databricks SQL endpoint or a cluster with DBR >= 9.0.",
-                session_id_hex=self._session_id_hex,
+                host_url=self._host,
             )
 
         if catalog:
             if not response.canUseMultipleCatalogs:
                 raise InvalidServerResponseError(
                     "Unexpected response from server: Trying to set initial catalog to {}, "
                     + "but server does not support multiple catalogs.".format(catalog),  # type: ignore
-                    session_id_hex=self._session_id_hex,
+                    host_url=self._host,
                 )
 
     def _check_session_configuration(self, session_configuration):
@@ -570,7 +568,7 @@ def _check_session_configuration(self, session_configuration):
                     TIMESTAMP_AS_STRING_CONFIG,
                     session_configuration[TIMESTAMP_AS_STRING_CONFIG],
                 ),
-                session_id_hex=self._session_id_hex,
+                host_url=self._host,
             )
 
     def open_session(self, session_configuration, catalog, schema) -> SessionId:
@@ -639,7 +637,7 @@ def _check_command_not_in_error_or_closed_state(
                     and guid_to_hex_id(op_handle.operationId.guid),
                     "diagnostic-info": get_operations_resp.diagnosticInfo,
                 },
-                session_id_hex=self._session_id_hex,
+                host_url=self._host,
             )
         else:
             raise ServerOperationError(
@@ -649,7 +647,7 @@ def _check_command_not_in_error_or_closed_state(
                     and guid_to_hex_id(op_handle.operationId.guid),
                     "diagnostic-info": None,
                 },
-                session_id_hex=self._session_id_hex,
+                host_url=self._host,
             )
         elif get_operations_resp.operationState == ttypes.TOperationState.CLOSED_STATE:
             raise DatabaseError(
@@ -660,7 +658,7 @@ def _check_command_not_in_error_or_closed_state(
                 "operation-id": op_handle
                 and guid_to_hex_id(op_handle.operationId.guid)
             },
-            session_id_hex=self._session_id_hex,
+            host_url=self._host,
         )
 
     def _poll_for_status(self, op_handle):
@@ -683,7 +681,7 @@ def _create_arrow_table(self, t_row_set, lz4_compressed, schema_bytes, descripti
         else:
             raise OperationalError(
                 "Unsupported TRowSet instance {}".format(t_row_set),
-                session_id_hex=self._session_id_hex,
+                host_url=self._host,
             )
         return convert_decimals_in_arrow_table(arrow_table, description), num_rows
 
@@ -692,7 +690,7 @@ def _get_metadata_resp(self, op_handle):
         return self.make_request(self._client.GetResultSetMetadata, req)
 
     @staticmethod
-    def _hive_schema_to_arrow_schema(t_table_schema, session_id_hex=None):
+    def _hive_schema_to_arrow_schema(t_table_schema, host_url=None):
         def map_type(t_type_entry):
             if t_type_entry.primitiveEntry:
                 return {
@@ -724,7 +722,7 @@ def map_type(t_type_entry):
             # even for complex types
             raise OperationalError(
                 "Thrift protocol error: t_type_entry not a primitiveEntry",
-                session_id_hex=session_id_hex,
+                host_url=host_url,
             )
 
         def convert_col(t_column_desc):
@@ -735,7 +733,7 @@ def convert_col(t_column_desc):
         return pyarrow.schema([convert_col(col) for col in t_table_schema.columns])
 
     @staticmethod
-    def _col_to_description(col, field=None, session_id_hex=None):
+    def _col_to_description(col, field=None, host_url=None):
         type_entry = col.typeDesc.types[0]
 
         if type_entry.primitiveEntry:
@@ -745,7 +743,7 @@ def _col_to_description(col, field=None, session_id_hex=None):
         else:
             raise OperationalError(
                 "Thrift protocol error: t_type_entry not a primitiveEntry",
-                session_id_hex=session_id_hex,
+                host_url=host_url,
            )
 
         if type_entry.primitiveEntry.type == ttypes.TTypeId.DECIMAL_TYPE:
@@ -759,7 +757,7 @@ def _col_to_description(col, field=None, session_id_hex=None):
             raise OperationalError(
                 "Decimal type did not provide typeQualifier precision, scale in "
                 "primitiveEntry {}".format(type_entry.primitiveEntry),
-                session_id_hex=session_id_hex,
+                host_url=host_url,
             )
         else:
             precision, scale = None, None
@@ -778,9 +776,7 @@ def _col_to_description(col, field=None, session_id_hex=None):
         return col.columnName, cleaned_type, None, None, precision, scale, None
 
     @staticmethod
-    def _hive_schema_to_description(
-        t_table_schema, schema_bytes=None, session_id_hex=None
-    ):
+    def _hive_schema_to_description(t_table_schema, schema_bytes=None, host_url=None):
         field_dict = {}
         if pyarrow and schema_bytes:
             try:
@@ -795,7 +791,7 @@ def _hive_schema_to_description(
             ThriftDatabricksClient._col_to_description(
                 col,
                 field_dict.get(col.columnName) if field_dict else None,
-                session_id_hex,
+                host_url,
             )
             for col in t_table_schema.columns
         ]
@@ -818,7 +814,7 @@ def _results_message_to_execute_response(self, resp, operation_state):
                     t_result_set_metadata_resp.resultFormat
                 ]
             ),
-            session_id_hex=self._session_id_hex,
+            host_url=self._host,
         )
         direct_results = resp.directResults
         has_been_closed_server_side = direct_results and direct_results.closeOperation
@@ -833,7 +829,7 @@ def _results_message_to_execute_response(self, resp, operation_state):
         schema_bytes = (
             t_result_set_metadata_resp.arrowSchema
             or self._hive_schema_to_arrow_schema(
-                t_result_set_metadata_resp.schema, self._session_id_hex
+                t_result_set_metadata_resp.schema, self._host
             )
             .serialize()
             .to_pybytes()
@@ -844,7 +840,7 @@ def _results_message_to_execute_response(self, resp, operation_state):
         description = self._hive_schema_to_description(
             t_result_set_metadata_resp.schema,
             schema_bytes,
-            self._session_id_hex,
+            self._host,
         )
 
         lz4_compressed = t_result_set_metadata_resp.lz4Compressed
@@ -895,7 +891,7 @@ def get_execution_result(
         schema_bytes = (
             t_result_set_metadata_resp.arrowSchema
             or self._hive_schema_to_arrow_schema(
-                t_result_set_metadata_resp.schema, self._session_id_hex
+                t_result_set_metadata_resp.schema, self._host
             )
             .serialize()
             .to_pybytes()
@@ -906,7 +902,7 @@ def get_execution_result(
         description = self._hive_schema_to_description(
             t_result_set_metadata_resp.schema,
             schema_bytes,
-            self._session_id_hex,
+            self._host,
         )
 
         lz4_compressed = t_result_set_metadata_resp.lz4Compressed
@@ -971,27 +967,27 @@ def get_query_state(self, command_id: CommandId) -> CommandState:
         return state
 
     @staticmethod
-    def _check_direct_results_for_error(t_spark_direct_results, session_id_hex=None):
+    def _check_direct_results_for_error(t_spark_direct_results, host_url=None):
         if t_spark_direct_results:
             if t_spark_direct_results.operationStatus:
                 ThriftDatabricksClient._check_response_for_error(
                     t_spark_direct_results.operationStatus,
-                    session_id_hex,
+                    host_url,
                 )
             if t_spark_direct_results.resultSetMetadata:
                 ThriftDatabricksClient._check_response_for_error(
                     t_spark_direct_results.resultSetMetadata,
-                    session_id_hex,
+                    host_url,
                 )
             if t_spark_direct_results.resultSet:
                 ThriftDatabricksClient._check_response_for_error(
                     t_spark_direct_results.resultSet,
-                    session_id_hex,
+                    host_url,
                 )
             if t_spark_direct_results.closeOperation:
                 ThriftDatabricksClient._check_response_for_error(
                     t_spark_direct_results.closeOperation,
-                    session_id_hex,
+                    host_url,
                 )
 
     def execute_command(
@@ -1260,7 +1256,7 @@ def _handle_execute_response(self, resp, cursor):
             raise ValueError(f"Invalid Thrift handle: {resp.operationHandle}")
 
         cursor.active_command_id = command_id
-        self._check_direct_results_for_error(resp.directResults, self._session_id_hex)
+        self._check_direct_results_for_error(resp.directResults, self._host)
 
         final_operation_state = self._wait_until_command_done(
             resp.operationHandle,
@@ -1275,7 +1271,7 @@ def _handle_execute_response_async(self, resp, cursor):
             raise ValueError(f"Invalid Thrift handle: {resp.operationHandle}")
 
         cursor.active_command_id = command_id
-        self._check_direct_results_for_error(resp.directResults, self._session_id_hex)
+        self._check_direct_results_for_error(resp.directResults, self._host)
 
     def fetch_results(
         self,
@@ -1313,7 +1309,7 @@ def fetch_results(
                 "fetch_results failed due to inconsistency in the state between the client and the server. Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format(
                     expected_row_start_offset, resp.results.startRowOffset
                 ),
-                session_id_hex=self._session_id_hex,
+                host_url=self._host,
             )
 
         queue = ThriftResultSetQueueFactory.build_queue(
