Generalise Backend Layer #604

Status: Open. Wants to merge 24 commits into base: main.

Commits (24):
5e5147b
Separate Session related functionality from Connection class (#571)
varun-edachali-dbx May 28, 2025
57370b3
Introduce Backend Interface (DatabricksClient) (#573)
varun-edachali-dbx May 30, 2025
75752bf
Implement ResultSet Abstraction (backend interfaces for fetch phase) …
varun-edachali-dbx Jun 3, 2025
450b80d
remove un-necessary initialisation assertions
varun-edachali-dbx Jun 18, 2025
a926f02
remove un-necessary line break s
varun-edachali-dbx Jun 18, 2025
55ad001
more un-necessary line breaks
varun-edachali-dbx Jun 18, 2025
fa15730
constrain diff of test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
019c7fb
reduce diff of test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
726abe7
use pytest-like assertions for test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
bf6d41c
ensure command_id is not None
varun-edachali-dbx Jun 18, 2025
5afa733
line breaks after multi-line pyfocs
varun-edachali-dbx Jun 18, 2025
e3dfd36
ensure non null operationHandle for commandId creation
varun-edachali-dbx Jun 18, 2025
63360b3
use command_id methods instead of explicit guid_to_hex_id conversion
varun-edachali-dbx Jun 18, 2025
13ffb8d
remove un-necessary artifacts in test_session, add back assertion
varun-edachali-dbx Jun 18, 2025
d759050
add from __future__ import annotations to remove string literals arou…
varun-edachali-dbx Jun 19, 2025
1e21434
move docstring of DatabricksClient within class
varun-edachali-dbx Jun 24, 2025
cd4015b
move ThriftResultSet import to top of file
varun-edachali-dbx Jun 24, 2025
ed8b610
make backend/utils __init__ file empty
varun-edachali-dbx Jun 24, 2025
94d951e
use from __future__ import annotations to remove string literals arou…
varun-edachali-dbx Jun 24, 2025
c20058e
use lazy logging
varun-edachali-dbx Jun 24, 2025
fe3acb1
replace getters with property tag
varun-edachali-dbx Jun 24, 2025
9fb6a76
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jun 24, 2025
61dfc4d
set active_command_id to None, not active_op_handle
varun-edachali-dbx Jun 24, 2025
64fb9b2
align test_session with pytest instead of unittest
varun-edachali-dbx Jun 24, 2025
342 changes: 342 additions & 0 deletions src/databricks/sql/backend/databricks_client.py
@@ -0,0 +1,342 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Union, TYPE_CHECKING

if TYPE_CHECKING:
from databricks.sql.client import Cursor
from databricks.sql.result_set import ResultSet

from databricks.sql.thrift_api.TCLIService import ttypes
from databricks.sql.backend.types import SessionId, CommandId, CommandState


class DatabricksClient(ABC):
"""
Abstract client interface for interacting with Databricks SQL services.

Implementations of this class are responsible for:
- Managing connections to Databricks SQL services
- Executing SQL queries and commands
- Retrieving query results
- Fetching metadata about catalogs, schemas, tables, and columns
"""

# == Connection and Session Management ==
@abstractmethod
def open_session(
self,
session_configuration: Optional[Dict[str, Any]],
catalog: Optional[str],
schema: Optional[str],
) -> SessionId:
"""
Opens a new session with the Databricks SQL service.

This method establishes a new session with the server and returns a session
identifier that can be used for subsequent operations.

Args:
session_configuration: Optional dictionary of configuration parameters for the session
catalog: Optional catalog name to use as the initial catalog for the session
schema: Optional schema name to use as the initial schema for the session

Returns:
SessionId: A session identifier object that can be used for subsequent operations

Raises:
Error: If the session configuration is invalid
OperationalError: If there's an error establishing the session
InvalidServerResponseError: If the server response is invalid or unexpected
"""
pass

@abstractmethod
def close_session(self, session_id: SessionId) -> None:
"""
Closes an existing session with the Databricks SQL service.

This method terminates the session identified by the given session ID and
releases any resources associated with it.

Args:
session_id: The session identifier returned by open_session()

Raises:
ValueError: If the session ID is invalid
OperationalError: If there's an error closing the session
"""
pass

# == Query Execution, Command Management ==
@abstractmethod
def execute_command(
self,
operation: str,
session_id: SessionId,
max_rows: int,
max_bytes: int,
lz4_compression: bool,
cursor: Cursor,
use_cloud_fetch: bool,
parameters: List[ttypes.TSparkParameter],
async_op: bool,
enforce_embedded_schema_correctness: bool,
) -> Union[ResultSet, None]:
"""
Executes a SQL command or query within the specified session.

This method sends a SQL command to the server for execution and handles
the response. It can operate in both synchronous and asynchronous modes.

Args:
operation: The SQL command or query to execute
session_id: The session identifier in which to execute the command
max_rows: Maximum number of rows to fetch in a single fetch batch
max_bytes: Maximum number of bytes to fetch in a single fetch batch
lz4_compression: Whether to use LZ4 compression for result data
cursor: The cursor object that will handle the results
use_cloud_fetch: Whether to use cloud fetch for retrieving large result sets
parameters: List of parameters to bind to the query
async_op: Whether to execute the command asynchronously
enforce_embedded_schema_correctness: Whether to enforce schema correctness

Returns:
If async_op is False, returns a ResultSet object containing the
query results and metadata. If async_op is True, returns None and the
results must be fetched later using get_execution_result().

Raises:
ValueError: If the session ID is invalid
OperationalError: If there's an error executing the command
ServerOperationError: If the server encounters an error during execution
"""
pass

@abstractmethod
def cancel_command(self, command_id: CommandId) -> None:
"""
Cancels a running command or query.

This method attempts to cancel a command that is currently being executed.
It can be called from a different thread than the one executing the command.

Args:
command_id: The command identifier to cancel

Raises:
ValueError: If the command ID is invalid
OperationalError: If there's an error canceling the command
"""
pass

@abstractmethod
def close_command(self, command_id: CommandId) -> None:
"""
Closes a command and releases associated resources.

This method informs the server that the client is done with the command
and any resources associated with it can be released.

Args:
command_id: The command identifier to close

Raises:
ValueError: If the command ID is invalid
OperationalError: If there's an error closing the command
"""
pass

@abstractmethod
def get_query_state(self, command_id: CommandId) -> CommandState:
"""
Gets the current state of a query or command.

This method retrieves the current execution state of a command from the server.

Args:
command_id: The command identifier to check

Returns:
CommandState: The current state of the command

Raises:
ValueError: If the command ID is invalid
OperationalError: If there's an error retrieving the state
ServerOperationError: If the command is in an error state
DatabaseError: If the command has been closed unexpectedly
"""
pass

@abstractmethod
def get_execution_result(
self,
command_id: CommandId,
cursor: Cursor,
) -> ResultSet:
"""
Retrieves the results of a previously executed command.

This method fetches the results of a command that was executed asynchronously
or retrieves additional results from a command that has more rows available.

Args:
command_id: The command identifier for which to retrieve results
cursor: The cursor object that will handle the results

Returns:
ResultSet: An object containing the query results and metadata

Raises:
ValueError: If the command ID is invalid
OperationalError: If there's an error retrieving the results
"""
pass

# == Metadata Operations ==
@abstractmethod
def get_catalogs(
self,
session_id: SessionId,
max_rows: int,
max_bytes: int,
cursor: Cursor,
) -> ResultSet:
"""
Retrieves a list of available catalogs.

This method fetches metadata about all catalogs available in the current
session's context.

Args:
session_id: The session identifier
max_rows: Maximum number of rows to fetch in a single batch
max_bytes: Maximum number of bytes to fetch in a single batch
cursor: The cursor object that will handle the results

Returns:
ResultSet: An object containing the catalog metadata

Raises:
ValueError: If the session ID is invalid
OperationalError: If there's an error retrieving the catalogs
"""
pass

@abstractmethod
def get_schemas(
self,
session_id: SessionId,
max_rows: int,
max_bytes: int,
cursor: Cursor,
catalog_name: Optional[str] = None,
schema_name: Optional[str] = None,
) -> ResultSet:
"""
Retrieves a list of schemas, optionally filtered by catalog and schema name patterns.

This method fetches metadata about schemas available in the specified catalog
or all catalogs if no catalog is specified.

Args:
session_id: The session identifier
max_rows: Maximum number of rows to fetch in a single batch
max_bytes: Maximum number of bytes to fetch in a single batch
cursor: The cursor object that will handle the results
catalog_name: Optional catalog name pattern to filter by
schema_name: Optional schema name pattern to filter by

Returns:
ResultSet: An object containing the schema metadata

Raises:
ValueError: If the session ID is invalid
OperationalError: If there's an error retrieving the schemas
"""
pass

@abstractmethod
def get_tables(
self,
session_id: SessionId,
max_rows: int,
max_bytes: int,
cursor: Cursor,
catalog_name: Optional[str] = None,
schema_name: Optional[str] = None,
table_name: Optional[str] = None,
table_types: Optional[List[str]] = None,
) -> ResultSet:
"""
Retrieves a list of tables, optionally filtered by catalog, schema, table name, and table types.

This method fetches metadata about tables available in the specified catalog
and schema, or all catalogs and schemas if not specified.

Args:
session_id: The session identifier
max_rows: Maximum number of rows to fetch in a single batch
max_bytes: Maximum number of bytes to fetch in a single batch
cursor: The cursor object that will handle the results
catalog_name: Optional catalog name pattern to filter by
schema_name: Optional schema name pattern to filter by
table_name: Optional table name pattern to filter by
table_types: Optional list of table types to filter by (e.g., ['TABLE', 'VIEW'])

Returns:
ResultSet: An object containing the table metadata

Raises:
ValueError: If the session ID is invalid
OperationalError: If there's an error retrieving the tables
"""
pass

@abstractmethod
def get_columns(
self,
session_id: SessionId,
max_rows: int,
max_bytes: int,
cursor: Cursor,
catalog_name: Optional[str] = None,
schema_name: Optional[str] = None,
table_name: Optional[str] = None,
column_name: Optional[str] = None,
) -> ResultSet:
"""
Retrieves a list of columns, optionally filtered by catalog, schema, table, and column name patterns.

This method fetches metadata about columns available in the specified table,
or all tables if not specified.

Args:
session_id: The session identifier
max_rows: Maximum number of rows to fetch in a single batch
max_bytes: Maximum number of bytes to fetch in a single batch
cursor: The cursor object that will handle the results
catalog_name: Optional catalog name pattern to filter by
schema_name: Optional schema name pattern to filter by
table_name: Optional table name pattern to filter by
column_name: Optional column name pattern to filter by

Returns:
ResultSet: An object containing the column metadata

Raises:
ValueError: If the session ID is invalid
OperationalError: If there's an error retrieving the columns
"""
pass

@property
@abstractmethod
def max_download_threads(self) -> int:
"""
Gets the maximum number of download threads for cloud fetch operations.

Returns:
int: The maximum number of download threads
"""
pass
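
For reviewers, a hypothetical sketch (not part of this PR) of what the new abstraction buys: because backends are programmed against the `DatabricksClient` ABC, session-lifecycle logic can be exercised against an in-memory fake with no live server. `SessionId` and the two-method ABC below are simplified stand-ins, assumed here only so the sketch runs standalone; the real definitions live in `databricks.sql.backend.types` and `databricks_client.py`.

```python
# Hypothetical sketch, not part of this PR: a minimal in-memory fake that
# satisfies the session-management slice of the DatabricksClient interface.
# SessionId and the abbreviated ABC are simplified stand-ins so the sketch
# runs standalone; the real definitions live in databricks.sql.backend.
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
import uuid


class SessionId:
    """Simplified stand-in for databricks.sql.backend.types.SessionId."""

    def __init__(self, guid: str):
        self.guid = guid


class DatabricksClient(ABC):
    """Abbreviated to the two session-lifecycle methods for illustration."""

    @abstractmethod
    def open_session(
        self,
        session_configuration: Optional[Dict[str, Any]],
        catalog: Optional[str],
        schema: Optional[str],
    ) -> SessionId:
        ...

    @abstractmethod
    def close_session(self, session_id: SessionId) -> None:
        ...


class FakeClient(DatabricksClient):
    """In-memory backend double, useful for unit-testing session lifecycle."""

    def __init__(self):
        self._open_sessions = set()

    def open_session(self, session_configuration, catalog, schema) -> SessionId:
        # Issue a fresh identifier and record it as open.
        session_id = SessionId(str(uuid.uuid4()))
        self._open_sessions.add(session_id.guid)
        return session_id

    def close_session(self, session_id: SessionId) -> None:
        # Mirrors the documented contract: ValueError on an invalid session ID.
        if session_id.guid not in self._open_sessions:
            raise ValueError("invalid session id")
        self._open_sessions.remove(session_id.guid)


client = FakeClient()
sid = client.open_session(None, catalog=None, schema=None)
client.close_session(sid)
```

Connection/Cursor code written against the ABC never needs to know whether it is talking to the Thrift backend or a double like `FakeClient`, which is the point of pulling the interface out of `Connection`.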