From 325b68e580df433875c2851b33f871fd5e27a163 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Tue, 3 Jun 2025 09:24:51 +0000 Subject: [PATCH 1/9] introduce http client (temp) and sea test file Signed-off-by: varun-edachali-dbx --- examples/experimental/sea_connector_test.py | 66 +++++++ .../sql/backend/utils/http_client.py | 174 ++++++++++++++++++ 2 files changed, 240 insertions(+) create mode 100644 examples/experimental/sea_connector_test.py create mode 100644 src/databricks/sql/backend/utils/http_client.py diff --git a/examples/experimental/sea_connector_test.py b/examples/experimental/sea_connector_test.py new file mode 100644 index 000000000..a27099da7 --- /dev/null +++ b/examples/experimental/sea_connector_test.py @@ -0,0 +1,66 @@ +import os +import sys +import logging +from databricks.sql.client import Connection + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +def test_sea_session(): + """ + Test opening and closing a SEA session using the connector. + + This function connects to a Databricks SQL endpoint using the SEA backend, + opens a session, and then closes it. + + Required environment variables: + - DATABRICKS_SERVER_HOSTNAME: Databricks server hostname + - DATABRICKS_HTTP_PATH: HTTP path for the SQL endpoint + - DATABRICKS_TOKEN: Personal access token for authentication + """ + + server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME") + http_path = os.environ.get("DATABRICKS_HTTP_PATH") + access_token = os.environ.get("DATABRICKS_TOKEN") + catalog = os.environ.get("DATABRICKS_CATALOG") + + if not all([server_hostname, http_path, access_token]): + logger.error("Missing required environment variables.") + logger.error("Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.") + sys.exit(1) + + logger.info(f"Connecting to {server_hostname}") + logger.info(f"HTTP Path: {http_path}") + if catalog: + logger.info(f"Using catalog: {catalog}") + + try: + logger.info("Creating connection with SEA backend...") + connection = Connection( + server_hostname=server_hostname, + http_path=http_path, + access_token=access_token, + catalog=catalog, + schema="default", + use_sea=True, + user_agent_entry="SEA-Test-Client" # add custom user agent + ) + + logger.info(f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}") + logger.info(f"backend type: {type(connection.session.backend)}") + + # Close the connection + logger.info("Closing the SEA session...") + connection.close() + logger.info("Successfully closed SEA session") + + except Exception as e: + logger.error(f"Error testing SEA session: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + sys.exit(1) + + logger.info("SEA session test completed successfully") + +if __name__ == "__main__": + test_sea_session() \ No newline at end of file diff --git a/src/databricks/sql/backend/utils/http_client.py b/src/databricks/sql/backend/utils/http_client.py new file mode 100644 index 000000000..82980792c --- /dev/null +++ b/src/databricks/sql/backend/utils/http_client.py @@ -0,0 +1,174 @@ +import json +import logging +import requests +from typing import Dict, Any, Optional, Union, List +from urllib.parse import urljoin + +from databricks.sql.auth.authenticators import AuthProvider +from databricks.sql.types import SSLOptions + +logger = logging.getLogger(__name__) + + +class CustomHttpClient: + """ + HTTP client for Statement Execution API (SEA). + + This client handles the HTTP communication with the SEA endpoints, + including authentication, request formatting, and response parsing. + """ + + def __init__( + self, + server_hostname: str, + port: int, + http_path: str, + http_headers: List[tuple], + auth_provider: AuthProvider, + ssl_options: SSLOptions, + **kwargs, + ): + """ + Initialize the SEA HTTP client. + + Args: + server_hostname: Hostname of the Databricks server + port: Port number for the connection + http_path: HTTP path for the connection + http_headers: List of HTTP headers to include in requests + auth_provider: Authentication provider + ssl_options: SSL configuration options + **kwargs: Additional keyword arguments + """ + + self.server_hostname = server_hostname + self.port = port + self.http_path = http_path + self.auth_provider = auth_provider + self.ssl_options = ssl_options + + self.base_url = f"https://{server_hostname}:{port}" + + self.headers = dict(http_headers) + self.headers.update({"Content-Type": "application/json"}) + + self.max_retries = kwargs.get("_retry_stop_after_attempts_count", 30) + + # Create a session for connection pooling + self.session = requests.Session() + + # Configure SSL verification + if ssl_options.tls_verify: + self.session.verify = ssl_options.tls_trusted_ca_file or True + else: + self.session.verify = False + + # Configure client certificates if provided + if ssl_options.tls_client_cert_file: + client_cert = ssl_options.tls_client_cert_file + client_key = ssl_options.tls_client_cert_key_file + client_key_password = ssl_options.tls_client_cert_key_password + + if client_key: + self.session.cert = (client_cert, client_key) + else: + self.session.cert = client_cert + + if client_key_password: + # Note: requests doesn't directly support key passwords + # This would require more complex handling with libraries like pyOpenSSL + logger.warning( + "Client key password provided but not supported by requests library" + ) + + def _get_auth_headers(self) -> Dict[str, str]: + """Get authentication headers from the auth provider.""" + headers: Dict[str, str] = {} + self.auth_provider.add_headers(headers) + return headers + + def _make_request( + self, method: str, path: str, data: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Make an HTTP request to the SEA endpoint. + + Args: + method: HTTP method (GET, POST, DELETE) + path: API endpoint path + data: Request payload data + + Returns: + Dict[str, Any]: Response data parsed from JSON + + Raises: + RequestError: If the request fails + """ + + url = urljoin(self.base_url, path) + headers = {**self.headers, **self._get_auth_headers()} + + logger.debug(f"making {method} request to {url}") + + try: + if method.upper() == "GET": + response = self.session.get(url, headers=headers, params=data) + elif method.upper() == "POST": + response = self.session.post(url, headers=headers, json=data) + elif method.upper() == "DELETE": + # For DELETE requests, use params for data (query parameters) + response = self.session.delete(url, headers=headers, params=data) + else: + raise ValueError(f"Unsupported HTTP method: {method}") + + # Check for HTTP errors + response.raise_for_status() + + # Log response details + logger.debug(f"Response status: {response.status_code}") + + # Parse JSON response + if response.content: + result = response.json() + # Log response content (but limit it for large responses) + content_str = json.dumps(result) + if len(content_str) > 1000: + logger.debug( + f"Response content (truncated): {content_str[:1000]}..." + ) + else: + logger.debug(f"Response content: {content_str}") + return result + return {} + + except requests.exceptions.RequestException as e: + # Handle request errors + error_message = f"SEA HTTP request failed: {str(e)}" + logger.error(error_message) + + # Extract error details from response if available + if hasattr(e, "response") and e.response is not None: + try: + error_details = e.response.json() + error_message = ( + f"{error_message}: {error_details.get('message', '')}" + ) + logger.error( + f"Response status: {e.response.status_code}, Error details: {error_details}" + ) + except (ValueError, KeyError): + # If we can't parse the JSON, just log the raw content + content_str = ( + e.response.content.decode("utf-8", errors="replace") + if isinstance(e.response.content, bytes) + else str(e.response.content) + ) + logger.error( + f"Response status: {e.response.status_code}, Raw content: {content_str}" + ) + pass + + # Re-raise as a RequestError + from databricks.sql.exc import RequestError + + raise RequestError(error_message, e) From a91f1447ced432ec36aee9491113fdc681d586ee Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Tue, 3 Jun 2025 09:30:52 +0000 Subject: [PATCH 2/9] reduce verbosity Signed-off-by: varun-edachali-dbx --- .../sql/backend/utils/http_client.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/databricks/sql/backend/utils/http_client.py b/src/databricks/sql/backend/utils/http_client.py index 82980792c..20d32b625 100644 --- a/src/databricks/sql/backend/utils/http_client.py +++ b/src/databricks/sql/backend/utils/http_client.py @@ -142,31 +142,29 @@ def _make_request( return {} except requests.exceptions.RequestException as e: - # Handle request errors + # Handle request errors and extract details from response if available error_message = f"SEA HTTP request failed: {str(e)}" - logger.error(error_message) - # Extract error details from response if available if hasattr(e, "response") and e.response is not None: + status_code = e.response.status_code try: error_details = e.response.json() error_message = ( f"{error_message}: {error_details.get('message', '')}" ) logger.error( - f"Response status: {e.response.status_code}, Error details: {error_details}" + f"Request failed (status {status_code}): {error_details}" ) except (ValueError, KeyError): - # If we can't parse the JSON, just log the raw content - content_str = ( + # If we can't parse JSON, log raw content + content = ( e.response.content.decode("utf-8", errors="replace") if isinstance(e.response.content, bytes) else str(e.response.content) ) - logger.error( - f"Response status: {e.response.status_code}, Raw content: {content_str}" - ) - pass + logger.error(f"Request failed (status {status_code}): {content}") + else: + logger.error(error_message) # Re-raise as a RequestError from databricks.sql.exc import RequestError From c0b63b26d47d6e9f785ab4fcaf3256e50929863d Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Tue, 3 Jun 2025 10:16:19 +0000 Subject: [PATCH 3/9] redundant comment Signed-off-by: varun-edachali-dbx --- src/databricks/sql/backend/utils/http_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/databricks/sql/backend/utils/http_client.py b/src/databricks/sql/backend/utils/http_client.py index 20d32b625..742e9d88d 100644 --- a/src/databricks/sql/backend/utils/http_client.py +++ b/src/databricks/sql/backend/utils/http_client.py @@ -116,7 +116,6 @@ def _make_request( elif method.upper() == "POST": response = self.session.post(url, headers=headers, json=data) elif method.upper() == "DELETE": - # For DELETE requests, use params for data (query parameters) response = self.session.delete(url, headers=headers, params=data) else: raise ValueError(f"Unsupported HTTP method: {method}") From 6a17773c7cf0f50e20c54bc1008340102b703988 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Tue, 3 Jun 2025 10:21:01 +0000 Subject: [PATCH 4/9] reduce redundancy, params and data separate Signed-off-by: varun-edachali-dbx --- .../sql/backend/utils/http_client.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/databricks/sql/backend/utils/http_client.py b/src/databricks/sql/backend/utils/http_client.py index 742e9d88d..892384684 100644 --- a/src/databricks/sql/backend/utils/http_client.py +++ b/src/databricks/sql/backend/utils/http_client.py @@ -88,7 +88,11 @@ def _get_auth_headers(self) -> Dict[str, str]: return headers def _make_request( - self, method: str, path: str, data: Optional[Dict[str, Any]] = None + self, + method: str, + path: str, + data: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Make an HTTP request to the SEA endpoint. @@ -97,6 +101,7 @@ def _make_request( method: HTTP method (GET, POST, DELETE) path: API endpoint path data: Request payload data + params: Query parameters Returns: Dict[str, Any]: Response data parsed from JSON @@ -111,12 +116,18 @@ def _make_request( logger.debug(f"making {method} request to {url}") try: + kwargs = { + "url": url, + "headers": headers, + "json": data, + "params": params, + } if method.upper() == "GET": - response = self.session.get(url, headers=headers, params=data) + response = self.session.get(**kwargs) elif method.upper() == "POST": - response = self.session.post(url, headers=headers, json=data) + response = self.session.post(**kwargs) elif method.upper() == "DELETE": - response = self.session.delete(url, headers=headers, params=data) + response = self.session.delete(**kwargs) else: raise ValueError(f"Unsupported HTTP method: {method}") From 446126c29ee0a1acaac2c52571fbf5b76b0eeb1b Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Tue, 3 Jun 2025 12:39:08 +0000 Subject: [PATCH 5/9] rename client Signed-off-by: varun-edachali-dbx --- src/databricks/sql/backend/utils/http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/backend/utils/http_client.py b/src/databricks/sql/backend/utils/http_client.py index 892384684..746af94c3 100644 --- a/src/databricks/sql/backend/utils/http_client.py +++ b/src/databricks/sql/backend/utils/http_client.py @@ -10,7 +10,7 @@ logger = logging.getLogger(__name__) -class CustomHttpClient: +class SeaHttpClient: """ HTTP client for Statement Execution API (SEA). From ae93e9de618fd8635914f24a638c8daf547588b8 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Tue, 3 Jun 2025 12:45:03 +0000 Subject: [PATCH 6/9] fix type issues Signed-off-by: varun-edachali-dbx --- .../sql/backend/utils/http_client.py | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/databricks/sql/backend/utils/http_client.py b/src/databricks/sql/backend/utils/http_client.py index 746af94c3..b8de45060 100644 --- a/src/databricks/sql/backend/utils/http_client.py +++ b/src/databricks/sql/backend/utils/http_client.py @@ -1,7 +1,7 @@ import json import logging import requests -from typing import Dict, Any, Optional, Union, List +from typing import Dict, Any, Optional, Union, List, Tuple from urllib.parse import urljoin from databricks.sql.auth.authenticators import AuthProvider @@ -23,7 +23,7 @@ def __init__( server_hostname: str, port: int, http_path: str, - http_headers: List[tuple], + http_headers: List[Tuple[str, str]], auth_provider: AuthProvider, ssl_options: SSLOptions, **kwargs, @@ -49,7 +49,7 @@ def __init__( self.base_url = f"https://{server_hostname}:{port}" - self.headers = dict(http_headers) + self.headers: Dict[str, str] = dict(http_headers) self.headers.update({"Content-Type": "application/json"}) self.max_retries = kwargs.get("_retry_stop_after_attempts_count", 30) @@ -111,23 +111,32 @@ def _make_request( """ url = urljoin(self.base_url, path) - headers = {**self.headers, **self._get_auth_headers()} + headers: Dict[str, str] = {**self.headers, **self._get_auth_headers()} logger.debug(f"making {method} request to {url}") try: - kwargs = { - "url": url, - "headers": headers, - "json": data, - "params": params, - } if method.upper() == "GET": - response = self.session.get(**kwargs) + response = self.session.get( + url=url, + headers=headers, + json=data, + params=params, + ) elif method.upper() == "POST": - response = self.session.post(**kwargs) + response = self.session.post( + url=url, + headers=headers, + json=data, + params=params, + ) elif method.upper() == "DELETE": - response = self.session.delete(**kwargs) + response = self.session.delete( + url=url, + headers=headers, + json=data, + params=params, + ) else: raise ValueError(f"Unsupported HTTP method: {method}") From cb61181d74f6e9cc26794d1f56701c62ab5f80d4 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Wed, 4 Jun 2025 01:28:20 +0000 Subject: [PATCH 7/9] reduce repetition in request calls Signed-off-by: varun-edachali-dbx --- .../sql/backend/utils/http_client.py | 44 +++++++++---------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/databricks/sql/backend/utils/http_client.py b/src/databricks/sql/backend/utils/http_client.py index b8de45060..1634879fd 100644 --- a/src/databricks/sql/backend/utils/http_client.py +++ b/src/databricks/sql/backend/utils/http_client.py @@ -1,7 +1,7 @@ import json import logging import requests -from typing import Dict, Any, Optional, Union, List, Tuple +from typing import Callable, Dict, Any, Optional, Union, List, Tuple from urllib.parse import urljoin from databricks.sql.auth.authenticators import AuthProvider @@ -87,6 +87,18 @@ def _get_auth_headers(self) -> Dict[str, str]: self.auth_provider.add_headers(headers) return headers + def _get_call(self, method: str) -> Callable: + """Get the appropriate HTTP method function.""" + method = method.upper() + if method == "GET": + return self.session.get + elif method == "POST": + return self.session.post + elif method == "DELETE": + return self.session.delete + else: + raise ValueError(f"Unsupported HTTP method: {method}") + def _make_request( self, method: str, @@ -116,29 +128,13 @@ def _make_request( logger.debug(f"making {method} request to {url}") try: - if method.upper() == "GET": - response = self.session.get( - url=url, - headers=headers, - json=data, - params=params, - ) - elif method.upper() == "POST": - response = self.session.post( - url=url, - headers=headers, - json=data, - params=params, - ) - elif method.upper() == "DELETE": - response = self.session.delete( - url=url, - headers=headers, - json=data, - params=params, - ) - else: - raise ValueError(f"Unsupported HTTP method: {method}") + call = self._get_call(method) + response = call( + url=url, + headers=headers, + json=data, + params=params, + ) # Check for HTTP errors response.raise_for_status() From 0d472bdafb09030e10e1f02e93a7943c9ddeaee0 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Wed, 4 Jun 2025 03:34:06 +0000 Subject: [PATCH 8/9] remove un-necessary elifs Signed-off-by: varun-edachali-dbx --- src/databricks/sql/backend/utils/http_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/databricks/sql/backend/utils/http_client.py b/src/databricks/sql/backend/utils/http_client.py index 1634879fd..f0b931ee4 100644 --- a/src/databricks/sql/backend/utils/http_client.py +++ b/src/databricks/sql/backend/utils/http_client.py @@ -92,12 +92,11 @@ def _get_call(self, method: str) -> Callable: method = method.upper() if method == "GET": return self.session.get - elif method == "POST": + if method == "POST": return self.session.post - elif method == "DELETE": + if method == "DELETE": return self.session.delete - else: - raise ValueError(f"Unsupported HTTP method: {method}") + raise ValueError(f"Unsupported HTTP method: {method}") def _make_request( self, From be41f472cec5c3ea2ce20417ff86ac5443330633 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Wed, 4 Jun 2025 05:53:03 +0000 Subject: [PATCH 9/9] add newline at EOF Signed-off-by: varun-edachali-dbx --- examples/experimental/sea_connector_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/experimental/sea_connector_test.py b/examples/experimental/sea_connector_test.py index a27099da7..abe6bd1ab 100644 --- a/examples/experimental/sea_connector_test.py +++ b/examples/experimental/sea_connector_test.py @@ -63,4 +63,4 @@ def test_sea_session(): logger.info("SEA session test completed successfully") if __name__ == "__main__": - test_sea_session() \ No newline at end of file + test_sea_session()