From 8d8e1a87d1dca993f8377d5e92dbc915b91cff52 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Fri, 27 Jun 2025 11:16:46 +0530 Subject: [PATCH 1/4] send telemetry to unauth endpoint in case of connection/auth errors Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 54 +++++++++---- src/databricks/sql/exc.py | 55 +++++++++++++ src/databricks/sql/telemetry/models/event.py | 2 +- .../sql/telemetry/telemetry_client.py | 81 ++++++++++++++++++- 4 files changed, 174 insertions(+), 18 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index c137306a7..429f82d53 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -21,6 +21,8 @@ InterfaceError, NotSupportedError, ProgrammingError, + AuthenticationError, + ConnectionError, ) from databricks.sql.thrift_api.TCLIService import ttypes from databricks.sql.thrift_backend import ThriftBackend @@ -241,9 +243,18 @@ def read(self) -> Optional[OAuthToken]: self.disable_pandas = kwargs.get("_disable_pandas", False) self.lz4_compression = kwargs.get("enable_query_result_lz4_compression", True) - auth_provider = get_python_sql_connector_auth_provider( - server_hostname, **kwargs - ) + try: + auth_provider = get_python_sql_connector_auth_provider( + server_hostname, **kwargs + ) + except Exception as e: + raise AuthenticationError( + message=f"Failed to create authentication provider: {str(e)}", + host_url=server_hostname, + http_path=http_path, + port=self.port, + original_exception=e + ) from e self.server_telemetry_enabled = True self.client_telemetry_enabled = kwargs.get("enable_telemetry", False) @@ -281,20 +292,31 @@ def read(self) -> Optional[OAuthToken]: tls_client_cert_key_password=kwargs.get("_tls_client_cert_key_password"), ) - self.thrift_backend = ThriftBackend( - self.host, - self.port, - http_path, - (http_headers or []) + base_headers, - auth_provider, - ssl_options=self._ssl_options, - _use_arrow_native_complex_types=_use_arrow_native_complex_types, - **kwargs, - ) + try: + self.thrift_backend = ThriftBackend( + self.host, + self.port, + http_path, + (http_headers or []) + base_headers, + auth_provider, + ssl_options=self._ssl_options, + _use_arrow_native_complex_types=_use_arrow_native_complex_types, + **kwargs, + ) + + self._open_session_resp = self.thrift_backend.open_session( + session_configuration, catalog, schema + ) + except Exception as e: + raise ConnectionError( + message=f"Failed to establish connection: {str(e)}", + host_url=self.host, + http_path=http_path, + port=self.port, + user_agent=useragent_header, + original_exception=e + ) from e - self._open_session_resp = self.thrift_backend.open_session( - session_configuration, catalog, schema - ) self._session_handle = self._open_session_resp.sessionHandle self.protocol_version = self.get_protocol_version(self._open_session_resp) self.use_cloud_fetch = kwargs.get("use_cloud_fetch", True) diff --git a/src/databricks/sql/exc.py b/src/databricks/sql/exc.py index 30fd6c26d..0bbe54301 100644 --- a/src/databricks/sql/exc.py +++ b/src/databricks/sql/exc.py @@ -22,10 +22,26 @@ def __init__( error_name = self.__class__.__name__ if session_id_hex: + # Normal case: we have a session, send to regular telemetry client telemetry_client = TelemetryClientFactory.get_telemetry_client( session_id_hex ) telemetry_client.export_failure_log(error_name, self.message) + elif isinstance(self, (ConnectionError, AuthenticationError)) and 'host_url' in self.context: + # Connection error case: no session but we should still send telemetry + self._send_connection_error_telemetry(error_name) + + def _send_connection_error_telemetry(self, error_name): + """Send connection error telemetry to unauthenticated endpoint""" + + TelemetryClientFactory.send_connection_error_telemetry( + error_name=error_name, + error_message=self.message or str(self), + host_url=self.context['host_url'], + http_path=self.context.get('http_path', ''), + port=self.context.get('port', 443), + user_agent=self.context.get('user_agent'), + ) def __str__(self): return self.message @@ -126,3 +142,42 @@ class SessionAlreadyClosedError(RequestError): class CursorAlreadyClosedError(RequestError): """Thrown if CancelOperation receives a code 404. ThriftBackend should gracefully proceed as this is expected.""" + + +class ConnectionError(OperationalError): + """Thrown when connection to Databricks fails during initial setup""" + + def __init__( + self, + message=None, + host_url=None, + http_path=None, + port=443, + user_agent=None, + original_exception=None, + **kwargs + ): + # Set up context for connection error telemetry + context = kwargs.get('context', {}) + if host_url: + context.update({ + 'host_url': host_url, + 'http_path': http_path or '', + 'port': port, + 'user_agent': user_agent, + 'original_exception': str(original_exception) if original_exception else None, + }) + + super().__init__(message=message, context=context, **kwargs) + + +class AuthenticationError(ConnectionError): + """Thrown when authentication to Databricks fails""" + + def __init__(self, message=None, auth_type=None, **kwargs): + context = kwargs.get('context', {}) + if auth_type: + context['auth_type'] = auth_type + kwargs['context'] = context + + super().__init__(message=message, **kwargs) diff --git a/src/databricks/sql/telemetry/models/event.py b/src/databricks/sql/telemetry/models/event.py index f5496deec..a155c7597 100644 --- a/src/databricks/sql/telemetry/models/event.py +++ b/src/databricks/sql/telemetry/models/event.py @@ -149,9 +149,9 @@ class TelemetryEvent(JsonSerializableMixin): operation_latency_ms (Optional[int]): Operation latency in milliseconds """ - session_id: str system_configuration: DriverSystemConfiguration driver_connection_params: DriverConnectionParameters + session_id: Optional[str] = None sql_statement_id: Optional[str] = None auth_type: Optional[str] = None vol_operation: Optional[DriverVolumeOperation] = None diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 10aa04eff..116aef234 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -9,6 +9,8 @@ TelemetryEvent, DriverSystemConfiguration, DriverErrorInfo, + DriverConnectionParameters, + HostDetails, ) from databricks.sql.telemetry.models.frontend_logs import ( TelemetryFrontendLog, @@ -16,7 +18,7 @@ FrontendLogContext, FrontendLogEntry, ) -from databricks.sql.telemetry.models.enums import AuthMech, AuthFlow +from databricks.sql.telemetry.models.enums import AuthMech, AuthFlow, DatabricksClientType from databricks.sql.auth.authenticators import ( AccessTokenAuthProvider, DatabricksOAuthProvider, @@ -434,3 +436,80 @@ def close(session_id_hex): TelemetryClientFactory._executor.shutdown(wait=True) TelemetryClientFactory._executor = None TelemetryClientFactory._initialized = False + + @staticmethod + def send_connection_error_telemetry( + error_name: str, + error_message: str, + host_url: str, + http_path: str, + port: int = 443, + user_agent: Optional[str] = None, + ): + """Send error telemetry when connection creation fails, without requiring a session""" + try: + logger.debug("Sending connection error telemetry for host: %s", host_url) + + # Initialize factory if needed (with proper locking) + with TelemetryClientFactory._lock: + TelemetryClientFactory._initialize() + + # Create driver connection params for the failed connection + driver_connection_params = DriverConnectionParameters( + http_path=http_path, + mode=DatabricksClientType.THRIFT, + host_info=HostDetails(host_url=host_url, port=port), + ) + + error_info = DriverErrorInfo( + error_name=error_name, + stack_trace=error_message + ) + + telemetry_frontend_log = TelemetryFrontendLog( + frontend_log_event_id=str(uuid.uuid4()), + context=FrontendLogContext( + client_context=TelemetryClientContext( + timestamp_millis=int(time.time() * 1000), + user_agent=user_agent or "PyDatabricksSqlConnector", + ) + ), + entry=FrontendLogEntry( + sql_driver_log=TelemetryEvent( + system_configuration=TelemetryHelper.get_driver_system_configuration(), + driver_connection_params=driver_connection_params, + error_info=error_info, + ) + ), + ) + + # Send to unauthenticated endpoint since we don't have working auth + request = { + "uploadTime": int(time.time() * 1000), + "items": [], + "protoLogs": [telemetry_frontend_log.to_json()], + } + + url = f"https://{host_url}/telemetry-unauth" + headers = {"Accept": "application/json", "Content-Type": "application/json"} + + # Send synchronously for connection errors since we're probably about to exit + try: + response = requests.post( + url, + data=json.dumps(request), + headers=headers, + timeout=5, + ) + if response.status_code == 200: + logger.debug("Connection error telemetry sent successfully") + else: + logger.debug( + "Connection error telemetry failed with status: %s", + response.status_code, + ) + except Exception as e: + logger.debug("Failed to send connection error telemetry: %s", e) + + except Exception as e: + logger.debug("Failed to create connection error telemetry: %s", e) \ No newline at end of file From 4eb51ef98520ab4a0f1fa5035b5407c30fcc441a Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Fri, 27 Jun 2025 11:25:55 +0530 Subject: [PATCH 2/4] formatting Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 4 +- src/databricks/sql/exc.py | 57 +++++++++++-------- .../sql/telemetry/telemetry_client.py | 11 ++-- 3 files changed, 41 insertions(+), 31 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 429f82d53..5e3eacd37 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -253,7 +253,7 @@ def read(self) -> Optional[OAuthToken]: host_url=server_hostname, http_path=http_path, port=self.port, - original_exception=e + original_exception=e, ) from e self.server_telemetry_enabled = True @@ -314,7 +314,7 @@ def read(self) -> Optional[OAuthToken]: http_path=http_path, port=self.port, user_agent=useragent_header, - original_exception=e + original_exception=e, ) from e self._session_handle = self._open_session_resp.sessionHandle diff --git a/src/databricks/sql/exc.py b/src/databricks/sql/exc.py index 0bbe54301..d3a175e26 100644 --- a/src/databricks/sql/exc.py +++ b/src/databricks/sql/exc.py @@ -27,20 +27,23 @@ def __init__( session_id_hex ) telemetry_client.export_failure_log(error_name, self.message) - elif isinstance(self, (ConnectionError, AuthenticationError)) and 'host_url' in self.context: + elif ( + isinstance(self, (ConnectionError, AuthenticationError)) + and "host_url" in self.context + ): # Connection error case: no session but we should still send telemetry self._send_connection_error_telemetry(error_name) def _send_connection_error_telemetry(self, error_name): """Send connection error telemetry to unauthenticated endpoint""" - + TelemetryClientFactory.send_connection_error_telemetry( error_name=error_name, error_message=self.message or str(self), - host_url=self.context['host_url'], - http_path=self.context.get('http_path', ''), - port=self.context.get('port', 443), - user_agent=self.context.get('user_agent'), + host_url=self.context["host_url"], + http_path=self.context.get("http_path", ""), + port=self.context.get("port", 443), + user_agent=self.context.get("user_agent"), ) def __str__(self): @@ -146,38 +149,42 @@ class CursorAlreadyClosedError(RequestError): class ConnectionError(OperationalError): """Thrown when connection to Databricks fails during initial setup""" - + def __init__( - self, - message=None, - host_url=None, - http_path=None, + self, + message=None, + host_url=None, + http_path=None, port=443, user_agent=None, original_exception=None, **kwargs ): # Set up context for connection error telemetry - context = kwargs.get('context', {}) + context = kwargs.get("context", {}) if host_url: - context.update({ - 'host_url': host_url, - 'http_path': http_path or '', - 'port': port, - 'user_agent': user_agent, - 'original_exception': str(original_exception) if original_exception else None, - }) - + context.update( + { + "host_url": host_url, + "http_path": http_path or "", + "port": port, + "user_agent": user_agent, + "original_exception": str(original_exception) + if original_exception + else None, + } + ) + super().__init__(message=message, context=context, **kwargs) class AuthenticationError(ConnectionError): """Thrown when authentication to Databricks fails""" - + def __init__(self, message=None, auth_type=None, **kwargs): - context = kwargs.get('context', {}) + context = kwargs.get("context", {}) if auth_type: - context['auth_type'] = auth_type - kwargs['context'] = context - + context["auth_type"] = auth_type + kwargs["context"] = context + super().__init__(message=message, **kwargs) diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 116aef234..092089ddf 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -18,7 +18,11 @@ FrontendLogContext, FrontendLogEntry, ) -from databricks.sql.telemetry.models.enums import AuthMech, AuthFlow, DatabricksClientType +from databricks.sql.telemetry.models.enums import ( + AuthMech, + AuthFlow, + DatabricksClientType, +) from databricks.sql.auth.authenticators import ( AccessTokenAuthProvider, DatabricksOAuthProvider, @@ -462,8 +466,7 @@ def send_connection_error_telemetry( ) error_info = DriverErrorInfo( - error_name=error_name, - stack_trace=error_message + error_name=error_name, stack_trace=error_message ) telemetry_frontend_log = TelemetryFrontendLog( @@ -512,4 +515,4 @@ def send_connection_error_telemetry( logger.debug("Failed to send connection error telemetry: %s", e) except Exception as e: - logger.debug("Failed to create connection error telemetry: %s", e) \ No newline at end of file + logger.debug("Failed to create connection error telemetry: %s", e) From 0b7686c99ecfb6dca6761d7710fae037bd6cb108 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Fri, 27 Jun 2025 11:51:23 +0530 Subject: [PATCH 3/4] added unit test for send_connection_error_telemetry Signed-off-by: Sai Shree Pradhan --- .../sql/telemetry/telemetry_client.py | 29 ++++---- tests/unit/test_telemetry.py | 67 ++++++++++++++++++- 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 092089ddf..7e3964406 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -497,22 +497,19 @@ def send_connection_error_telemetry( headers = {"Accept": "application/json", "Content-Type": "application/json"} # Send synchronously for connection errors since we're probably about to exit - try: - response = requests.post( - url, - data=json.dumps(request), - headers=headers, - timeout=5, + response = requests.post( + url, + data=json.dumps(request), + headers=headers, + timeout=5, + ) + if response.status_code == 200: + logger.debug("Connection error telemetry sent successfully") + else: + logger.debug( + "Connection error telemetry failed with status: %s", + response.status_code, ) - if response.status_code == 200: - logger.debug("Connection error telemetry sent successfully") - else: - logger.debug( - "Connection error telemetry failed with status: %s", - response.status_code, - ) - except Exception as e: - logger.debug("Failed to send connection error telemetry: %s", e) except Exception as e: - logger.debug("Failed to create connection error telemetry: %s", e) + logger.debug("Failed to send connection error telemetry: %s", e) diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py index 699480bbe..3ca9e873b 100644 --- a/tests/unit/test_telemetry.py +++ b/tests/unit/test_telemetry.py @@ -498,4 +498,69 @@ def test_global_exception_hook(self, mock_handle_exception, telemetry_system_res test_exception = ValueError("Test exception") TelemetryClientFactory._handle_unhandled_exception(type(test_exception), test_exception, None) - mock_handle_exception.assert_called_once_with(type(test_exception), test_exception, None) \ No newline at end of file + mock_handle_exception.assert_called_once_with(type(test_exception), test_exception, None) + + @patch("requests.post") + @patch("databricks.sql.telemetry.telemetry_client.TelemetryHelper.get_driver_system_configuration") + @patch("databricks.sql.telemetry.telemetry_client.TelemetryFrontendLog") + @patch("databricks.sql.telemetry.telemetry_client.DriverErrorInfo") + @patch("databricks.sql.telemetry.telemetry_client.DriverConnectionParameters") + @patch("databricks.sql.telemetry.telemetry_client.uuid.uuid4") + @patch("databricks.sql.telemetry.telemetry_client.time.time") + def test_send_connection_error_telemetry( + self, + mock_time, + mock_uuid4, + mock_driver_connection_params, + mock_driver_error_info, + mock_frontend_log, + mock_get_driver_config, + mock_post, + telemetry_system_reset + ): + """Test connection error telemetry functionality.""" + # Setup mocks + mock_time.return_value = 1000 + mock_uuid4.return_value = "test-uuid" + mock_get_driver_config.return_value = MagicMock() + mock_driver_connection_params.return_value = MagicMock() + mock_driver_error_info.return_value = MagicMock() + + mock_frontend_log_instance = MagicMock() + mock_frontend_log_instance.to_json.return_value = '{"test": "data"}' + mock_frontend_log.return_value = mock_frontend_log_instance + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_post.return_value = mock_response + + # Test successful call + TelemetryClientFactory.send_connection_error_telemetry( + error_name="ConnectionError", + error_message="Failed to connect", + host_url="test.databricks.com", + http_path="/sql/1.0/endpoints/test", + port=443, + user_agent="TestAgent" + ) + + # Verify requests.post was called correctly + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + assert args[0] == "https://test.databricks.com/telemetry-unauth" + assert kwargs["headers"]["Accept"] == "application/json" + assert kwargs["timeout"] == 5 + + # Test that exceptions don't break the function + mock_post.reset_mock() + mock_post.side_effect = Exception("Network error") + + # Should not raise exception + TelemetryClientFactory.send_connection_error_telemetry( + error_name="AuthenticationError", + error_message="Auth failed", + host_url="test.databricks.com", + http_path="/sql/1.0/endpoints/test" + ) + + mock_post.assert_called_once() \ No newline at end of file From 4c219dad2b769cd0cefbb9382444e197013fe77b Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Mon, 30 Jun 2025 09:28:39 +0530 Subject: [PATCH 4/4] retry errors Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 5e3eacd37..bf7cf77dc 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -24,6 +24,7 @@ AuthenticationError, ConnectionError, ) +from urllib3.exceptions import MaxRetryError from databricks.sql.thrift_api.TCLIService import ttypes from databricks.sql.thrift_backend import ThriftBackend from databricks.sql.utils import ( @@ -307,6 +308,8 @@ def read(self) -> Optional[OAuthToken]: self._open_session_resp = self.thrift_backend.open_session( session_configuration, catalog, schema ) + except (RequestError, MaxRetryError) as e: + raise except Exception as e: raise ConnectionError( message=f"Failed to establish connection: {str(e)}",