Skip to content

Commit 0dfe0f4

Browse files
authored
Add functionality for export of latency logs via telemetry (#608)
* added functionality for export of failure logs Signed-off-by: Sai Shree Pradhan <[email protected]> * changed logger.error to logger.debug in exc.py Signed-off-by: Sai Shree Pradhan <[email protected]> * Fix telemetry loss during Python shutdown Signed-off-by: Sai Shree Pradhan <[email protected]> * unit tests for export_failure_log Signed-off-by: Sai Shree Pradhan <[email protected]> * try-catch blocks to make telemetry failures non-blocking for connector operations Signed-off-by: Sai Shree Pradhan <[email protected]> * removed redundant try/catch blocks, added try/catch block to initialize and get telemetry client Signed-off-by: Sai Shree Pradhan <[email protected]> * skip null fields in telemetry request Signed-off-by: Sai Shree Pradhan <[email protected]> * removed dup import, renamed func, changed a filter_null_values to lamda Signed-off-by: Sai Shree Pradhan <[email protected]> * removed unnecassary class variable and a redundant try/except block Signed-off-by: Sai Shree Pradhan <[email protected]> * public functions defined at interface level Signed-off-by: Sai Shree Pradhan <[email protected]> * changed export_event and flush to private functions Signed-off-by: Sai Shree Pradhan <[email protected]> * formatting Signed-off-by: Sai Shree Pradhan <[email protected]> * changed connection_uuid to thread local in thrift backend Signed-off-by: Sai Shree Pradhan <[email protected]> * made errors more specific Signed-off-by: Sai Shree Pradhan <[email protected]> * revert change to connection_uuid Signed-off-by: Sai Shree Pradhan <[email protected]> * reverting change in close in telemetry client Signed-off-by: Sai Shree Pradhan <[email protected]> * JsonSerializableMixin Signed-off-by: Sai Shree Pradhan <[email protected]> * isdataclass check in JsonSerializableMixin Signed-off-by: Sai Shree Pradhan <[email protected]> * convert TelemetryClientFactory to module-level functions, replace NoopTelemetryClient class with NOOP_TELEMETRY_CLIENT singleton, updated tests accordingly Signed-off-by: Sai Shree Pradhan <[email protected]> * renamed connection_uuid as session_id_hex Signed-off-by: Sai Shree Pradhan <[email protected]> * added NotImplementedError to abstract class, added unit tests Signed-off-by: Sai Shree Pradhan <[email protected]> * formatting Signed-off-by: Sai Shree Pradhan <[email protected]> * added PEP-249 link, changed NoopTelemetryClient implementation Signed-off-by: Sai Shree Pradhan <[email protected]> * removed unused import Signed-off-by: Sai Shree Pradhan <[email protected]> * made telemetry client close a module-level function Signed-off-by: Sai Shree Pradhan <[email protected]> * unit tests verbose Signed-off-by: Sai Shree Pradhan <[email protected]> * debug logs in unit tests Signed-off-by: Sai Shree Pradhan <[email protected]> * debug logs in unit tests Signed-off-by: Sai Shree Pradhan <[email protected]> * removed ABC from mixin, added try/catch block around executor shutdown Signed-off-by: Sai Shree Pradhan <[email protected]> * checking stuff Signed-off-by: Sai Shree Pradhan <[email protected]> * finding out * finding out more * more more finding out more nice * locks are useless anyways * haha * normal * := looks like walrus horizontally * one more * walrus again * old stuff without walrus seems to fail * manually do the walrussing * change 3.13t, v2 Signed-off-by: Sai Shree Pradhan <[email protected]> * formatting, added walrus Signed-off-by: Sai Shree Pradhan <[email protected]> * formatting Signed-off-by: Sai Shree Pradhan <[email protected]> * removed walrus, removed test before stalling test Signed-off-by: Sai Shree Pradhan <[email protected]> * changed order of stalling test Signed-off-by: Sai Shree Pradhan <[email protected]> * removed debugging, added TelemetryClientFactory Signed-off-by: Sai Shree Pradhan <[email protected]> * remove more debugging Signed-off-by: Sai Shree Pradhan <[email protected]> * latency logs funcitionality Signed-off-by: Sai Shree Pradhan <[email protected]> * fixed type of return value in get_session_id_hex() in thrift backend Signed-off-by: Sai Shree Pradhan <[email protected]> * debug on TelemetryClientFactory lock Signed-off-by: Sai Shree Pradhan <[email protected]> * formatting Signed-off-by: Sai Shree Pradhan <[email protected]> * type notation for _waiters Signed-off-by: Sai Shree Pradhan <[email protected]> * called connection.close() in test_arraysize_buffer_size_passthrough Signed-off-by: Sai Shree Pradhan <[email protected]> * run all unit tests Signed-off-by: Sai Shree Pradhan <[email protected]> * more debugging Signed-off-by: Sai Shree Pradhan <[email protected]> * removed the connection.close() from that test, put debug statement before and after TelemetryClientFactory lock Signed-off-by: Sai Shree Pradhan <[email protected]> * more debug Signed-off-by: Sai Shree Pradhan <[email protected]> * more more more Signed-off-by: Sai Shree Pradhan <[email protected]> * why Signed-off-by: Sai Shree Pradhan <[email protected]> * whywhy Signed-off-by: Sai Shree Pradhan <[email protected]> * thread name Signed-off-by: Sai Shree Pradhan <[email protected]> * added teardown to all tests except finalizer test (gc collect) Signed-off-by: Sai Shree Pradhan <[email protected]> * added the get_attribute functions to the classes Signed-off-by: Sai Shree Pradhan <[email protected]> * removed tearDown, added connection.close() to first test Signed-off-by: Sai Shree Pradhan <[email protected]> * finally Signed-off-by: Sai Shree Pradhan <[email protected]> * remove debugging Signed-off-by: Sai Shree Pradhan <[email protected]> * added test for export_latency_log, made mock of thrift backend with retry policy Signed-off-by: Sai Shree Pradhan <[email protected]> * added multi threaded tests Signed-off-by: Sai Shree Pradhan <[email protected]> * formatting Signed-off-by: Sai Shree Pradhan <[email protected]> * added TelemetryExtractor, removed multithreaded tests Signed-off-by: Sai Shree Pradhan <[email protected]> * formatting Signed-off-by: Sai Shree Pradhan <[email protected]> * fixes in test Signed-off-by: Sai Shree Pradhan <[email protected]> * fix in telemetry extractor Signed-off-by: Sai Shree Pradhan <[email protected]> * added doc strings to latency_logger, abstracted export_telemetry_log Signed-off-by: Sai Shree Pradhan <[email protected]> * statement type, unit test fix Signed-off-by: Sai Shree Pradhan <[email protected]> * unit test fix Signed-off-by: Sai Shree Pradhan <[email protected]> * statement type changes Signed-off-by: Sai Shree Pradhan <[email protected]> * test_fetches fix Signed-off-by: Sai Shree Pradhan <[email protected]> * added mocks to resolve the errors caused by log_latency decorator in tests Signed-off-by: Sai Shree Pradhan <[email protected]> * removed function in test_fetches cuz it is only used once Signed-off-by: Sai Shree Pradhan <[email protected]> * added _safe_call which returns None in case of errors in the get functions Signed-off-by: Sai Shree Pradhan <[email protected]> * removed the changes in test_client and test_fetches Signed-off-by: Sai Shree Pradhan <[email protected]> * removed the changes in test_fetches Signed-off-by: Sai Shree Pradhan <[email protected]> * test_telemetry Signed-off-by: Sai Shree Pradhan <[email protected]> * removed test Signed-off-by: Sai Shree Pradhan <[email protected]> --------- Signed-off-by: Sai Shree Pradhan <[email protected]>
1 parent 6748c2c commit 0dfe0f4

File tree

4 files changed

+475
-440
lines changed

4 files changed

+475
-440
lines changed

src/databricks/sql/client.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@
6161
DriverConnectionParameters,
6262
HostDetails,
6363
)
64-
64+
from databricks.sql.telemetry.latency_logger import log_latency
65+
from databricks.sql.telemetry.models.enums import StatementType
6566

6667
logger = logging.getLogger(__name__)
6768

@@ -745,6 +746,7 @@ def _handle_staging_operation(
745746
session_id_hex=self.connection.get_session_id_hex(),
746747
)
747748

749+
@log_latency(StatementType.SQL)
748750
def _handle_staging_put(
749751
self, presigned_url: str, local_file: str, headers: Optional[dict] = None
750752
):
@@ -784,6 +786,7 @@ def _handle_staging_put(
784786
+ "but not yet applied on the server. It's possible this command may fail later."
785787
)
786788

789+
@log_latency(StatementType.SQL)
787790
def _handle_staging_get(
788791
self, local_file: str, presigned_url: str, headers: Optional[dict] = None
789792
):
@@ -811,6 +814,7 @@ def _handle_staging_get(
811814
with open(local_file, "wb") as fp:
812815
fp.write(r.content)
813816

817+
@log_latency(StatementType.SQL)
814818
def _handle_staging_remove(
815819
self, presigned_url: str, headers: Optional[dict] = None
816820
):
@@ -824,6 +828,7 @@ def _handle_staging_remove(
824828
session_id_hex=self.connection.get_session_id_hex(),
825829
)
826830

831+
@log_latency(StatementType.QUERY)
827832
def execute(
828833
self,
829834
operation: str,
@@ -914,6 +919,7 @@ def execute(
914919

915920
return self
916921

922+
@log_latency(StatementType.QUERY)
917923
def execute_async(
918924
self,
919925
operation: str,
@@ -1039,6 +1045,7 @@ def executemany(self, operation, seq_of_parameters):
10391045
self.execute(operation, parameters)
10401046
return self
10411047

1048+
@log_latency(StatementType.METADATA)
10421049
def catalogs(self) -> "Cursor":
10431050
"""
10441051
Get all available catalogs.
@@ -1062,6 +1069,7 @@ def catalogs(self) -> "Cursor":
10621069
)
10631070
return self
10641071

1072+
@log_latency(StatementType.METADATA)
10651073
def schemas(
10661074
self, catalog_name: Optional[str] = None, schema_name: Optional[str] = None
10671075
) -> "Cursor":
@@ -1090,6 +1098,7 @@ def schemas(
10901098
)
10911099
return self
10921100

1101+
@log_latency(StatementType.METADATA)
10931102
def tables(
10941103
self,
10951104
catalog_name: Optional[str] = None,
@@ -1125,6 +1134,7 @@ def tables(
11251134
)
11261135
return self
11271136

1137+
@log_latency(StatementType.METADATA)
11281138
def columns(
11291139
self,
11301140
catalog_name: Optional[str] = None,
@@ -1379,6 +1389,7 @@ def _fill_results_buffer(self):
13791389
self.results = results
13801390
self.has_more_rows = has_more_rows
13811391

1392+
@log_latency()
13821393
def _convert_columnar_table(self, table):
13831394
column_names = [c[0] for c in self.description]
13841395
ResultRow = Row(*column_names)
@@ -1391,6 +1402,7 @@ def _convert_columnar_table(self, table):
13911402

13921403
return result
13931404

1405+
@log_latency()
13941406
def _convert_arrow_table(self, table):
13951407
column_names = [c[0] for c in self.description]
13961408
ResultRow = Row(*column_names)
@@ -1433,6 +1445,7 @@ def _convert_arrow_table(self, table):
14331445
def rownumber(self):
14341446
return self._next_row_index
14351447

1448+
@log_latency()
14361449
def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
14371450
"""
14381451
Fetch the next set of rows of a query result, returning a PyArrow table.
@@ -1475,6 +1488,7 @@ def merge_columnar(self, result1, result2):
14751488
]
14761489
return ColumnTable(merged_result, result1.column_names)
14771490

1491+
@log_latency()
14781492
def fetchmany_columnar(self, size: int):
14791493
"""
14801494
Fetch the next set of rows of a query result, returning a Columnar Table.
@@ -1500,6 +1514,7 @@ def fetchmany_columnar(self, size: int):
15001514

15011515
return results
15021516

1517+
@log_latency()
15031518
def fetchall_arrow(self) -> "pyarrow.Table":
15041519
"""Fetch all (remaining) rows of a query result, returning them as a PyArrow table."""
15051520
results = self.results.remaining_rows()
@@ -1526,6 +1541,7 @@ def fetchall_arrow(self) -> "pyarrow.Table":
15261541
return pyarrow.Table.from_pydict(data)
15271542
return results
15281543

1544+
@log_latency()
15291545
def fetchall_columnar(self):
15301546
"""Fetch all (remaining) rows of a query result, returning them as a Columnar table."""
15311547
results = self.results.remaining_rows()
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
import time
2+
import functools
3+
from typing import Optional
4+
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
5+
from databricks.sql.telemetry.models.event import (
6+
SqlExecutionEvent,
7+
)
8+
from databricks.sql.telemetry.models.enums import ExecutionResultFormat, StatementType
9+
from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue
10+
from uuid import UUID
11+
12+
13+
class TelemetryExtractor:
14+
"""
15+
Base class for extracting telemetry information from various object types.
16+
17+
This class serves as a proxy that delegates attribute access to the wrapped object
18+
while providing a common interface for extracting telemetry-related data.
19+
"""
20+
21+
def __init__(self, obj):
22+
"""
23+
Initialize the extractor with an object to wrap.
24+
25+
Args:
26+
obj: The object to extract telemetry information from.
27+
"""
28+
self._obj = obj
29+
30+
def __getattr__(self, name):
31+
"""
32+
Delegate attribute access to the wrapped object.
33+
34+
Args:
35+
name (str): The name of the attribute to access.
36+
37+
Returns:
38+
The attribute value from the wrapped object.
39+
"""
40+
return getattr(self._obj, name)
41+
42+
def get_session_id_hex(self):
43+
pass
44+
45+
def get_statement_id(self):
46+
pass
47+
48+
def get_is_compressed(self):
49+
pass
50+
51+
def get_execution_result(self):
52+
pass
53+
54+
def get_retry_count(self):
55+
pass
56+
57+
58+
class CursorExtractor(TelemetryExtractor):
59+
"""
60+
Telemetry extractor specialized for Cursor objects.
61+
62+
Extracts telemetry information from database cursor objects, including
63+
statement IDs, session information, compression settings, and result formats.
64+
"""
65+
66+
def get_statement_id(self) -> Optional[str]:
67+
return self.query_id
68+
69+
def get_session_id_hex(self) -> Optional[str]:
70+
return self.connection.get_session_id_hex()
71+
72+
def get_is_compressed(self) -> bool:
73+
return self.connection.lz4_compression
74+
75+
def get_execution_result(self) -> ExecutionResultFormat:
76+
if self.active_result_set is None:
77+
return ExecutionResultFormat.FORMAT_UNSPECIFIED
78+
79+
if isinstance(self.active_result_set.results, ColumnQueue):
80+
return ExecutionResultFormat.COLUMNAR_INLINE
81+
elif isinstance(self.active_result_set.results, CloudFetchQueue):
82+
return ExecutionResultFormat.EXTERNAL_LINKS
83+
elif isinstance(self.active_result_set.results, ArrowQueue):
84+
return ExecutionResultFormat.INLINE_ARROW
85+
return ExecutionResultFormat.FORMAT_UNSPECIFIED
86+
87+
def get_retry_count(self) -> int:
88+
if (
89+
hasattr(self.thrift_backend, "retry_policy")
90+
and self.thrift_backend.retry_policy
91+
):
92+
return len(self.thrift_backend.retry_policy.history)
93+
return 0
94+
95+
96+
class ResultSetExtractor(TelemetryExtractor):
97+
"""
98+
Telemetry extractor specialized for ResultSet objects.
99+
100+
Extracts telemetry information from database result set objects, including
101+
operation IDs, session information, compression settings, and result formats.
102+
"""
103+
104+
def get_statement_id(self) -> Optional[str]:
105+
if self.command_id:
106+
return str(UUID(bytes=self.command_id.operationId.guid))
107+
return None
108+
109+
def get_session_id_hex(self) -> Optional[str]:
110+
return self.connection.get_session_id_hex()
111+
112+
def get_is_compressed(self) -> bool:
113+
return self.lz4_compressed
114+
115+
def get_execution_result(self) -> ExecutionResultFormat:
116+
if isinstance(self.results, ColumnQueue):
117+
return ExecutionResultFormat.COLUMNAR_INLINE
118+
elif isinstance(self.results, CloudFetchQueue):
119+
return ExecutionResultFormat.EXTERNAL_LINKS
120+
elif isinstance(self.results, ArrowQueue):
121+
return ExecutionResultFormat.INLINE_ARROW
122+
return ExecutionResultFormat.FORMAT_UNSPECIFIED
123+
124+
def get_retry_count(self) -> int:
125+
if (
126+
hasattr(self.thrift_backend, "retry_policy")
127+
and self.thrift_backend.retry_policy
128+
):
129+
return len(self.thrift_backend.retry_policy.history)
130+
return 0
131+
132+
133+
def get_extractor(obj):
134+
"""
135+
Factory function to create the appropriate telemetry extractor for an object.
136+
137+
Determines the object type and returns the corresponding specialized extractor
138+
that can extract telemetry information from that object type.
139+
140+
Args:
141+
obj: The object to create an extractor for. Can be a Cursor, ResultSet,
142+
or any other object.
143+
144+
Returns:
145+
TelemetryExtractor: A specialized extractor instance:
146+
- CursorExtractor for Cursor objects
147+
- ResultSetExtractor for ResultSet objects
148+
- Throws an NotImplementedError for all other objects
149+
"""
150+
if obj.__class__.__name__ == "Cursor":
151+
return CursorExtractor(obj)
152+
elif obj.__class__.__name__ == "ResultSet":
153+
return ResultSetExtractor(obj)
154+
else:
155+
raise NotImplementedError(f"No extractor found for {obj.__class__.__name__}")
156+
157+
158+
def log_latency(statement_type: StatementType = StatementType.NONE):
159+
"""
160+
Decorator for logging execution latency and telemetry information.
161+
162+
This decorator measures the execution time of a method and sends telemetry
163+
data about the operation, including latency, statement information, and
164+
execution context.
165+
166+
The decorator automatically:
167+
- Measures execution time using high-precision performance counters
168+
- Extracts telemetry information from the method's object (self)
169+
- Creates a SqlExecutionEvent with execution details
170+
- Sends the telemetry data asynchronously via TelemetryClient
171+
172+
Args:
173+
statement_type (StatementType): The type of SQL statement being executed.
174+
175+
Usage:
176+
@log_latency(StatementType.SQL)
177+
def execute(self, query):
178+
# Method implementation
179+
pass
180+
181+
Returns:
182+
function: A decorator that wraps methods to add latency logging.
183+
184+
Note:
185+
The wrapped method's object (self) must be compatible with the
186+
telemetry extractor system (e.g., Cursor or ResultSet objects).
187+
"""
188+
189+
def decorator(func):
190+
@functools.wraps(func)
191+
def wrapper(self, *args, **kwargs):
192+
start_time = time.perf_counter()
193+
result = None
194+
try:
195+
result = func(self, *args, **kwargs)
196+
return result
197+
finally:
198+
199+
def _safe_call(func_to_call):
200+
"""Calls a function and returns a default value on any exception."""
201+
try:
202+
return func_to_call()
203+
except Exception:
204+
return None
205+
206+
end_time = time.perf_counter()
207+
duration_ms = int((end_time - start_time) * 1000)
208+
209+
extractor = get_extractor(self)
210+
session_id_hex = _safe_call(extractor.get_session_id_hex)
211+
statement_id = _safe_call(extractor.get_statement_id)
212+
213+
sql_exec_event = SqlExecutionEvent(
214+
statement_type=statement_type,
215+
is_compressed=_safe_call(extractor.get_is_compressed),
216+
execution_result=_safe_call(extractor.get_execution_result),
217+
retry_count=_safe_call(extractor.get_retry_count),
218+
)
219+
220+
telemetry_client = TelemetryClientFactory.get_telemetry_client(
221+
session_id_hex
222+
)
223+
telemetry_client.export_latency_log(
224+
latency_ms=duration_ms,
225+
sql_execution_event=sql_exec_event,
226+
sql_statement_id=statement_id,
227+
)
228+
229+
return wrapper
230+
231+
return decorator

0 commit comments

Comments
 (0)