Skip to content

Commit b3a6f55

Browse files
authored
Revert "Enhance Cursor close handling and context manager exception m… (#613)
* Revert "Enhance Cursor close handling and context manager exception management to prevent server side resource leaks (#554)" This reverts commit edfb283. * revert e2e
1 parent 4cebc36 commit b3a6f55

File tree

3 files changed

+4
-269
lines changed

3 files changed

+4
-269
lines changed

src/databricks/sql/client.py

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -321,13 +321,7 @@ def __enter__(self) -> "Connection":
321321
return self
322322

323323
def __exit__(self, exc_type, exc_value, traceback):
324-
try:
325-
self.close()
326-
except BaseException as e:
327-
logger.warning(f"Exception during connection close in __exit__: {e}")
328-
if exc_type is None:
329-
raise
330-
return False
324+
self.close()
331325

332326
def __del__(self):
333327
if self.open:
@@ -468,14 +462,7 @@ def __enter__(self) -> "Cursor":
468462
return self
469463

470464
def __exit__(self, exc_type, exc_value, traceback):
471-
try:
472-
logger.debug("Cursor context manager exiting, calling close()")
473-
self.close()
474-
except BaseException as e:
475-
logger.warning(f"Exception during cursor close in __exit__: {e}")
476-
if exc_type is None:
477-
raise
478-
return False
465+
self.close()
479466

480467
def __iter__(self):
481468
if self.active_result_set:
@@ -1185,21 +1172,7 @@ def cancel(self) -> None:
11851172
def close(self) -> None:
11861173
"""Close cursor"""
11871174
self.open = False
1188-
1189-
# Close active operation handle if it exists
1190-
if self.active_op_handle:
1191-
try:
1192-
self.thrift_backend.close_command(self.active_op_handle)
1193-
except RequestError as e:
1194-
if isinstance(e.args[1], CursorAlreadyClosedError):
1195-
logger.info("Operation was canceled by a prior request")
1196-
else:
1197-
logging.warning(f"Error closing operation handle: {e}")
1198-
except Exception as e:
1199-
logging.warning(f"Error closing operation handle: {e}")
1200-
finally:
1201-
self.active_op_handle = None
1202-
1175+
self.active_op_handle = None
12031176
if self.active_result_set:
12041177
self._close_and_clear_active_result_set()
12051178

tests/e2e/test_driver.py

Lines changed: 1 addition & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
5252

53-
from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError
53+
from databricks.sql.exc import SessionAlreadyClosedError
5454

5555
log = logging.getLogger(__name__)
5656

@@ -808,146 +808,6 @@ def test_catalogs_returns_arrow_table(self):
808808
results = cursor.fetchall_arrow()
809809
assert isinstance(results, pyarrow.Table)
810810

811-
def test_close_connection_closes_cursors(self):
812-
813-
from databricks.sql.thrift_api.TCLIService import ttypes
814-
815-
with self.connection() as conn:
816-
cursor = conn.cursor()
817-
cursor.execute(
818-
"SELECT id, id `id2`, id `id3` FROM RANGE(1000000) order by RANDOM()"
819-
)
820-
ars = cursor.active_result_set
821-
822-
# We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True
823-
# Cursor op state should be open before connection is closed
824-
status_request = ttypes.TGetOperationStatusReq(
825-
operationHandle=ars.command_id, getProgressUpdate=False
826-
)
827-
op_status_at_server = ars.thrift_backend._client.GetOperationStatus(
828-
status_request
829-
)
830-
assert (
831-
op_status_at_server.operationState
832-
!= ttypes.TOperationState.CLOSED_STATE
833-
)
834-
835-
conn.close()
836-
837-
# When connection closes, any cursor operations should no longer exist at the server
838-
with pytest.raises(SessionAlreadyClosedError) as cm:
839-
op_status_at_server = ars.thrift_backend._client.GetOperationStatus(
840-
status_request
841-
)
842-
843-
def test_closing_a_closed_connection_doesnt_fail(self, caplog):
844-
caplog.set_level(logging.DEBUG)
845-
# Second .close() call is when this context manager exits
846-
with self.connection() as conn:
847-
# First .close() call is explicit here
848-
conn.close()
849-
assert "Session appears to have been closed already" in caplog.text
850-
851-
conn = None
852-
try:
853-
with pytest.raises(KeyboardInterrupt):
854-
with self.connection() as c:
855-
conn = c
856-
raise KeyboardInterrupt("Simulated interrupt")
857-
finally:
858-
if conn is not None:
859-
assert (
860-
not conn.open
861-
), "Connection should be closed after KeyboardInterrupt"
862-
863-
def test_cursor_close_properly_closes_operation(self):
864-
"""Test that Cursor.close() properly closes the active operation handle on the server."""
865-
with self.connection() as conn:
866-
cursor = conn.cursor()
867-
try:
868-
cursor.execute("SELECT 1 AS test")
869-
assert cursor.active_op_handle is not None
870-
cursor.close()
871-
assert cursor.active_op_handle is None
872-
assert not cursor.open
873-
finally:
874-
if cursor.open:
875-
cursor.close()
876-
877-
conn = None
878-
cursor = None
879-
try:
880-
with self.connection() as c:
881-
conn = c
882-
with pytest.raises(KeyboardInterrupt):
883-
with conn.cursor() as cur:
884-
cursor = cur
885-
raise KeyboardInterrupt("Simulated interrupt")
886-
finally:
887-
if cursor is not None:
888-
assert (
889-
not cursor.open
890-
), "Cursor should be closed after KeyboardInterrupt"
891-
892-
def test_nested_cursor_context_managers(self):
893-
"""Test that nested cursor context managers properly close operations on the server."""
894-
with self.connection() as conn:
895-
with conn.cursor() as cursor1:
896-
cursor1.execute("SELECT 1 AS test1")
897-
assert cursor1.active_op_handle is not None
898-
899-
with conn.cursor() as cursor2:
900-
cursor2.execute("SELECT 2 AS test2")
901-
assert cursor2.active_op_handle is not None
902-
903-
# After inner context manager exit, cursor2 should be not open
904-
assert not cursor2.open
905-
assert cursor2.active_op_handle is None
906-
907-
# After outer context manager exit, cursor1 should be not open
908-
assert not cursor1.open
909-
assert cursor1.active_op_handle is None
910-
911-
def test_cursor_error_handling(self):
912-
"""Test that cursor close handles errors properly to prevent orphaned operations."""
913-
with self.connection() as conn:
914-
cursor = conn.cursor()
915-
916-
cursor.execute("SELECT 1 AS test")
917-
918-
op_handle = cursor.active_op_handle
919-
920-
assert op_handle is not None
921-
922-
# Manually close the operation to simulate server-side closure
923-
conn.thrift_backend.close_command(op_handle)
924-
925-
cursor.close()
926-
927-
assert not cursor.open
928-
929-
def test_result_set_close(self):
930-
"""Test that ResultSet.close() properly closes operations on the server and handles state correctly."""
931-
with self.connection() as conn:
932-
cursor = conn.cursor()
933-
try:
934-
cursor.execute("SELECT * FROM RANGE(10)")
935-
936-
result_set = cursor.active_result_set
937-
assert result_set is not None
938-
939-
initial_op_state = result_set.op_state
940-
941-
result_set.close()
942-
943-
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
944-
assert result_set.op_state != initial_op_state
945-
946-
# Closing the result set again should be a no-op and not raise exceptions
947-
result_set.close()
948-
finally:
949-
cursor.close()
950-
951811

952812
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
953813
# the 429/503 subsuites separate since they execute under different circumstances.

tests/unit/test_client.py

Lines changed: 0 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import databricks.sql
2222
import databricks.sql.client as client
2323
from databricks.sql import InterfaceError, DatabaseError, Error, NotSupportedError
24-
from databricks.sql.exc import RequestError, CursorAlreadyClosedError
2524
from databricks.sql.types import Row
2625

2726
from databricks.sql.utils import ExecuteResponse
@@ -342,15 +341,6 @@ def test_context_manager_closes_cursor(self):
342341
cursor.close = mock_close
343342
mock_close.assert_called_once_with()
344343

345-
cursor = client.Cursor(Mock(), Mock())
346-
cursor.close = Mock()
347-
try:
348-
with self.assertRaises(KeyboardInterrupt):
349-
with cursor:
350-
raise KeyboardInterrupt("Simulated interrupt")
351-
finally:
352-
cursor.close.assert_called()
353-
354344
@patch("%s.client.ThriftBackend" % PACKAGE_NAME)
355345
def test_context_manager_closes_connection(self, mock_client_class):
356346
instance = mock_client_class.return_value
@@ -366,15 +356,6 @@ def test_context_manager_closes_connection(self, mock_client_class):
366356
close_session_id = instance.close_session.call_args[0][0].sessionId
367357
self.assertEqual(close_session_id, b"\x22")
368358

369-
connection = databricks.sql.connect(**self.DUMMY_CONNECTION_ARGS)
370-
connection.close = Mock()
371-
try:
372-
with self.assertRaises(KeyboardInterrupt):
373-
with connection:
374-
raise KeyboardInterrupt("Simulated interrupt")
375-
finally:
376-
connection.close.assert_called()
377-
378359
def dict_product(self, dicts):
379360
"""
380361
Generate cartesion product of values in input dictionary, outputting a dictionary
@@ -753,42 +734,6 @@ def test_access_current_query_id(self):
753734
cursor.close()
754735
self.assertIsNone(cursor.query_id)
755736

756-
def test_cursor_close_handles_exception(self):
757-
"""Test that Cursor.close() handles exceptions from close_command properly."""
758-
mock_backend = Mock()
759-
mock_connection = Mock()
760-
mock_op_handle = Mock()
761-
762-
mock_backend.close_command.side_effect = Exception("Test error")
763-
764-
cursor = client.Cursor(mock_connection, mock_backend)
765-
cursor.active_op_handle = mock_op_handle
766-
767-
cursor.close()
768-
769-
mock_backend.close_command.assert_called_once_with(mock_op_handle)
770-
771-
self.assertIsNone(cursor.active_op_handle)
772-
773-
self.assertFalse(cursor.open)
774-
775-
def test_cursor_context_manager_handles_exit_exception(self):
776-
"""Test that cursor's context manager handles exceptions during __exit__."""
777-
mock_backend = Mock()
778-
mock_connection = Mock()
779-
780-
cursor = client.Cursor(mock_connection, mock_backend)
781-
original_close = cursor.close
782-
cursor.close = Mock(side_effect=Exception("Test error during close"))
783-
784-
try:
785-
with cursor:
786-
raise ValueError("Test error inside context")
787-
except ValueError:
788-
pass
789-
790-
cursor.close.assert_called_once()
791-
792737
def test_connection_close_handles_cursor_close_exception(self):
793738
"""Test that _close handles exceptions from cursor.close() properly."""
794739
cursors_closed = []
@@ -824,49 +769,6 @@ def mock_close_normal():
824769
cursors_closed, [1, 2], "Both cursors should have close called"
825770
)
826771

827-
def test_resultset_close_handles_cursor_already_closed_error(self):
828-
"""Test that ResultSet.close() handles CursorAlreadyClosedError properly."""
829-
result_set = client.ResultSet.__new__(client.ResultSet)
830-
result_set.thrift_backend = Mock()
831-
result_set.thrift_backend.CLOSED_OP_STATE = "CLOSED"
832-
result_set.connection = Mock()
833-
result_set.connection.open = True
834-
result_set.op_state = "RUNNING"
835-
result_set.has_been_closed_server_side = False
836-
result_set.command_id = Mock()
837-
838-
class MockRequestError(Exception):
839-
def __init__(self):
840-
self.args = ["Error message", CursorAlreadyClosedError()]
841-
842-
result_set.thrift_backend.close_command.side_effect = MockRequestError()
843-
844-
original_close = client.ResultSet.close
845-
try:
846-
try:
847-
if (
848-
result_set.op_state != result_set.thrift_backend.CLOSED_OP_STATE
849-
and not result_set.has_been_closed_server_side
850-
and result_set.connection.open
851-
):
852-
result_set.thrift_backend.close_command(result_set.command_id)
853-
except MockRequestError as e:
854-
if isinstance(e.args[1], CursorAlreadyClosedError):
855-
pass
856-
finally:
857-
result_set.has_been_closed_server_side = True
858-
result_set.op_state = result_set.thrift_backend.CLOSED_OP_STATE
859-
860-
result_set.thrift_backend.close_command.assert_called_once_with(
861-
result_set.command_id
862-
)
863-
864-
assert result_set.has_been_closed_server_side is True
865-
866-
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
867-
finally:
868-
pass
869-
870772

871773
if __name__ == "__main__":
872774
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])

0 commit comments

Comments
 (0)