From 3bfbac15ed939b8b59a3c573de03aff275c43ed9 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 19:09:23 +0000 Subject: [PATCH 01/19] feat: Implement host-level telemetry batching to reduce rate limiting Changes telemetry client architecture from per-session to per-host batching, matching the JDBC driver implementation. This reduces the number of HTTP requests to the telemetry endpoint and prevents rate limiting in test environments. Key changes: - Add _TelemetryClientHolder with reference counting for shared clients - Change TelemetryClientFactory to key clients by host_url instead of session_id - Add getHostUrlSafely() helper for defensive null handling - Update all callers (client.py, exc.py, latency_logger.py) to pass host_url Before: 100 connections to same host = 100 separate TelemetryClients After: 100 connections to same host = 1 shared TelemetryClient (refcount=100) This fixes rate limiting issues seen in e2e tests where 300+ parallel connections were overwhelming the telemetry endpoint with 429 errors. --- src/databricks/sql/client.py | 4 +- src/databricks/sql/exc.py | 6 +- .../sql/telemetry/latency_logger.py | 2 +- .../sql/telemetry/telemetry_client.py | 147 ++++++++++++++---- 4 files changed, 126 insertions(+), 33 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index a7f802dcd..2471725dd 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -341,7 +341,7 @@ def read(self) -> Optional[OAuthToken]: ) self._telemetry_client = TelemetryClientFactory.get_telemetry_client( - session_id_hex=self.get_session_id_hex() + host_url=self.session.host ) # Determine proxy usage @@ -521,7 +521,7 @@ def _close(self, close_cursors=True) -> None: except Exception as e: logger.error(f"Attempt to close session raised a local exception: {e}") - TelemetryClientFactory.close(self.get_session_id_hex()) + TelemetryClientFactory.close(host_url=self.session.host) # Close HTTP client that was created by this connection if self.http_client: diff --git a/src/databricks/sql/exc.py b/src/databricks/sql/exc.py index 24844d573..a30dee3e3 100644 --- a/src/databricks/sql/exc.py +++ b/src/databricks/sql/exc.py @@ -12,18 +12,18 @@ class Error(Exception): """ def __init__( - self, message=None, context=None, session_id_hex=None, *args, **kwargs + self, message=None, context=None, host_url=None, *args, **kwargs ): super().__init__(message, *args, **kwargs) self.message = message self.context = context or {} error_name = self.__class__.__name__ - if session_id_hex: + if host_url: from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory telemetry_client = TelemetryClientFactory.get_telemetry_client( - session_id_hex + host_url=host_url ) telemetry_client.export_failure_log(error_name, self.message) diff --git a/src/databricks/sql/telemetry/latency_logger.py b/src/databricks/sql/telemetry/latency_logger.py index 36ebee2b8..a0322390e 100644 --- a/src/databricks/sql/telemetry/latency_logger.py +++ b/src/databricks/sql/telemetry/latency_logger.py @@ -205,7 +205,7 @@ def wrapper(self, *args, **kwargs): telemetry_client = ( TelemetryClientFactory.get_telemetry_client( - session_id_hex + host_url=connection.session.host ) ) telemetry_client.export_latency_log( diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index d5f5b575c..5b2da145d 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -409,15 +409,39 @@ def close(self): self._flush() +class _TelemetryClientHolder: + """ + Holds a telemetry client with reference counting. + Multiple connections to the same host share one client. + """ + + def __init__(self, client: BaseTelemetryClient): + self.client = client + self.refcount = 1 + + def increment(self): + """Increment reference count when a new connection uses this client""" + self.refcount += 1 + + def decrement(self): + """Decrement reference count when a connection closes""" + self.refcount -= 1 + return self.refcount + + class TelemetryClientFactory: """ Static factory class for creating and managing telemetry clients. It uses a thread pool to handle asynchronous operations and a single flush thread for all clients. + + Clients are shared at the HOST level - multiple connections to the same host + share a single TelemetryClient to enable efficient batching and reduce load + on the telemetry endpoint. """ _clients: Dict[ - str, BaseTelemetryClient - ] = {} # Map of session_id_hex -> BaseTelemetryClient + str, _TelemetryClientHolder + ] = {} # Map of host_url -> TelemetryClientHolder _executor: Optional[ThreadPoolExecutor] = None _initialized: bool = False _lock = threading.RLock() # Thread safety for factory operations @@ -431,6 +455,22 @@ class TelemetryClientFactory: _flush_interval_seconds = 300 # 5 minutes DEFAULT_BATCH_SIZE = 100 + UNKNOWN_HOST = "unknown-host" + + @staticmethod + def getHostUrlSafely(host_url): + """ + Safely get host URL with fallback to UNKNOWN_HOST. + + Args: + host_url: The host URL to validate + + Returns: + The host_url if valid, otherwise UNKNOWN_HOST + """ + if not host_url or not isinstance(host_url, str) or not host_url.strip(): + return TelemetryClientFactory.UNKNOWN_HOST + return host_url @classmethod def _initialize(cls): @@ -506,21 +546,38 @@ def initialize_telemetry_client( batch_size, client_context, ): - """Initialize a telemetry client for a specific connection if telemetry is enabled""" + """ + Initialize a telemetry client for a specific connection if telemetry is enabled. + + Clients are shared at the HOST level - multiple connections to the same host + will share a single TelemetryClient with reference counting. + """ try: + # Safely get host_url with fallback to UNKNOWN_HOST + host_url = TelemetryClientFactory.getHostUrlSafely(host_url) with TelemetryClientFactory._lock: TelemetryClientFactory._initialize() - if session_id_hex not in TelemetryClientFactory._clients: + if host_url in TelemetryClientFactory._clients: + # Reuse existing client for this host + holder = TelemetryClientFactory._clients[host_url] + holder.increment() + logger.debug( + "Reusing TelemetryClient for host %s (session %s, refcount=%d)", + host_url, + session_id_hex, + holder.refcount, + ) + else: + # Create new client for this host logger.debug( - "Creating new TelemetryClient for connection %s", + "Creating new TelemetryClient for host %s (session %s)", + host_url, session_id_hex, ) if telemetry_enabled: - TelemetryClientFactory._clients[ - session_id_hex - ] = TelemetryClient( + client = TelemetryClient( telemetry_enabled=telemetry_enabled, session_id_hex=session_id_hex, auth_provider=auth_provider, @@ -529,36 +586,72 @@ def initialize_telemetry_client( batch_size=batch_size, client_context=client_context, ) + TelemetryClientFactory._clients[host_url] = _TelemetryClientHolder( + client + ) else: - TelemetryClientFactory._clients[ - session_id_hex - ] = NoopTelemetryClient() + TelemetryClientFactory._clients[host_url] = _TelemetryClientHolder( + NoopTelemetryClient() + ) except Exception as e: logger.debug("Failed to initialize telemetry client: %s", e) # Fallback to NoopTelemetryClient to ensure connection doesn't fail - TelemetryClientFactory._clients[session_id_hex] = NoopTelemetryClient() + TelemetryClientFactory._clients[host_url] = _TelemetryClientHolder( + NoopTelemetryClient() + ) @staticmethod - def get_telemetry_client(session_id_hex): - """Get the telemetry client for a specific connection""" - return TelemetryClientFactory._clients.get( - session_id_hex, NoopTelemetryClient() - ) + def get_telemetry_client(host_url): + """ + Get the shared telemetry client for a specific host. + + Args: + host_url: The host URL to look up the client. If None/empty, uses UNKNOWN_HOST. + + Returns: + The shared TelemetryClient for this host, or NoopTelemetryClient if not found + """ + host_url = TelemetryClientFactory.getHostUrlSafely(host_url) + + if host_url in TelemetryClientFactory._clients: + return TelemetryClientFactory._clients[host_url].client + return NoopTelemetryClient() @staticmethod - def close(session_id_hex): - """Close and remove the telemetry client for a specific connection""" + def close(host_url): + """ + Close the telemetry client for a specific host. + + Decrements the reference count for the host's client. Only actually closes + the client when the reference count reaches zero (all connections to this host closed). + + Args: + host_url: The host URL whose client to close. If None/empty, uses UNKNOWN_HOST. + """ + host_url = TelemetryClientFactory.getHostUrlSafely(host_url) with TelemetryClientFactory._lock: - if ( - telemetry_client := TelemetryClientFactory._clients.pop( - session_id_hex, None - ) - ) is not None: + # Get the holder for this host + holder = TelemetryClientFactory._clients.get(host_url) + if holder is None: + logger.debug("No telemetry client found for host %s", host_url) + return + + # Decrement refcount + remaining_refs = holder.decrement() + logger.debug( + "Decremented refcount for host %s (refcount=%d)", + host_url, + remaining_refs, + ) + + # Only close if no more references + if remaining_refs <= 0: logger.debug( - "Removing telemetry client for connection %s", session_id_hex + "Closing telemetry client for host %s (no more references)", host_url ) - telemetry_client.close() + TelemetryClientFactory._clients.pop(host_url, None) + holder.client.close() # Shutdown executor if no more clients if not TelemetryClientFactory._clients and TelemetryClientFactory._executor: @@ -597,7 +690,7 @@ def connection_failure_log( ) telemetry_client = TelemetryClientFactory.get_telemetry_client( - UNAUTH_DUMMY_SESSION_ID + host_url=host_url ) telemetry_client._driver_connection_params = DriverConnectionParameters( http_path=http_path, From 5e29089212f18c53ec1a9674ff48467de9afa814 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 19:27:08 +0000 Subject: [PATCH 02/19] chore: Change all telemetry logging to DEBUG level Reduces log noise by changing all telemetry-related log statements (info, warning, error) to debug level. Telemetry operations are background tasks and should not clutter logs with operational messages. Changes: - Circuit breaker state changes: info/warning -> debug - Telemetry send failures: error -> debug - All telemetry operations now consistently use debug level --- src/databricks/sql/telemetry/circuit_breaker_manager.py | 8 ++++---- src/databricks/sql/telemetry/telemetry_client.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/databricks/sql/telemetry/circuit_breaker_manager.py b/src/databricks/sql/telemetry/circuit_breaker_manager.py index 852f0d916..a5df7371e 100644 --- a/src/databricks/sql/telemetry/circuit_breaker_manager.py +++ b/src/databricks/sql/telemetry/circuit_breaker_manager.py @@ -60,16 +60,16 @@ def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None: old_state_name = old_state.name if old_state else "None" new_state_name = new_state.name if new_state else "None" - logger.info( + logger.debug( LOG_CIRCUIT_BREAKER_STATE_CHANGED, old_state_name, new_state_name, cb.name ) if new_state_name == CIRCUIT_BREAKER_STATE_OPEN: - logger.warning(LOG_CIRCUIT_BREAKER_OPENED, cb.name) + logger.debug(LOG_CIRCUIT_BREAKER_OPENED, cb.name) elif new_state_name == CIRCUIT_BREAKER_STATE_CLOSED: - logger.info(LOG_CIRCUIT_BREAKER_CLOSED, cb.name) + logger.debug(LOG_CIRCUIT_BREAKER_CLOSED, cb.name) elif new_state_name == CIRCUIT_BREAKER_STATE_HALF_OPEN: - logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name) + logger.debug(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name) class CircuitBreakerManager: diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 5b2da145d..6a9ec9a61 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -307,7 +307,7 @@ def _send_with_unified_client(self, url, data, headers, timeout=900): ) return response except Exception as e: - logger.error("Failed to send telemetry with unified client: %s", e) + logger.debug("Failed to send telemetry with unified client: %s", e) raise def _telemetry_request_callback(self, future, sent_count: int): From 26b67c72f60b4cda6d95698b48362fcf7cb63e53 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 19:27:27 +0000 Subject: [PATCH 03/19] chore: Fix remaining telemetry warning log to debug Changes remaining logger.warning in telemetry_push_client.py to debug level for consistency with other telemetry logging. --- src/databricks/sql/telemetry/telemetry_push_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/telemetry/telemetry_push_client.py b/src/databricks/sql/telemetry/telemetry_push_client.py index 461a57738..e77910007 100644 --- a/src/databricks/sql/telemetry/telemetry_push_client.py +++ b/src/databricks/sql/telemetry/telemetry_push_client.py @@ -120,7 +120,7 @@ def _make_request_and_check_status( # Check for rate limiting or service unavailable if response.status in [429, 503]: - logger.warning( + logger.debug( "Telemetry endpoint returned %d for host %s, triggering circuit breaker", response.status, self._host, From da13b9e84b541bea3bca8d1f859d4406ca908318 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 19:39:04 +0000 Subject: [PATCH 04/19] fix: Update tests to use host_url instead of session_id_hex - Update circuit breaker test to check logger.debug instead of logger.info - Replace all session_id_hex test parameters with host_url - Apply Black formatting to exc.py and telemetry_client.py This fixes test failures caused by the signature change from session_id_hex to host_url in the Error class and TelemetryClientFactory. --- src/databricks/sql/exc.py | 4 +--- src/databricks/sql/telemetry/telemetry_client.py | 15 ++++++++------- tests/unit/test_circuit_breaker_manager.py | 2 +- tests/unit/test_client.py | 8 ++++---- tests/unit/test_download_manager.py | 2 +- tests/unit/test_downloader.py | 14 +++++++------- tests/unit/test_telemetry.py | 10 +++++----- 7 files changed, 27 insertions(+), 28 deletions(-) diff --git a/src/databricks/sql/exc.py b/src/databricks/sql/exc.py index a30dee3e3..e73da2981 100644 --- a/src/databricks/sql/exc.py +++ b/src/databricks/sql/exc.py @@ -11,9 +11,7 @@ class Error(Exception): `context`: Optional extra context about the error. MUST be JSON serializable """ - def __init__( - self, message=None, context=None, host_url=None, *args, **kwargs - ): + def __init__(self, message=None, context=None, host_url=None, *args, **kwargs): super().__init__(message, *args, **kwargs) self.message = message self.context = context or {} diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 6a9ec9a61..aaea9bb3d 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -586,13 +586,13 @@ def initialize_telemetry_client( batch_size=batch_size, client_context=client_context, ) - TelemetryClientFactory._clients[host_url] = _TelemetryClientHolder( - client - ) + TelemetryClientFactory._clients[ + host_url + ] = _TelemetryClientHolder(client) else: - TelemetryClientFactory._clients[host_url] = _TelemetryClientHolder( - NoopTelemetryClient() - ) + TelemetryClientFactory._clients[ + host_url + ] = _TelemetryClientHolder(NoopTelemetryClient()) except Exception as e: logger.debug("Failed to initialize telemetry client: %s", e) # Fallback to NoopTelemetryClient to ensure connection doesn't fail @@ -648,7 +648,8 @@ def close(host_url): # Only close if no more references if remaining_refs <= 0: logger.debug( - "Closing telemetry client for host %s (no more references)", host_url + "Closing telemetry client for host %s (no more references)", + host_url, ) TelemetryClientFactory._clients.pop(host_url, None) holder.client.close() diff --git a/tests/unit/test_circuit_breaker_manager.py b/tests/unit/test_circuit_breaker_manager.py index e8ed4e809..1e02556d9 100644 --- a/tests/unit/test_circuit_breaker_manager.py +++ b/tests/unit/test_circuit_breaker_manager.py @@ -157,4 +157,4 @@ def test_circuit_breaker_state_listener_transitions(self, old_state, new_state): with patch("databricks.sql.telemetry.circuit_breaker_manager.logger") as mock_logger: listener.state_change(mock_cb, mock_old_state, mock_new_state) - mock_logger.info.assert_called() + mock_logger.debug.assert_called() diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index b515756e8..6b872b0a7 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -714,7 +714,7 @@ def test_autocommit_setter_wraps_database_error(self, mock_session_class): server_error = DatabaseError( "AUTOCOMMIT_SET_DURING_ACTIVE_TRANSACTION", context={"sql_state": "25000"}, - session_id_hex="test-session-id", + host_url="test-session-id", ) mock_cursor.execute.side_effect = server_error @@ -737,7 +737,7 @@ def test_autocommit_setter_preserves_exception_chain(self, mock_session_class): mock_cursor = Mock() original_error = DatabaseError( - "Original error", session_id_hex="test-session-id" + "Original error", host_url="test-session-id" ) mock_cursor.execute.side_effect = original_error @@ -772,7 +772,7 @@ def test_commit_wraps_database_error(self, mock_session_class): server_error = DatabaseError( "MULTI_STATEMENT_TRANSACTION_NO_ACTIVE_TRANSACTION", context={"sql_state": "25000"}, - session_id_hex="test-session-id", + host_url="test-session-id", ) mock_cursor.execute.side_effect = server_error @@ -822,7 +822,7 @@ def test_rollback_wraps_database_error(self, mock_session_class): server_error = DatabaseError( "Unexpected rollback error", context={"sql_state": "HY000"}, - session_id_hex="test-session-id", + host_url="test-session-id", ) mock_cursor.execute.side_effect = server_error diff --git a/tests/unit/test_download_manager.py b/tests/unit/test_download_manager.py index 1c77226a9..64a8e81a5 100644 --- a/tests/unit/test_download_manager.py +++ b/tests/unit/test_download_manager.py @@ -20,7 +20,7 @@ def create_download_manager( max_download_threads, lz4_compressed, ssl_options=SSLOptions(), - session_id_hex=Mock(), + host_url=Mock(), statement_id=Mock(), chunk_id=0, http_client=mock_http_client, diff --git a/tests/unit/test_downloader.py b/tests/unit/test_downloader.py index 00b1b849a..a74dfb639 100644 --- a/tests/unit/test_downloader.py +++ b/tests/unit/test_downloader.py @@ -57,7 +57,7 @@ def test_run_link_expired(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - session_id_hex=Mock(), + host_url=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -80,7 +80,7 @@ def test_run_link_past_expiry_buffer(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - session_id_hex=Mock(), + host_url=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -107,7 +107,7 @@ def test_run_get_response_not_ok(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - session_id_hex=Mock(), + host_url=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -137,7 +137,7 @@ def test_run_uncompressed_successful(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - session_id_hex=Mock(), + host_url=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -170,7 +170,7 @@ def test_run_compressed_successful(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - session_id_hex=Mock(), + host_url=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -194,7 +194,7 @@ def test_download_connection_error(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - session_id_hex=Mock(), + host_url=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -216,7 +216,7 @@ def test_download_timeout(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - session_id_hex=Mock(), + host_url=Mock(), statement_id=Mock(), http_client=mock_http_client, ) diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index 96a2f87d8..8123f6249 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -46,7 +46,7 @@ def mock_telemetry_client(): ): return TelemetryClient( telemetry_enabled=True, - session_id_hex=session_id, + host_url=session_id, auth_provider=auth_provider, host_url="test-host.com", executor=executor, @@ -242,7 +242,7 @@ def test_client_lifecycle_flow(self): ): TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=True, - session_id_hex=session_id_hex, + host_url=session_id_hex, auth_provider=auth_provider, host_url="test-host.com", batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE, @@ -267,7 +267,7 @@ def test_disabled_telemetry_creates_noop_client(self): TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=False, - session_id_hex=session_id_hex, + host_url=session_id_hex, auth_provider=None, host_url="test-host.com", batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE, @@ -289,7 +289,7 @@ def test_factory_error_handling(self): ): TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=True, - session_id_hex=session_id, + host_url=session_id, auth_provider=AccessTokenAuthProvider("token"), host_url="test-host.com", batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE, @@ -313,7 +313,7 @@ def test_factory_shutdown_flow(self): for session in [session1, session2]: TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=True, - session_id_hex=session, + host_url=session, auth_provider=AccessTokenAuthProvider("token"), host_url="test-host.com", batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE, From ffc7b278a662542f56b4b1d1105184bb89a6b78d Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 19:46:36 +0000 Subject: [PATCH 05/19] fix: Revert session_id_hex in tests for functions that still use it Only Error classes changed from session_id_hex to host_url. Other classes (TelemetryClient, ResultSetDownloadHandler, etc.) still use session_id_hex. Reverted: - test_telemetry.py: TelemetryClient and initialize_telemetry_client - test_downloader.py: ResultSetDownloadHandler - test_download_manager.py: ResultFileDownloadManager Kept as host_url: - test_client.py: Error class instantiation --- tests/unit/test_download_manager.py | 2 +- tests/unit/test_downloader.py | 14 +++++++------- tests/unit/test_telemetry.py | 10 +++++----- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/unit/test_download_manager.py b/tests/unit/test_download_manager.py index 64a8e81a5..1c77226a9 100644 --- a/tests/unit/test_download_manager.py +++ b/tests/unit/test_download_manager.py @@ -20,7 +20,7 @@ def create_download_manager( max_download_threads, lz4_compressed, ssl_options=SSLOptions(), - host_url=Mock(), + session_id_hex=Mock(), statement_id=Mock(), chunk_id=0, http_client=mock_http_client, diff --git a/tests/unit/test_downloader.py b/tests/unit/test_downloader.py index a74dfb639..00b1b849a 100644 --- a/tests/unit/test_downloader.py +++ b/tests/unit/test_downloader.py @@ -57,7 +57,7 @@ def test_run_link_expired(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - host_url=Mock(), + session_id_hex=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -80,7 +80,7 @@ def test_run_link_past_expiry_buffer(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - host_url=Mock(), + session_id_hex=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -107,7 +107,7 @@ def test_run_get_response_not_ok(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - host_url=Mock(), + session_id_hex=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -137,7 +137,7 @@ def test_run_uncompressed_successful(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - host_url=Mock(), + session_id_hex=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -170,7 +170,7 @@ def test_run_compressed_successful(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - host_url=Mock(), + session_id_hex=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -194,7 +194,7 @@ def test_download_connection_error(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - host_url=Mock(), + session_id_hex=Mock(), statement_id=Mock(), http_client=mock_http_client, ) @@ -216,7 +216,7 @@ def test_download_timeout(self, mock_time): result_link, ssl_options=SSLOptions(), chunk_id=0, - host_url=Mock(), + session_id_hex=Mock(), statement_id=Mock(), http_client=mock_http_client, ) diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index 8123f6249..96a2f87d8 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -46,7 +46,7 @@ def mock_telemetry_client(): ): return TelemetryClient( telemetry_enabled=True, - host_url=session_id, + session_id_hex=session_id, auth_provider=auth_provider, host_url="test-host.com", executor=executor, @@ -242,7 +242,7 @@ def test_client_lifecycle_flow(self): ): TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=True, - host_url=session_id_hex, + session_id_hex=session_id_hex, auth_provider=auth_provider, host_url="test-host.com", batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE, @@ -267,7 +267,7 @@ def test_disabled_telemetry_creates_noop_client(self): TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=False, - host_url=session_id_hex, + session_id_hex=session_id_hex, auth_provider=None, host_url="test-host.com", batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE, @@ -289,7 +289,7 @@ def test_factory_error_handling(self): ): TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=True, - host_url=session_id, + session_id_hex=session_id, auth_provider=AccessTokenAuthProvider("token"), host_url="test-host.com", batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE, @@ -313,7 +313,7 @@ def test_factory_shutdown_flow(self): for session in [session1, session2]: TelemetryClientFactory.initialize_telemetry_client( telemetry_enabled=True, - host_url=session, + session_id_hex=session, auth_provider=AccessTokenAuthProvider("token"), host_url="test-host.com", batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE, From b11a461752bf1a4496b966c8bf38f9b1fbf5b659 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 20:03:39 +0000 Subject: [PATCH 06/19] fix: Update all Error raises and test calls to use host_url Changes: 1. client.py: Changed all error raises from session_id_hex to host_url - Connection class: session_id_hex=self.get_session_id_hex() -> host_url=self.session.host - Cursor class: session_id_hex=self.connection.get_session_id_hex() -> host_url=self.connection.session.host 2. test_telemetry.py: Updated get_telemetry_client() and close() calls - get_telemetry_client(session_id) -> get_telemetry_client(host_url) - close(session_id) -> close(host_url=host_url) 3. test_telemetry_push_client.py: Changed logger.warning to logger.debug - Updated test assertion to match debug logging level These changes complete the migration from session-level to host-level telemetry client management. --- src/databricks/sql/client.py | 62 ++++++++++++------------ tests/unit/test_telemetry.py | 18 +++---- tests/unit/test_telemetry_push_client.py | 8 +-- 3 files changed, 44 insertions(+), 44 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 2471725dd..4fa3201d2 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -494,7 +494,7 @@ def cursor( if not self.open: raise InterfaceError( "Cannot create cursor from closed connection", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) cursor = Cursor( @@ -546,7 +546,7 @@ def autocommit(self) -> bool: if not self.open: raise InterfaceError( "Cannot get autocommit on closed connection", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) if self._fetch_autocommit_from_server: @@ -578,7 +578,7 @@ def autocommit(self, value: bool) -> None: if not self.open: raise InterfaceError( "Cannot set autocommit on closed connection", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) # Create internal cursor for transaction control @@ -600,7 +600,7 @@ def autocommit(self, value: bool) -> None: "operation": "set_autocommit", "autocommit_value": value, }, - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) from e finally: if cursor: @@ -627,7 +627,7 @@ def _fetch_autocommit_state_from_server(self) -> bool: raise TransactionError( "No result returned from SET AUTOCOMMIT query", context={"operation": "fetch_autocommit"}, - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) # Parse value (first column should be "true" or "false") @@ -647,7 +647,7 @@ def _fetch_autocommit_state_from_server(self) -> bool: raise TransactionError( f"Failed to fetch autocommit state from server: {e.message}", context={**e.context, "operation": "fetch_autocommit"}, - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) from e finally: if cursor: @@ -680,7 +680,7 @@ def commit(self) -> None: if not self.open: raise InterfaceError( "Cannot commit on closed connection", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) cursor = None @@ -692,7 +692,7 @@ def commit(self) -> None: raise TransactionError( f"Failed to commit transaction: {e.message}", context={**e.context, "operation": "commit"}, - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) from e finally: if cursor: @@ -725,13 +725,13 @@ def rollback(self) -> None: if self.ignore_transactions: raise NotSupportedError( "Transactions are not supported on Databricks", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) if not self.open: raise InterfaceError( "Cannot rollback on closed connection", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) cursor = None @@ -743,7 +743,7 @@ def rollback(self) -> None: raise TransactionError( f"Failed to rollback transaction: {e.message}", context={**e.context, "operation": "rollback"}, - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) from e finally: if cursor: @@ -767,7 +767,7 @@ def get_transaction_isolation(self) -> str: if not self.open: raise InterfaceError( "Cannot get transaction isolation on closed connection", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) return TRANSACTION_ISOLATION_LEVEL_REPEATABLE_READ @@ -793,7 +793,7 @@ def set_transaction_isolation(self, level: str) -> None: if not self.open: raise InterfaceError( "Cannot set transaction isolation on closed connection", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) # Normalize and validate isolation level @@ -805,7 +805,7 @@ def set_transaction_isolation(self, level: str) -> None: raise NotSupportedError( f"Setting transaction isolation level '{level}' is not supported. " f"Only {TRANSACTION_ISOLATION_LEVEL_REPEATABLE_READ} is supported.", - session_id_hex=self.get_session_id_hex(), + host_url=self.session.host, ) @@ -857,7 +857,7 @@ def __iter__(self): else: raise ProgrammingError( "There is no active result set", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) def _determine_parameter_approach( @@ -997,7 +997,7 @@ def _check_not_closed(self): if not self.open: raise InterfaceError( "Attempting operation on closed cursor", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) def _handle_staging_operation( @@ -1041,7 +1041,7 @@ def _handle_staging_operation( else: raise ProgrammingError( "You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) abs_staging_allowed_local_paths = [ @@ -1067,7 +1067,7 @@ def _handle_staging_operation( if not allow_operation: raise ProgrammingError( "Local file operations are restricted to paths within the configured staging_allowed_local_path", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) handler_args = { @@ -1095,7 +1095,7 @@ def _handle_staging_operation( raise ProgrammingError( f"Operation {row.operation} is not supported. " + "Supported operations are GET, PUT, and REMOVE", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) @log_latency(StatementType.SQL) @@ -1110,7 +1110,7 @@ def _handle_staging_put( if local_file is None: raise ProgrammingError( "Cannot perform PUT without specifying a local_file", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) with open(local_file, "rb") as fh: @@ -1135,7 +1135,7 @@ def _handle_staging_http_response(self, r): error_text = r.data.decode() if r.data else "" raise OperationalError( f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) if r.status == ACCEPTED: @@ -1166,7 +1166,7 @@ def _handle_staging_put_stream( if not stream: raise ProgrammingError( "No input stream provided for streaming operation", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) r = self.connection.http_client.request( @@ -1187,7 +1187,7 @@ def _handle_staging_get( if local_file is None: raise ProgrammingError( "Cannot perform GET without specifying a local_file", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) r = self.connection.http_client.request( @@ -1201,7 +1201,7 @@ def _handle_staging_get( error_text = r.data.decode() if r.data else "" raise OperationalError( f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) with open(local_file, "wb") as fp: @@ -1222,7 +1222,7 @@ def _handle_staging_remove( error_text = r.data.decode() if r.data else "" raise OperationalError( f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) @log_latency(StatementType.QUERY) @@ -1413,7 +1413,7 @@ def get_async_execution_result(self): else: raise OperationalError( f"get_execution_result failed with Operation status {operation_state}", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) def executemany(self, operation, seq_of_parameters): @@ -1541,7 +1541,7 @@ def fetchall(self) -> List[Row]: else: raise ProgrammingError( "There is no active result set", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) def fetchone(self) -> Optional[Row]: @@ -1558,7 +1558,7 @@ def fetchone(self) -> Optional[Row]: else: raise ProgrammingError( "There is no active result set", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) def fetchmany(self, size: int) -> List[Row]: @@ -1583,7 +1583,7 @@ def fetchmany(self, size: int) -> List[Row]: else: raise ProgrammingError( "There is no active result set", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) def fetchall_arrow(self) -> "pyarrow.Table": @@ -1593,7 +1593,7 @@ def fetchall_arrow(self) -> "pyarrow.Table": else: raise ProgrammingError( "There is no active result set", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) def fetchmany_arrow(self, size) -> "pyarrow.Table": @@ -1603,7 +1603,7 @@ def fetchmany_arrow(self, size) -> "pyarrow.Table": else: raise ProgrammingError( "There is no active result set", - session_id_hex=self.connection.get_session_id_hex(), + host_url=self.connection.session.host, ) def cancel(self) -> None: diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index 96a2f87d8..da6254671 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -249,13 +249,13 @@ def test_client_lifecycle_flow(self): client_context=client_context, ) - client = TelemetryClientFactory.get_telemetry_client(session_id_hex) + client = TelemetryClientFactory.get_telemetry_client("test-host.com") assert isinstance(client, TelemetryClient) assert client._session_id_hex == session_id_hex # Close client with patch.object(client, "close") as mock_close: - TelemetryClientFactory.close(session_id_hex) + TelemetryClientFactory.close(host_url="test-host.com") mock_close.assert_called_once() # Should get NoopTelemetryClient after close @@ -274,7 +274,7 @@ def test_disabled_telemetry_creates_noop_client(self): client_context=client_context, ) - client = TelemetryClientFactory.get_telemetry_client(session_id_hex) + client = TelemetryClientFactory.get_telemetry_client("test-host.com") assert isinstance(client, NoopTelemetryClient) def test_factory_error_handling(self): @@ -297,7 +297,7 @@ def test_factory_error_handling(self): ) # Should fall back to NoopTelemetryClient - client = TelemetryClientFactory.get_telemetry_client(session_id) + client = TelemetryClientFactory.get_telemetry_client("test-host.com") assert isinstance(client, NoopTelemetryClient) def test_factory_shutdown_flow(self): @@ -325,11 +325,11 @@ def test_factory_shutdown_flow(self): assert TelemetryClientFactory._executor is not None # Close first client - factory should stay initialized - TelemetryClientFactory.close(session1) + TelemetryClientFactory.close(host_url="test-host.com") assert TelemetryClientFactory._initialized is True # Close second client - factory should shut down - TelemetryClientFactory.close(session2) + TelemetryClientFactory.close(host_url="test-host.com") assert TelemetryClientFactory._initialized is False assert TelemetryClientFactory._executor is None @@ -410,7 +410,7 @@ def test_telemetry_enabled_when_flag_is_true(self, mock_http_request, MockSessio assert conn.telemetry_enabled is True mock_http_request.assert_called_once() - client = TelemetryClientFactory.get_telemetry_client("test-session-ff-true") + client = TelemetryClientFactory.get_telemetry_client("test") assert isinstance(client, TelemetryClient) @patch("databricks.sql.common.unified_http_client.UnifiedHttpClient.request") @@ -440,7 +440,7 @@ def test_telemetry_disabled_when_flag_is_false( assert conn.telemetry_enabled is False mock_http_request.assert_called_once() - client = TelemetryClientFactory.get_telemetry_client("test-session-ff-false") + client = TelemetryClientFactory.get_telemetry_client("test") assert isinstance(client, NoopTelemetryClient) @patch("databricks.sql.common.unified_http_client.UnifiedHttpClient.request") @@ -470,7 +470,7 @@ def test_telemetry_disabled_when_flag_request_fails( assert conn.telemetry_enabled is False mock_http_request.assert_called_once() - client = TelemetryClientFactory.get_telemetry_client("test-session-ff-fail") + client = TelemetryClientFactory.get_telemetry_client("test") assert isinstance(client, NoopTelemetryClient) diff --git a/tests/unit/test_telemetry_push_client.py b/tests/unit/test_telemetry_push_client.py index 0e9455e1f..6555f1d02 100644 --- a/tests/unit/test_telemetry_push_client.py +++ b/tests/unit/test_telemetry_push_client.py @@ -114,10 +114,10 @@ def test_rate_limit_error_logging(self): with pytest.raises(TelemetryRateLimitError): self.client.request(HttpMethod.POST, "https://test.com", {}) - mock_logger.warning.assert_called() - warning_args = mock_logger.warning.call_args[0] - assert "429" in str(warning_args) - assert "circuit breaker" in warning_args[0] + mock_logger.debug.assert_called() + debug_args = mock_logger.debug.call_args[0] + assert "429" in str(debug_args) + assert "circuit breaker" in debug_args[0] def test_other_error_logging(self): """Test that other errors are logged during wrapping/unwrapping.""" From 60e50de7928a5c365274410951aa5d965fb5cec8 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 20:19:41 +0000 Subject: [PATCH 07/19] fix: Update thrift_backend.py to use host_url instead of session_id_hex Changes: 1. Added self._host attribute to store server_hostname 2. Updated all error raises to use host_url=self._host 3. Changed method signatures from session_id_hex to host_url: - _check_response_for_error - _hive_schema_to_arrow_schema - _col_to_description - _hive_schema_to_description - _check_direct_results_for_error 4. Updated all method calls to pass self._host instead of self._session_id_hex This completes the migration from session-level to host-level error reporting. --- src/databricks/sql/backend/thrift_backend.py | 65 ++++++++++---------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/src/databricks/sql/backend/thrift_backend.py b/src/databricks/sql/backend/thrift_backend.py index d2b10e718..31f41c973 100644 --- a/src/databricks/sql/backend/thrift_backend.py +++ b/src/databricks/sql/backend/thrift_backend.py @@ -163,6 +163,7 @@ def __init__( else: raise ValueError("No valid connection settings.") + self._host = server_hostname self._initialize_retry_args(kwargs) self._use_arrow_native_complex_types = kwargs.get( "_use_arrow_native_complex_types", True @@ -279,14 +280,14 @@ def _initialize_retry_args(self, kwargs): ) @staticmethod - def _check_response_for_error(response, session_id_hex=None): + def _check_response_for_error(response, host_url=None): if response.status and response.status.statusCode in [ ttypes.TStatusCode.ERROR_STATUS, ttypes.TStatusCode.INVALID_HANDLE_STATUS, ]: raise DatabaseError( response.status.errorMessage, - session_id_hex=session_id_hex, + host_url=host_url, ) @staticmethod @@ -340,7 +341,7 @@ def _handle_request_error(self, error_info, attempt, elapsed): network_request_error = RequestError( user_friendly_error_message, full_error_info_context, - self._session_id_hex, + self._host, error_info.error, ) logger.info(network_request_error.message_with_context()) @@ -517,7 +518,7 @@ def attempt_request(attempt): # log nothing here, presume that main request logging covers response = response_or_error_info ThriftDatabricksClient._check_response_for_error( - response, self._session_id_hex + response, self._host ) return response @@ -533,7 +534,7 @@ def _check_protocol_version(self, t_open_session_resp): "Error: expected server to use a protocol version >= " "SPARK_CLI_SERVICE_PROTOCOL_V2, " "instead got: {}".format(protocol_version), - session_id_hex=self._session_id_hex, + host_url=self._host, ) def _check_initial_namespace(self, catalog, schema, response): @@ -547,7 +548,7 @@ def _check_initial_namespace(self, catalog, schema, response): raise InvalidServerResponseError( "Setting initial namespace not supported by the DBR version, " "Please use a Databricks SQL endpoint or a cluster with DBR >= 9.0.", - session_id_hex=self._session_id_hex, + host_url=self._host, ) if catalog: @@ -555,7 +556,7 @@ def _check_initial_namespace(self, catalog, schema, response): raise InvalidServerResponseError( "Unexpected response from server: Trying to set initial catalog to {}, " + "but server does not support multiple catalogs.".format(catalog), # type: ignore - session_id_hex=self._session_id_hex, + host_url=self._host, ) def _check_session_configuration(self, session_configuration): @@ -570,7 +571,7 @@ def _check_session_configuration(self, session_configuration): TIMESTAMP_AS_STRING_CONFIG, session_configuration[TIMESTAMP_AS_STRING_CONFIG], ), - session_id_hex=self._session_id_hex, + host_url=self._host, ) def open_session(self, session_configuration, catalog, schema) -> SessionId: @@ -639,7 +640,7 @@ def _check_command_not_in_error_or_closed_state( and guid_to_hex_id(op_handle.operationId.guid), "diagnostic-info": get_operations_resp.diagnosticInfo, }, - session_id_hex=self._session_id_hex, + host_url=self._host, ) else: raise ServerOperationError( @@ -649,7 +650,7 @@ def _check_command_not_in_error_or_closed_state( and guid_to_hex_id(op_handle.operationId.guid), "diagnostic-info": None, }, - session_id_hex=self._session_id_hex, + host_url=self._host, ) elif get_operations_resp.operationState == ttypes.TOperationState.CLOSED_STATE: raise DatabaseError( @@ -660,7 +661,7 @@ def _check_command_not_in_error_or_closed_state( "operation-id": op_handle and guid_to_hex_id(op_handle.operationId.guid) }, - session_id_hex=self._session_id_hex, + host_url=self._host, ) def _poll_for_status(self, op_handle): @@ -683,7 +684,7 @@ def _create_arrow_table(self, t_row_set, lz4_compressed, schema_bytes, descripti else: raise OperationalError( "Unsupported TRowSet instance {}".format(t_row_set), - session_id_hex=self._session_id_hex, + host_url=self._host, ) return convert_decimals_in_arrow_table(arrow_table, description), num_rows @@ -692,7 +693,7 @@ def _get_metadata_resp(self, op_handle): return self.make_request(self._client.GetResultSetMetadata, req) @staticmethod - def _hive_schema_to_arrow_schema(t_table_schema, session_id_hex=None): + def _hive_schema_to_arrow_schema(t_table_schema, host_url=None): def map_type(t_type_entry): if t_type_entry.primitiveEntry: return { @@ -724,7 +725,7 @@ def map_type(t_type_entry): # even for complex types raise OperationalError( "Thrift protocol error: t_type_entry not a primitiveEntry", - session_id_hex=session_id_hex, + host_url=host_url, ) def convert_col(t_column_desc): @@ -735,7 +736,7 @@ def convert_col(t_column_desc): return pyarrow.schema([convert_col(col) for col in t_table_schema.columns]) @staticmethod - def _col_to_description(col, field=None, session_id_hex=None): + def _col_to_description(col, field=None, host_url=None): type_entry = col.typeDesc.types[0] if type_entry.primitiveEntry: @@ -745,7 +746,7 @@ def _col_to_description(col, field=None, session_id_hex=None): else: raise OperationalError( "Thrift protocol error: t_type_entry not a primitiveEntry", - session_id_hex=session_id_hex, + host_url=host_url, ) if type_entry.primitiveEntry.type == ttypes.TTypeId.DECIMAL_TYPE: @@ -759,7 +760,7 @@ def _col_to_description(col, field=None, session_id_hex=None): raise OperationalError( "Decimal type did not provide typeQualifier precision, scale in " "primitiveEntry {}".format(type_entry.primitiveEntry), - session_id_hex=session_id_hex, + host_url=host_url, ) else: precision, scale = None, None @@ -779,7 +780,7 @@ def _col_to_description(col, field=None, session_id_hex=None): @staticmethod def _hive_schema_to_description( - t_table_schema, schema_bytes=None, session_id_hex=None + t_table_schema, schema_bytes=None, host_url=None ): field_dict = {} if pyarrow and schema_bytes: @@ -795,7 +796,7 @@ def _hive_schema_to_description( ThriftDatabricksClient._col_to_description( col, field_dict.get(col.columnName) if field_dict else None, - session_id_hex, + host_url, ) for col in t_table_schema.columns ] @@ -818,7 +819,7 @@ def _results_message_to_execute_response(self, resp, operation_state): t_result_set_metadata_resp.resultFormat ] ), - session_id_hex=self._session_id_hex, + host_url=self._host, ) direct_results = resp.directResults has_been_closed_server_side = direct_results and direct_results.closeOperation @@ -833,7 +834,7 @@ def _results_message_to_execute_response(self, resp, operation_state): schema_bytes = ( t_result_set_metadata_resp.arrowSchema or self._hive_schema_to_arrow_schema( - t_result_set_metadata_resp.schema, self._session_id_hex + t_result_set_metadata_resp.schema, self._host ) .serialize() .to_pybytes() @@ -844,7 +845,7 @@ def _results_message_to_execute_response(self, resp, operation_state): description = self._hive_schema_to_description( t_result_set_metadata_resp.schema, schema_bytes, - self._session_id_hex, + self._host, ) lz4_compressed = t_result_set_metadata_resp.lz4Compressed @@ -895,7 +896,7 @@ def get_execution_result( schema_bytes = ( t_result_set_metadata_resp.arrowSchema or self._hive_schema_to_arrow_schema( - t_result_set_metadata_resp.schema, self._session_id_hex + t_result_set_metadata_resp.schema, self._host ) .serialize() .to_pybytes() @@ -906,7 +907,7 @@ def get_execution_result( description = self._hive_schema_to_description( t_result_set_metadata_resp.schema, schema_bytes, - self._session_id_hex, + self._host, ) lz4_compressed = t_result_set_metadata_resp.lz4Compressed @@ -971,27 +972,27 @@ def get_query_state(self, command_id: CommandId) -> CommandState: return state @staticmethod - def _check_direct_results_for_error(t_spark_direct_results, session_id_hex=None): + def _check_direct_results_for_error(t_spark_direct_results, host_url=None): if t_spark_direct_results: if t_spark_direct_results.operationStatus: ThriftDatabricksClient._check_response_for_error( t_spark_direct_results.operationStatus, - session_id_hex, + host_url, ) if t_spark_direct_results.resultSetMetadata: ThriftDatabricksClient._check_response_for_error( t_spark_direct_results.resultSetMetadata, - session_id_hex, + host_url, ) if t_spark_direct_results.resultSet: ThriftDatabricksClient._check_response_for_error( t_spark_direct_results.resultSet, - session_id_hex, + host_url, ) if t_spark_direct_results.closeOperation: ThriftDatabricksClient._check_response_for_error( t_spark_direct_results.closeOperation, - session_id_hex, + host_url, ) def execute_command( @@ -1260,7 +1261,7 @@ def _handle_execute_response(self, resp, cursor): raise ValueError(f"Invalid Thrift handle: {resp.operationHandle}") cursor.active_command_id = command_id - self._check_direct_results_for_error(resp.directResults, self._session_id_hex) + self._check_direct_results_for_error(resp.directResults, self._host) final_operation_state = self._wait_until_command_done( resp.operationHandle, @@ -1275,7 +1276,7 @@ def _handle_execute_response_async(self, resp, cursor): raise ValueError(f"Invalid Thrift handle: {resp.operationHandle}") cursor.active_command_id = command_id - self._check_direct_results_for_error(resp.directResults, self._session_id_hex) + self._check_direct_results_for_error(resp.directResults, self._host) def fetch_results( self, @@ -1313,7 +1314,7 @@ def fetch_results( "fetch_results failed due to inconsistency in the state between the client and the server. Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format( expected_row_start_offset, resp.results.startRowOffset ), - session_id_hex=self._session_id_hex, + host_url=self._host, ) queue = ThriftResultSetQueueFactory.build_queue( From 01ea1e1f8b4dde2d7bdcdbd08c5fbf1a3fe365d4 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 20:35:04 +0000 Subject: [PATCH 08/19] Fix Black formatting by adjusting fmt directive placement Moved the `# fmt: on` directive to the except block level instead of inside the if statement to resolve Black parsing confusion. --- src/databricks/sql/backend/thrift_backend.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/databricks/sql/backend/thrift_backend.py b/src/databricks/sql/backend/thrift_backend.py index 31f41c973..edee02bfa 100644 --- a/src/databricks/sql/backend/thrift_backend.py +++ b/src/databricks/sql/backend/thrift_backend.py @@ -462,13 +462,12 @@ def attempt_request(attempt): errno.ECONNRESET, # | 104 | 54 | errno.ETIMEDOUT, # | 110 | 60 | ] + # fmt: on gos_name = TCLIServiceClient.GetOperationStatus.__name__ # retry on timeout. Happens a lot in Azure and it is safe as data has not been sent to server yet if method.__name__ == gos_name or err.errno == errno.ETIMEDOUT: retry_delay = bound_retry_delay(attempt, self._retry_delay_default) - - # fmt: on log_string = f"{gos_name} failed with code {err.errno} and will attempt to retry" if err.errno in info_errs: logger.info(log_string) @@ -517,9 +516,7 @@ def attempt_request(attempt): if not isinstance(response_or_error_info, RequestErrorInfo): # log nothing here, presume that main request logging covers response = response_or_error_info - ThriftDatabricksClient._check_response_for_error( - response, self._host - ) + ThriftDatabricksClient._check_response_for_error(response, self._host) return response error_info = response_or_error_info @@ -779,9 +776,7 @@ def _col_to_description(col, field=None, host_url=None): return col.columnName, cleaned_type, None, None, precision, scale, None @staticmethod - def _hive_schema_to_description( - t_table_schema, schema_bytes=None, host_url=None - ): + def _hive_schema_to_description(t_table_schema, schema_bytes=None, host_url=None): field_dict = {} if pyarrow and schema_bytes: try: From 9026f379e963404b2d3945011752cf8f135d7320 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Mon, 1 Dec 2025 20:48:40 +0000 Subject: [PATCH 09/19] Fix telemetry feature flag tests to set mock session host The tests were failing because they called get_telemetry_client("test") but the mock session didn't have .host set, so the telemetry client was registered under a different key (likely None or MagicMock). This caused the factory to return NoopTelemetryClient instead of the expected client. Fixed by setting mock_session_instance.host = "test" in all three tests. --- tests/unit/test_telemetry.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index da6254671..99be81275 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -391,6 +391,7 @@ def test_telemetry_enabled_when_flag_is_true(self, mock_http_request, MockSessio self._mock_ff_response(mock_http_request, enabled=True) mock_session_instance = MockSession.return_value mock_session_instance.guid_hex = "test-session-ff-true" + mock_session_instance.host = "test" # Set host for telemetry client lookup mock_session_instance.auth_provider = AccessTokenAuthProvider("token") mock_session_instance.is_open = ( False # Connection starts closed for test cleanup @@ -421,6 +422,7 @@ def test_telemetry_disabled_when_flag_is_false( self._mock_ff_response(mock_http_request, enabled=False) mock_session_instance = MockSession.return_value mock_session_instance.guid_hex = "test-session-ff-false" + mock_session_instance.host = "test" # Set host for telemetry client lookup mock_session_instance.auth_provider = AccessTokenAuthProvider("token") mock_session_instance.is_open = ( False # Connection starts closed for test cleanup @@ -451,6 +453,7 @@ def test_telemetry_disabled_when_flag_request_fails( mock_http_request.side_effect = Exception("Network is down") mock_session_instance = MockSession.return_value mock_session_instance.guid_hex = "test-session-ff-fail" + mock_session_instance.host = "test" # Set host for telemetry client lookup mock_session_instance.auth_provider = AccessTokenAuthProvider("token") mock_session_instance.is_open = ( False # Connection starts closed for test cleanup From 86d7828191d2950be033c41d8f236845f0ab32af Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 06:37:51 +0000 Subject: [PATCH 10/19] Add teardown_method to clear telemetry factory state between tests Without this cleanup, tests were sharing telemetry clients because they all used the same host key ("test"), causing test pollution. The first test would create an enabled client, and subsequent tests would reuse it even when they expected a disabled client. --- tests/unit/test_telemetry.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index 99be81275..949a107f2 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -367,6 +367,10 @@ def test_connection_failure_sends_correct_telemetry_payload( class TestTelemetryFeatureFlag: """Tests the interaction between the telemetry feature flag and connection parameters.""" + def teardown_method(self): + """Clean up telemetry factory state after each test to prevent test pollution.""" + TelemetryClientFactory._clients.clear() + def _mock_ff_response(self, mock_http_request, enabled: bool): """Helper method to mock feature flag response for unified HTTP client.""" mock_response = MagicMock() From 91b8382d09e7a95551c9fd77ff9be0883ce9cc92 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 06:42:59 +0000 Subject: [PATCH 11/19] Clear feature flag context cache in teardown to fix test pollution The FeatureFlagsContextFactory caches feature flag contexts per session, causing tests to share the same feature flag state. This resulted in the first test creating a context with telemetry enabled, and subsequent tests incorrectly reusing that enabled state even when they expected disabled. --- tests/unit/test_telemetry.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index 949a107f2..c2a339c82 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -369,7 +369,10 @@ class TestTelemetryFeatureFlag: def teardown_method(self): """Clean up telemetry factory state after each test to prevent test pollution.""" + from databricks.sql.common.feature_flag import FeatureFlagsContextFactory + TelemetryClientFactory._clients.clear() + FeatureFlagsContextFactory._context_map.clear() def _mock_ff_response(self, mock_http_request, enabled: bool): """Helper method to mock feature flag response for unified HTTP client.""" From 74821f819b5e8eab77a8c86b2060823774840546 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 07:57:17 +0000 Subject: [PATCH 12/19] fix: Access actual client from holder in flush worker The flush worker was calling _flush() on _TelemetryClientHolder objects instead of the actual TelemetryClient. Fixed by accessing holder.client before calling _flush(). Fixes AttributeError in e2e tests: '_TelemetryClientHolder' object has no attribute '_flush' --- src/databricks/sql/telemetry/telemetry_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index aaea9bb3d..ea12d80e8 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -504,8 +504,8 @@ def _flush_worker(cls): with cls._lock: clients_to_flush = list(cls._clients.values()) - for client in clients_to_flush: - client._flush() + for holder in clients_to_flush: + holder.client._flush() @classmethod def _stop_flush_thread(cls): From 0b3dd8243a1043a9dea119b66c928110e6d52318 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 07:58:06 +0000 Subject: [PATCH 13/19] Clear telemetry client cache in e2e test teardown Added _clients.clear() to the teardown fixture to prevent telemetry clients from persisting across e2e tests, which was causing session ID pollution in test_concurrent_queries_sends_telemetry. --- tests/e2e/test_concurrent_telemetry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/e2e/test_concurrent_telemetry.py b/tests/e2e/test_concurrent_telemetry.py index d2ac4227d..546a2b8b2 100644 --- a/tests/e2e/test_concurrent_telemetry.py +++ b/tests/e2e/test_concurrent_telemetry.py @@ -41,6 +41,7 @@ def telemetry_setup_teardown(self): TelemetryClientFactory._executor.shutdown(wait=True) TelemetryClientFactory._executor = None TelemetryClientFactory._stop_flush_thread() + TelemetryClientFactory._clients.clear() TelemetryClientFactory._initialized = False def test_concurrent_queries_sends_telemetry(self): From 462445873526a011ac68ef441ee424275b75eebd Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 18:20:41 +0000 Subject: [PATCH 14/19] Pass session_id parameter to telemetry export methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With host-level telemetry batching, multiple connections share one TelemetryClient. Each client stores session_id_hex from the first connection that created it. This caused all subsequent connections' telemetry events to use the wrong session ID. Changes: - Modified telemetry export method signatures to accept optional session_id - Updated Connection.export_initial_telemetry_log() to pass session_id - Updated latency_logger.py export_latency_log() to pass session_id - Updated Error.__init__() to accept optional session_id_hex and pass it - Updated all error raises in Connection and Cursor to pass session_id_hex 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/databricks/sql/client.py | 32 +++++++++++++++++++ src/databricks/sql/exc.py | 14 ++++++-- .../sql/telemetry/latency_logger.py | 1 + .../sql/telemetry/telemetry_client.py | 26 ++++++++------- 4 files changed, 60 insertions(+), 13 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 4fa3201d2..c873700bc 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -391,6 +391,7 @@ def read(self) -> Optional[OAuthToken]: self._telemetry_client.export_initial_telemetry_log( driver_connection_params=driver_connection_params, user_agent=self.session.useragent_header, + session_id=self.get_session_id_hex(), ) def _set_use_inline_params_with_warning(self, value: Union[bool, str]): @@ -495,6 +496,7 @@ def cursor( raise InterfaceError( "Cannot create cursor from closed connection", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) cursor = Cursor( @@ -547,6 +549,7 @@ def autocommit(self) -> bool: raise InterfaceError( "Cannot get autocommit on closed connection", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) if self._fetch_autocommit_from_server: @@ -579,6 +582,7 @@ def autocommit(self, value: bool) -> None: raise InterfaceError( "Cannot set autocommit on closed connection", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) # Create internal cursor for transaction control @@ -601,6 +605,7 @@ def autocommit(self, value: bool) -> None: "autocommit_value": value, }, host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) from e finally: if cursor: @@ -628,6 +633,7 @@ def _fetch_autocommit_state_from_server(self) -> bool: "No result returned from SET AUTOCOMMIT query", context={"operation": "fetch_autocommit"}, host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) # Parse value (first column should be "true" or "false") @@ -648,6 +654,7 @@ def _fetch_autocommit_state_from_server(self) -> bool: f"Failed to fetch autocommit state from server: {e.message}", context={**e.context, "operation": "fetch_autocommit"}, host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) from e finally: if cursor: @@ -681,6 +688,7 @@ def commit(self) -> None: raise InterfaceError( "Cannot commit on closed connection", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) cursor = None @@ -693,6 +701,7 @@ def commit(self) -> None: f"Failed to commit transaction: {e.message}", context={**e.context, "operation": "commit"}, host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) from e finally: if cursor: @@ -726,12 +735,14 @@ def rollback(self) -> None: raise NotSupportedError( "Transactions are not supported on Databricks", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) if not self.open: raise InterfaceError( "Cannot rollback on closed connection", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) cursor = None @@ -744,6 +755,7 @@ def rollback(self) -> None: f"Failed to rollback transaction: {e.message}", context={**e.context, "operation": "rollback"}, host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) from e finally: if cursor: @@ -768,6 +780,7 @@ def get_transaction_isolation(self) -> str: raise InterfaceError( "Cannot get transaction isolation on closed connection", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) return TRANSACTION_ISOLATION_LEVEL_REPEATABLE_READ @@ -794,6 +807,7 @@ def set_transaction_isolation(self, level: str) -> None: raise InterfaceError( "Cannot set transaction isolation on closed connection", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) # Normalize and validate isolation level @@ -806,6 +820,7 @@ def set_transaction_isolation(self, level: str) -> None: f"Setting transaction isolation level '{level}' is not supported. " f"Only {TRANSACTION_ISOLATION_LEVEL_REPEATABLE_READ} is supported.", host_url=self.session.host, + session_id_hex=self.get_session_id_hex(), ) @@ -858,6 +873,7 @@ def __iter__(self): raise ProgrammingError( "There is no active result set", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) def _determine_parameter_approach( @@ -998,6 +1014,7 @@ def _check_not_closed(self): raise InterfaceError( "Attempting operation on closed cursor", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) def _handle_staging_operation( @@ -1042,6 +1059,7 @@ def _handle_staging_operation( raise ProgrammingError( "You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) abs_staging_allowed_local_paths = [ @@ -1068,6 +1086,7 @@ def _handle_staging_operation( raise ProgrammingError( "Local file operations are restricted to paths within the configured staging_allowed_local_path", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) handler_args = { @@ -1096,6 +1115,7 @@ def _handle_staging_operation( f"Operation {row.operation} is not supported. " + "Supported operations are GET, PUT, and REMOVE", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) @log_latency(StatementType.SQL) @@ -1111,6 +1131,7 @@ def _handle_staging_put( raise ProgrammingError( "Cannot perform PUT without specifying a local_file", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) with open(local_file, "rb") as fh: @@ -1136,6 +1157,7 @@ def _handle_staging_http_response(self, r): raise OperationalError( f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) if r.status == ACCEPTED: @@ -1167,6 +1189,7 @@ def _handle_staging_put_stream( raise ProgrammingError( "No input stream provided for streaming operation", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) r = self.connection.http_client.request( @@ -1188,6 +1211,7 @@ def _handle_staging_get( raise ProgrammingError( "Cannot perform GET without specifying a local_file", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) r = self.connection.http_client.request( @@ -1202,6 +1226,7 @@ def _handle_staging_get( raise OperationalError( f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) with open(local_file, "wb") as fp: @@ -1223,6 +1248,7 @@ def _handle_staging_remove( raise OperationalError( f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) @log_latency(StatementType.QUERY) @@ -1414,6 +1440,7 @@ def get_async_execution_result(self): raise OperationalError( f"get_execution_result failed with Operation status {operation_state}", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) def executemany(self, operation, seq_of_parameters): @@ -1542,6 +1569,7 @@ def fetchall(self) -> List[Row]: raise ProgrammingError( "There is no active result set", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) def fetchone(self) -> Optional[Row]: @@ -1559,6 +1587,7 @@ def fetchone(self) -> Optional[Row]: raise ProgrammingError( "There is no active result set", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) def fetchmany(self, size: int) -> List[Row]: @@ -1584,6 +1613,7 @@ def fetchmany(self, size: int) -> List[Row]: raise ProgrammingError( "There is no active result set", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) def fetchall_arrow(self) -> "pyarrow.Table": @@ -1594,6 +1624,7 @@ def fetchall_arrow(self) -> "pyarrow.Table": raise ProgrammingError( "There is no active result set", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) def fetchmany_arrow(self, size) -> "pyarrow.Table": @@ -1604,6 +1635,7 @@ def fetchmany_arrow(self, size) -> "pyarrow.Table": raise ProgrammingError( "There is no active result set", host_url=self.connection.session.host, + session_id_hex=self.connection.get_session_id_hex(), ) def cancel(self) -> None: diff --git a/src/databricks/sql/exc.py b/src/databricks/sql/exc.py index e73da2981..df6b72abf 100644 --- a/src/databricks/sql/exc.py +++ b/src/databricks/sql/exc.py @@ -11,7 +11,15 @@ class Error(Exception): `context`: Optional extra context about the error. MUST be JSON serializable """ - def __init__(self, message=None, context=None, host_url=None, *args, **kwargs): + def __init__( + self, + message=None, + context=None, + host_url=None, + session_id_hex=None, + *args, + **kwargs, + ): super().__init__(message, *args, **kwargs) self.message = message self.context = context or {} @@ -23,7 +31,9 @@ def __init__(self, message=None, context=None, host_url=None, *args, **kwargs): telemetry_client = TelemetryClientFactory.get_telemetry_client( host_url=host_url ) - telemetry_client.export_failure_log(error_name, self.message) + telemetry_client.export_failure_log( + error_name, self.message, session_id=session_id_hex + ) def __str__(self): return self.message diff --git a/src/databricks/sql/telemetry/latency_logger.py b/src/databricks/sql/telemetry/latency_logger.py index a0322390e..2445c25c2 100644 --- a/src/databricks/sql/telemetry/latency_logger.py +++ b/src/databricks/sql/telemetry/latency_logger.py @@ -212,6 +212,7 @@ def wrapper(self, *args, **kwargs): latency_ms=duration_ms, sql_execution_event=sql_exec_event, sql_statement_id=telemetry_data.get("statement_id"), + session_id=session_id_hex, ) return wrapper diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index ea12d80e8..0c3d0837b 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -147,13 +147,13 @@ def __new__(cls): cls._instance = super(NoopTelemetryClient, cls).__new__(cls) return cls._instance - def export_initial_telemetry_log(self, driver_connection_params, user_agent): + def export_initial_telemetry_log(self, driver_connection_params, user_agent, session_id=None): pass - def export_failure_log(self, error_name, error_message): + def export_failure_log(self, error_name, error_message, session_id=None): pass - def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id): + def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id, session_id=None): pass def close(self): @@ -352,19 +352,22 @@ def _telemetry_request_callback(self, future, sent_count: int): except Exception as e: logger.debug("Telemetry request failed with exception: %s", e) - def _export_telemetry_log(self, **telemetry_event_kwargs): + def _export_telemetry_log(self, session_id=None, **telemetry_event_kwargs): """ Common helper method for exporting telemetry logs. Args: + session_id: Optional session ID for this event. If not provided, uses the client's session ID. **telemetry_event_kwargs: Keyword arguments to pass to TelemetryEvent constructor """ - logger.debug("Exporting telemetry log for connection %s", self._session_id_hex) + # Use provided session_id or fall back to client's session_id + actual_session_id = session_id or self._session_id_hex + logger.debug("Exporting telemetry log for connection %s", actual_session_id) try: # Set common fields for all telemetry events event_kwargs = { - "session_id": self._session_id_hex, + "session_id": actual_session_id, "system_configuration": TelemetryHelper.get_driver_system_configuration(), "driver_connection_params": self._driver_connection_params, } @@ -387,17 +390,18 @@ def _export_telemetry_log(self, **telemetry_event_kwargs): except Exception as e: logger.debug("Failed to export telemetry log: %s", e) - def export_initial_telemetry_log(self, driver_connection_params, user_agent): + def export_initial_telemetry_log(self, driver_connection_params, user_agent, session_id=None): self._driver_connection_params = driver_connection_params self._user_agent = user_agent - self._export_telemetry_log() + self._export_telemetry_log(session_id=session_id) - def export_failure_log(self, error_name, error_message): + def export_failure_log(self, error_name, error_message, session_id=None): error_info = DriverErrorInfo(error_name=error_name, stack_trace=error_message) - self._export_telemetry_log(error_info=error_info) + self._export_telemetry_log(session_id=session_id, error_info=error_info) - def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id): + def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id, session_id=None): self._export_telemetry_log( + session_id=session_id, sql_statement_id=sql_statement_id, sql_operation=sql_execution_event, operation_latency_ms=latency_ms, From 8cb66ec2b04e58b46edfc0b555525975356d7ca0 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 18:26:27 +0000 Subject: [PATCH 15/19] Fix Black formatting in telemetry_client.py --- src/databricks/sql/telemetry/telemetry_client.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 0c3d0837b..77d1a2f9c 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -147,13 +147,17 @@ def __new__(cls): cls._instance = super(NoopTelemetryClient, cls).__new__(cls) return cls._instance - def export_initial_telemetry_log(self, driver_connection_params, user_agent, session_id=None): + def export_initial_telemetry_log( + self, driver_connection_params, user_agent, session_id=None + ): pass def export_failure_log(self, error_name, error_message, session_id=None): pass - def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id, session_id=None): + def export_latency_log( + self, latency_ms, sql_execution_event, sql_statement_id, session_id=None + ): pass def close(self): @@ -390,7 +394,9 @@ def _export_telemetry_log(self, session_id=None, **telemetry_event_kwargs): except Exception as e: logger.debug("Failed to export telemetry log: %s", e) - def export_initial_telemetry_log(self, driver_connection_params, user_agent, session_id=None): + def export_initial_telemetry_log( + self, driver_connection_params, user_agent, session_id=None + ): self._driver_connection_params = driver_connection_params self._user_agent = user_agent self._export_telemetry_log(session_id=session_id) @@ -399,7 +405,9 @@ def export_failure_log(self, error_name, error_message, session_id=None): error_info = DriverErrorInfo(error_name=error_name, stack_trace=error_message) self._export_telemetry_log(session_id=session_id, error_info=error_info) - def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id, session_id=None): + def export_latency_log( + self, latency_ms, sql_execution_event, sql_statement_id, session_id=None + ): self._export_telemetry_log( session_id=session_id, sql_statement_id=sql_statement_id, From 69789eedc6c15262698556d85bce38d9833df10b Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 18:39:49 +0000 Subject: [PATCH 16/19] Use 'test-host' instead of 'test' for mock host in telemetry tests --- tests/unit/test_telemetry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index c2a339c82..35b271750 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -398,7 +398,7 @@ def test_telemetry_enabled_when_flag_is_true(self, mock_http_request, MockSessio self._mock_ff_response(mock_http_request, enabled=True) mock_session_instance = MockSession.return_value mock_session_instance.guid_hex = "test-session-ff-true" - mock_session_instance.host = "test" # Set host for telemetry client lookup + mock_session_instance.host = "test-host" # Set host for telemetry client lookup mock_session_instance.auth_provider = AccessTokenAuthProvider("token") mock_session_instance.is_open = ( False # Connection starts closed for test cleanup @@ -429,7 +429,7 @@ def test_telemetry_disabled_when_flag_is_false( self._mock_ff_response(mock_http_request, enabled=False) mock_session_instance = MockSession.return_value mock_session_instance.guid_hex = "test-session-ff-false" - mock_session_instance.host = "test" # Set host for telemetry client lookup + mock_session_instance.host = "test-host" # Set host for telemetry client lookup mock_session_instance.auth_provider = AccessTokenAuthProvider("token") mock_session_instance.is_open = ( False # Connection starts closed for test cleanup @@ -460,7 +460,7 @@ def test_telemetry_disabled_when_flag_request_fails( mock_http_request.side_effect = Exception("Network is down") mock_session_instance = MockSession.return_value mock_session_instance.guid_hex = "test-session-ff-fail" - mock_session_instance.host = "test" # Set host for telemetry client lookup + mock_session_instance.host = "test-host" # Set host for telemetry client lookup mock_session_instance.auth_provider = AccessTokenAuthProvider("token") mock_session_instance.is_open = ( False # Connection starts closed for test cleanup From b0aa88906541160e08b18f4f8a8a03bbaa2a959d Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 18:42:53 +0000 Subject: [PATCH 17/19] Replace test-session-id with test-host in test_client.py --- tests/unit/test_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 6b872b0a7..8f8a97eae 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -714,7 +714,7 @@ def test_autocommit_setter_wraps_database_error(self, mock_session_class): server_error = DatabaseError( "AUTOCOMMIT_SET_DURING_ACTIVE_TRANSACTION", context={"sql_state": "25000"}, - host_url="test-session-id", + host_url="test-host", ) mock_cursor.execute.side_effect = server_error @@ -737,7 +737,7 @@ def test_autocommit_setter_preserves_exception_chain(self, mock_session_class): mock_cursor = Mock() original_error = DatabaseError( - "Original error", host_url="test-session-id" + "Original error", host_url="test-host" ) mock_cursor.execute.side_effect = original_error @@ -772,7 +772,7 @@ def test_commit_wraps_database_error(self, mock_session_class): server_error = DatabaseError( "MULTI_STATEMENT_TRANSACTION_NO_ACTIVE_TRANSACTION", context={"sql_state": "25000"}, - host_url="test-session-id", + host_url="test-host", ) mock_cursor.execute.side_effect = server_error @@ -822,7 +822,7 @@ def test_rollback_wraps_database_error(self, mock_session_class): server_error = DatabaseError( "Unexpected rollback error", context={"sql_state": "HY000"}, - host_url="test-session-id", + host_url="test-host", ) mock_cursor.execute.side_effect = server_error From c8cfc23c3ebb1626db51d2458f18c3815780faf5 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 18:47:33 +0000 Subject: [PATCH 18/19] Fix telemetry client lookup to use test-host in tests --- tests/unit/test_telemetry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index 35b271750..e9fa16649 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -418,7 +418,7 @@ def test_telemetry_enabled_when_flag_is_true(self, mock_http_request, MockSessio assert conn.telemetry_enabled is True mock_http_request.assert_called_once() - client = TelemetryClientFactory.get_telemetry_client("test") + client = TelemetryClientFactory.get_telemetry_client("test-host") assert isinstance(client, TelemetryClient) @patch("databricks.sql.common.unified_http_client.UnifiedHttpClient.request") @@ -449,7 +449,7 @@ def test_telemetry_disabled_when_flag_is_false( assert conn.telemetry_enabled is False mock_http_request.assert_called_once() - client = TelemetryClientFactory.get_telemetry_client("test") + client = TelemetryClientFactory.get_telemetry_client("test-host") assert isinstance(client, NoopTelemetryClient) @patch("databricks.sql.common.unified_http_client.UnifiedHttpClient.request") @@ -480,7 +480,7 @@ def test_telemetry_disabled_when_flag_request_fails( assert conn.telemetry_enabled is False mock_http_request.assert_called_once() - client = TelemetryClientFactory.get_telemetry_client("test") + client = TelemetryClientFactory.get_telemetry_client("test-host") assert isinstance(client, NoopTelemetryClient) From 962def5ff4e04981afe993f05bbda009681268a3 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 2 Dec 2025 19:45:18 +0000 Subject: [PATCH 19/19] Make session_id_hex keyword-only parameter in Error.__init__ --- src/databricks/sql/exc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/exc.py b/src/databricks/sql/exc.py index df6b72abf..f4770f3c4 100644 --- a/src/databricks/sql/exc.py +++ b/src/databricks/sql/exc.py @@ -16,8 +16,8 @@ def __init__( message=None, context=None, host_url=None, - session_id_hex=None, *args, + session_id_hex=None, **kwargs, ): super().__init__(message, *args, **kwargs)