Skip to content

Commit 8d5e155

Browse files
authored
perf: Optimize telemetry latency logging to reduce overhead (databricks#715)
perf: Optimize telemetry latency logging to reduce overhead Optimizations implemented: 1. Eliminated extractor pattern - replaced wrapper classes with direct attribute access functions, removing object creation overhead 2. Added feature flag early exit - checks cached telemetry_enabled flag to skip heavy work when telemetry is disabled 3. Simplified code structure with early returns for better readability Signed-off-by: Samikshya Chand <samikshya.chand@databricks.com>
1 parent 73580fe commit 8d5e155

File tree

4 files changed

+264
-165
lines changed

4 files changed

+264
-165
lines changed

src/databricks/sql/common/feature_flag.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ def get_instance(cls, connection: "Connection") -> FeatureFlagsContext:
165165
cls._initialize()
166166
assert cls._executor is not None
167167

168-
# Use the unique session ID as the key
169-
key = connection.get_session_id_hex()
168+
# Cache at HOST level - share feature flags across connections to same host
169+
# Feature flags are per-host, not per-session
170+
key = connection.session.host
170171
if key not in cls._context_map:
171172
cls._context_map[key] = FeatureFlagsContext(
172173
connection, cls._executor, connection.session.http_client
@@ -177,7 +178,8 @@ def get_instance(cls, connection: "Connection") -> FeatureFlagsContext:
177178
def remove_instance(cls, connection: "Connection"):
178179
"""Removes the context for a given connection and shuts down the executor if no clients remain."""
179180
with cls._lock:
180-
key = connection.get_session_id_hex()
181+
# Use host as key to match get_instance
182+
key = connection.session.host
181183
if key in cls._context_map:
182184
cls._context_map.pop(key, None)
183185

src/databricks/sql/telemetry/latency_logger.py

Lines changed: 146 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import time
22
import functools
3-
from typing import Optional
3+
from typing import Optional, Dict, Any
44
import logging
55
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
66
from databricks.sql.telemetry.models.event import (
@@ -11,127 +11,141 @@
1111
logger = logging.getLogger(__name__)
1212

1313

14-
class TelemetryExtractor:
14+
def _extract_cursor_data(cursor) -> Dict[str, Any]:
1515
"""
16-
Base class for extracting telemetry information from various object types.
16+
Extract telemetry data directly from a Cursor object.
1717
18-
This class serves as a proxy that delegates attribute access to the wrapped object
19-
while providing a common interface for extracting telemetry-related data.
20-
"""
21-
22-
def __init__(self, obj):
23-
self._obj = obj
24-
25-
def __getattr__(self, name):
26-
return getattr(self._obj, name)
27-
28-
def get_session_id_hex(self):
29-
pass
30-
31-
def get_statement_id(self):
32-
pass
33-
34-
def get_is_compressed(self):
35-
pass
36-
37-
def get_execution_result_format(self):
38-
pass
39-
40-
def get_retry_count(self):
41-
pass
42-
43-
def get_chunk_id(self):
44-
pass
18+
OPTIMIZATION: Uses direct attribute access instead of wrapper objects.
19+
This eliminates object creation overhead and method call indirection.
4520
21+
Args:
22+
cursor: The Cursor object to extract data from
4623
47-
class CursorExtractor(TelemetryExtractor):
24+
Returns:
25+
Dict with telemetry data (values may be None if extraction fails)
4826
"""
49-
Telemetry extractor specialized for Cursor objects.
50-
51-
Extracts telemetry information from database cursor objects, including
52-
statement IDs, session information, compression settings, and result formats.
27+
data = {}
28+
29+
# Extract statement_id (query_id) - direct attribute access
30+
try:
31+
data["statement_id"] = cursor.query_id
32+
except (AttributeError, Exception):
33+
data["statement_id"] = None
34+
35+
# Extract session_id_hex - direct method call
36+
try:
37+
data["session_id_hex"] = cursor.connection.get_session_id_hex()
38+
except (AttributeError, Exception):
39+
data["session_id_hex"] = None
40+
41+
# Extract is_compressed - direct attribute access
42+
try:
43+
data["is_compressed"] = cursor.connection.lz4_compression
44+
except (AttributeError, Exception):
45+
data["is_compressed"] = False
46+
47+
# Extract execution_result_format - inline logic
48+
try:
49+
if cursor.active_result_set is None:
50+
data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED
51+
else:
52+
from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue
53+
54+
results = cursor.active_result_set.results
55+
if isinstance(results, ColumnQueue):
56+
data["execution_result"] = ExecutionResultFormat.COLUMNAR_INLINE
57+
elif isinstance(results, CloudFetchQueue):
58+
data["execution_result"] = ExecutionResultFormat.EXTERNAL_LINKS
59+
elif isinstance(results, ArrowQueue):
60+
data["execution_result"] = ExecutionResultFormat.INLINE_ARROW
61+
else:
62+
data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED
63+
except (AttributeError, Exception):
64+
data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED
65+
66+
# Extract retry_count - direct attribute access
67+
try:
68+
if hasattr(cursor.backend, "retry_policy") and cursor.backend.retry_policy:
69+
data["retry_count"] = len(cursor.backend.retry_policy.history)
70+
else:
71+
data["retry_count"] = 0
72+
except (AttributeError, Exception):
73+
data["retry_count"] = 0
74+
75+
# chunk_id is always None for Cursor
76+
data["chunk_id"] = None
77+
78+
return data
79+
80+
81+
def _extract_result_set_handler_data(handler) -> Dict[str, Any]:
5382
"""
83+
Extract telemetry data directly from a ResultSetDownloadHandler object.
5484
55-
def get_statement_id(self) -> Optional[str]:
56-
return self.query_id
57-
58-
def get_session_id_hex(self) -> Optional[str]:
59-
return self.connection.get_session_id_hex()
60-
61-
def get_is_compressed(self) -> bool:
62-
return self.connection.lz4_compression
63-
64-
def get_execution_result_format(self) -> ExecutionResultFormat:
65-
if self.active_result_set is None:
66-
return ExecutionResultFormat.FORMAT_UNSPECIFIED
67-
68-
from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue
69-
70-
if isinstance(self.active_result_set.results, ColumnQueue):
71-
return ExecutionResultFormat.COLUMNAR_INLINE
72-
elif isinstance(self.active_result_set.results, CloudFetchQueue):
73-
return ExecutionResultFormat.EXTERNAL_LINKS
74-
elif isinstance(self.active_result_set.results, ArrowQueue):
75-
return ExecutionResultFormat.INLINE_ARROW
76-
return ExecutionResultFormat.FORMAT_UNSPECIFIED
77-
78-
def get_retry_count(self) -> int:
79-
if hasattr(self.backend, "retry_policy") and self.backend.retry_policy:
80-
return len(self.backend.retry_policy.history)
81-
return 0
82-
83-
def get_chunk_id(self):
84-
return None
85+
OPTIMIZATION: Uses direct attribute access instead of wrapper objects.
8586
87+
Args:
88+
handler: The ResultSetDownloadHandler object to extract data from
8689
87-
class ResultSetDownloadHandlerExtractor(TelemetryExtractor):
88-
"""
89-
Telemetry extractor specialized for ResultSetDownloadHandler objects.
90+
Returns:
91+
Dict with telemetry data (values may be None if extraction fails)
9092
"""
93+
data = {}
9194

92-
def get_session_id_hex(self) -> Optional[str]:
93-
return self._obj.session_id_hex
95+
# Extract session_id_hex - direct attribute access
96+
try:
97+
data["session_id_hex"] = handler.session_id_hex
98+
except (AttributeError, Exception):
99+
data["session_id_hex"] = None
94100

95-
def get_statement_id(self) -> Optional[str]:
96-
return self._obj.statement_id
101+
# Extract statement_id - direct attribute access
102+
try:
103+
data["statement_id"] = handler.statement_id
104+
except (AttributeError, Exception):
105+
data["statement_id"] = None
97106

98-
def get_is_compressed(self) -> bool:
99-
return self._obj.settings.is_lz4_compressed
107+
# Extract is_compressed - direct attribute access
108+
try:
109+
data["is_compressed"] = handler.settings.is_lz4_compressed
110+
except (AttributeError, Exception):
111+
data["is_compressed"] = False
100112

101-
def get_execution_result_format(self) -> ExecutionResultFormat:
102-
return ExecutionResultFormat.EXTERNAL_LINKS
113+
# execution_result is always EXTERNAL_LINKS for result set handlers
114+
data["execution_result"] = ExecutionResultFormat.EXTERNAL_LINKS
103115

104-
def get_retry_count(self) -> Optional[int]:
105-
# standard requests and urllib3 libraries don't expose retry count
106-
return None
116+
# retry_count is not available for result set handlers
117+
data["retry_count"] = None
118+
119+
# Extract chunk_id - direct attribute access
120+
try:
121+
data["chunk_id"] = handler.chunk_id
122+
except (AttributeError, Exception):
123+
data["chunk_id"] = None
107124

108-
def get_chunk_id(self) -> Optional[int]:
109-
return self._obj.chunk_id
125+
return data
110126

111127

112-
def get_extractor(obj):
128+
def _extract_telemetry_data(obj) -> Optional[Dict[str, Any]]:
113129
"""
114-
Factory function to create the appropriate telemetry extractor for an object.
130+
Extract telemetry data from an object based on its type.
115131
116-
Determines the object type and returns the corresponding specialized extractor
117-
that can extract telemetry information from that object type.
132+
OPTIMIZATION: Returns a simple dict instead of creating wrapper objects.
133+
This dict will be used to create the SqlExecutionEvent in the background thread.
118134
119135
Args:
120-
obj: The object to create an extractor for. Can be a Cursor,
121-
ResultSetDownloadHandler, or any other object.
136+
obj: The object to extract data from (Cursor, ResultSetDownloadHandler, etc.)
122137
123138
Returns:
124-
TelemetryExtractor: A specialized extractor instance:
125-
- CursorExtractor for Cursor objects
126-
- ResultSetDownloadHandlerExtractor for ResultSetDownloadHandler objects
127-
- None for all other objects
139+
Dict with telemetry data, or None if object type is not supported
128140
"""
129-
if obj.__class__.__name__ == "Cursor":
130-
return CursorExtractor(obj)
131-
elif obj.__class__.__name__ == "ResultSetDownloadHandler":
132-
return ResultSetDownloadHandlerExtractor(obj)
141+
obj_type = obj.__class__.__name__
142+
143+
if obj_type == "Cursor":
144+
return _extract_cursor_data(obj)
145+
elif obj_type == "ResultSetDownloadHandler":
146+
return _extract_result_set_handler_data(obj)
133147
else:
134-
logger.debug("No extractor found for %s", obj.__class__.__name__)
148+
logger.debug("No telemetry extraction available for %s", obj_type)
135149
return None
136150

137151

@@ -143,12 +157,6 @@ def log_latency(statement_type: StatementType = StatementType.NONE):
143157
data about the operation, including latency, statement information, and
144158
execution context.
145159
146-
The decorator automatically:
147-
- Measures execution time using high-precision performance counters
148-
- Extracts telemetry information from the method's object (self)
149-
- Creates a SqlExecutionEvent with execution details
150-
- Sends the telemetry data asynchronously via TelemetryClient
151-
152160
Args:
153161
statement_type (StatementType): The type of SQL statement being executed.
154162
@@ -162,54 +170,49 @@ def execute(self, query):
162170
function: A decorator that wraps methods to add latency logging.
163171
164172
Note:
165-
The wrapped method's object (self) must be compatible with the
166-
telemetry extractor system (e.g., Cursor or ResultSet objects).
173+
The wrapped method's object (self) must be a Cursor or
174+
ResultSetDownloadHandler for telemetry data extraction.
167175
"""
168176

169177
def decorator(func):
170178
@functools.wraps(func)
171179
def wrapper(self, *args, **kwargs):
172-
start_time = time.perf_counter()
173-
result = None
180+
start_time = time.monotonic()
174181
try:
175-
result = func(self, *args, **kwargs)
176-
return result
182+
return func(self, *args, **kwargs)
177183
finally:
178-
179-
def _safe_call(func_to_call):
180-
"""Calls a function and returns a default value on any exception."""
181-
try:
182-
return func_to_call()
183-
except Exception:
184-
return None
185-
186-
end_time = time.perf_counter()
187-
duration_ms = int((end_time - start_time) * 1000)
188-
189-
extractor = get_extractor(self)
190-
191-
if extractor is not None:
192-
session_id_hex = _safe_call(extractor.get_session_id_hex)
193-
statement_id = _safe_call(extractor.get_statement_id)
194-
195-
sql_exec_event = SqlExecutionEvent(
196-
statement_type=statement_type,
197-
is_compressed=_safe_call(extractor.get_is_compressed),
198-
execution_result=_safe_call(
199-
extractor.get_execution_result_format
200-
),
201-
retry_count=_safe_call(extractor.get_retry_count),
202-
chunk_id=_safe_call(extractor.get_chunk_id),
203-
)
204-
205-
telemetry_client = TelemetryClientFactory.get_telemetry_client(
206-
session_id_hex
207-
)
208-
telemetry_client.export_latency_log(
209-
latency_ms=duration_ms,
210-
sql_execution_event=sql_exec_event,
211-
sql_statement_id=statement_id,
212-
)
184+
duration_ms = int((time.monotonic() - start_time) * 1000)
185+
186+
# Always log for debugging
187+
logger.debug("%s completed in %dms", func.__name__, duration_ms)
188+
189+
# Fast check: use cached telemetry_enabled flag from connection
190+
# Avoids dictionary lookup + instance check on every operation
191+
connection = getattr(self, "connection", None)
192+
if connection and getattr(connection, "telemetry_enabled", False):
193+
session_id_hex = connection.get_session_id_hex()
194+
if session_id_hex:
195+
# Telemetry enabled - extract and send
196+
telemetry_data = _extract_telemetry_data(self)
197+
if telemetry_data:
198+
sql_exec_event = SqlExecutionEvent(
199+
statement_type=statement_type,
200+
is_compressed=telemetry_data.get("is_compressed"),
201+
execution_result=telemetry_data.get("execution_result"),
202+
retry_count=telemetry_data.get("retry_count"),
203+
chunk_id=telemetry_data.get("chunk_id"),
204+
)
205+
206+
telemetry_client = (
207+
TelemetryClientFactory.get_telemetry_client(
208+
session_id_hex
209+
)
210+
)
211+
telemetry_client.export_latency_log(
212+
latency_ms=duration_ms,
213+
sql_execution_event=sql_exec_event,
214+
sql_statement_id=telemetry_data.get("statement_id"),
215+
)
213216

214217
return wrapper
215218

0 commit comments

Comments
 (0)