Skip to content

Commit ace72e9

Browse files
authored
Propagate errors across all results in a transaction (#973)
* Propagate errors across all results in a transaction A new error `ResultFailedError` is introduced. It will be raised when using a `Result` object after the result or another result in the same transaction has failed. User code would only ever run into this situation when catch exceptions and deciding to ignore them. Now, an error will be raised instead of undefined behavior. The undefined behavior before this fix could be (among other things) protocol violations, incomplete summary data, and hard to interpret errors. * Fix missing space in error message * Add unit tests for transaction error propagation
1 parent 41620cd commit ace72e9

File tree

13 files changed

+200
-15
lines changed

13 files changed

+200
-15
lines changed

docs/source/api.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1902,6 +1902,8 @@ Client-side errors
19021902

19031903
* :class:`neo4j.exceptions.ResultError`
19041904

1905+
* :class:`neo4j.exceptions.ResultFailedError`
1906+
19051907
* :class:`neo4j.exceptions.ResultConsumedError`
19061908

19071909
* :class:`neo4j.exceptions.ResultNotSingleError`
@@ -1946,6 +1948,9 @@ Client-side errors
19461948
:show-inheritance:
19471949
:members: result
19481950

1951+
.. autoexception:: neo4j.exceptions.ResultFailedError()
1952+
:show-inheritance:
1953+
19491954
.. autoexception:: neo4j.exceptions.ResultConsumedError()
19501955
:show-inheritance:
19511956

src/neo4j/_async/work/result.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
)
3939
from ...exceptions import (
4040
ResultConsumedError,
41+
ResultFailedError,
4142
ResultNotSingleError,
4243
)
4344
from ...time import (
@@ -57,6 +58,10 @@
5758
_TResultKey = t.Union[int, str]
5859

5960

61+
_RESULT_FAILED_ERROR = (
62+
"The result has failed. Either this result or another result in the same "
63+
"transaction has encountered an error."
64+
)
6065
_RESULT_OUT_OF_SCOPE_ERROR = (
6166
"The result is out of scope. The associated transaction "
6267
"has been closed. Results can only be used while the "
@@ -76,8 +81,11 @@ class AsyncResult:
7681
"""
7782

7883
def __init__(self, connection, fetch_size, on_closed, on_error):
79-
self._connection = ConnectionErrorHandler(connection, on_error)
84+
self._connection = ConnectionErrorHandler(
85+
connection, self._connection_error_handler
86+
)
8087
self._hydration_scope = connection.new_hydration_scope()
88+
self._on_error = on_error
8189
self._on_closed = on_closed
8290
self._metadata = None
8391
self._keys = None
@@ -101,6 +109,13 @@ def __init__(self, connection, fetch_size, on_closed, on_error):
101109
self._consumed = False
102110
# the result has been closed as a result of closing the transaction
103111
self._out_of_scope = False
112+
# exception shared across all results of a transaction
113+
self._exception = None
114+
115+
async def _connection_error_handler(self, exc):
116+
self._exception = exc
117+
self._attached = False
118+
await AsyncUtil.callback(self._on_error, exc)
104119

105120
@property
106121
def _qid(self):
@@ -257,6 +272,9 @@ async def __aiter__(self) -> t.AsyncIterator[Record]:
257272
await self._connection.send_all()
258273

259274
self._exhausted = True
275+
if self._exception is not None:
276+
raise ResultFailedError(self, _RESULT_FAILED_ERROR) \
277+
from self._exception
260278
if self._out_of_scope:
261279
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
262280
if self._consumed:
@@ -346,6 +364,11 @@ async def _tx_end(self):
346364
await self._exhaust()
347365
self._out_of_scope = True
348366

367+
def _tx_failure(self, exc):
368+
# Handle failure of the associated transaction.
369+
self._attached = False
370+
self._exception = exc
371+
349372
async def consume(self) -> ResultSummary:
350373
"""Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.
351374

src/neo4j/_async/work/transaction.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ async def _result_on_closed_handler(self):
9292

9393
async def _error_handler(self, exc):
9494
self._last_error = exc
95+
for result in self._results:
96+
result._tx_failure(exc)
9597
if isinstance(exc, asyncio.CancelledError):
9698
self._cancel()
9799
return

src/neo4j/_sync/work/result.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
)
3939
from ...exceptions import (
4040
ResultConsumedError,
41+
ResultFailedError,
4142
ResultNotSingleError,
4243
)
4344
from ...time import (
@@ -57,6 +58,10 @@
5758
_TResultKey = t.Union[int, str]
5859

5960

61+
_RESULT_FAILED_ERROR = (
62+
"The result has failed. Either this result or another result in the same "
63+
"transaction has encountered an error."
64+
)
6065
_RESULT_OUT_OF_SCOPE_ERROR = (
6166
"The result is out of scope. The associated transaction "
6267
"has been closed. Results can only be used while the "
@@ -76,8 +81,11 @@ class Result:
7681
"""
7782

7883
def __init__(self, connection, fetch_size, on_closed, on_error):
79-
self._connection = ConnectionErrorHandler(connection, on_error)
84+
self._connection = ConnectionErrorHandler(
85+
connection, self._connection_error_handler
86+
)
8087
self._hydration_scope = connection.new_hydration_scope()
88+
self._on_error = on_error
8189
self._on_closed = on_closed
8290
self._metadata = None
8391
self._keys = None
@@ -101,6 +109,13 @@ def __init__(self, connection, fetch_size, on_closed, on_error):
101109
self._consumed = False
102110
# the result has been closed as a result of closing the transaction
103111
self._out_of_scope = False
112+
# exception shared across all results of a transaction
113+
self._exception = None
114+
115+
def _connection_error_handler(self, exc):
116+
self._exception = exc
117+
self._attached = False
118+
Util.callback(self._on_error, exc)
104119

105120
@property
106121
def _qid(self):
@@ -257,6 +272,9 @@ def __iter__(self) -> t.Iterator[Record]:
257272
self._connection.send_all()
258273

259274
self._exhausted = True
275+
if self._exception is not None:
276+
raise ResultFailedError(self, _RESULT_FAILED_ERROR) \
277+
from self._exception
260278
if self._out_of_scope:
261279
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
262280
if self._consumed:
@@ -346,6 +364,11 @@ def _tx_end(self):
346364
self._exhaust()
347365
self._out_of_scope = True
348366

367+
def _tx_failure(self, exc):
368+
# Handle failure of the associated transaction.
369+
self._attached = False
370+
self._exception = exc
371+
349372
def consume(self) -> ResultSummary:
350373
"""Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.
351374

src/neo4j/_sync/work/transaction.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ def _result_on_closed_handler(self):
9292

9393
def _error_handler(self, exc):
9494
self._last_error = exc
95+
for result in self._results:
96+
result._tx_failure(exc)
9597
if isinstance(exc, asyncio.CancelledError):
9698
self._cancel()
9799
return

src/neo4j/exceptions.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
+ TransactionError
4141
+ TransactionNestingError
4242
+ ResultError
43+
+ ResultFailedError
4344
+ ResultConsumedError
4445
+ ResultNotSingleError
4546
+ BrokenRecordError
@@ -464,6 +465,17 @@ def __init__(self, result_, *args, **kwargs):
464465
self.result = result_
465466

466467

468+
# DriverError > ResultError > ResultFailedError
469+
class ResultFailedError(ResultError):
470+
"""Raised when trying to access records of a failed result.
471+
472+
A :class:`.Result` will be considered failed if
473+
* itself encountered an error while fetching records
474+
* another result within the same transaction encountered an error while
475+
fetching records
476+
"""
477+
478+
467479
# DriverError > ResultError > ResultConsumedError
468480
class ResultConsumedError(ResultError):
469481
"""Raised when trying to access records of a consumed result."""

testkitbackend/_async/requests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ async def ExecuteQuery(backend, data):
401401

402402
def resolution_func(backend, custom_resolver=False, custom_dns_resolver=False):
403403
# This solution (putting custom resolution together with DNS resolution
404-
# into one function only works because the Python driver calls the custom
404+
# into one function) only works because the Python driver calls the custom
405405
# resolver function for every connection, which is not true for all
406406
# drivers. Properly exposing a way to change the DNS lookup behavior is not
407407
# possible without changing the driver's code.

testkitbackend/_sync/requests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ def ExecuteQuery(backend, data):
401401

402402
def resolution_func(backend, custom_resolver=False, custom_dns_resolver=False):
403403
# This solution (putting custom resolution together with DNS resolution
404-
# into one function only works because the Python driver calls the custom
404+
# into one function) only works because the Python driver calls the custom
405405
# resolver function for every connection, which is not true for all
406406
# drivers. Properly exposing a way to change the DNS lookup behavior is not
407407
# possible without changing the driver's code.

testkitbackend/test_config.json

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,7 @@
1313
"'neo4j.datatypes.test_temporal_types.TestDataTypes.test_should_echo_all_timezone_ids'":
1414
"test_subtest_skips.dt_conversion",
1515
"'neo4j.datatypes.test_temporal_types.TestDataTypes.test_date_time_cypher_created_tz_id'":
16-
"test_subtest_skips.tz_id",
17-
"'stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_discard_after_tx_termination_on_run'":
18-
"Fixme: transactions don't prevent further actions after failure.",
19-
"'stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_pull'":
20-
"Fixme: transactions don't prevent further actions after failure.",
21-
"'stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_run'":
22-
"Fixme: transactions don't prevent further actions after failure."
16+
"test_subtest_skips.tz_id"
2317
},
2418
"features": {
2519
"Feature:API:BookmarkManager": true,

tests/unit/async_/fixtures/fake_connection.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from neo4j._async.io import AsyncBolt
2525
from neo4j._deadline import Deadline
2626
from neo4j.auth_management import AsyncAuthManager
27+
from neo4j.exceptions import Neo4jError
2728

2829

2930
__all__ = [
@@ -154,10 +155,12 @@ def set_script(self, callbacks):
154155
[
155156
("run", {"on_success": ({},), "on_summary": None}),
156157
("pull", {
158+
"on_records": ([some_record],),
157159
"on_success": None,
158160
"on_summary": None,
159-
"on_records":
160161
})
162+
# use any exception to throw it instead of calling handlers
163+
("commit", RuntimeError("oh no!"))
161164
]
162165
```
163166
Note that arguments can be `None`. In this case, ScriptedConnection
@@ -180,6 +183,9 @@ def func(*args, **kwargs):
180183
self._script_pos += 1
181184

182185
async def callback():
186+
if isinstance(scripted_callbacks, BaseException):
187+
raise scripted_callbacks
188+
error = None
183189
for cb_name, default_cb_args in (
184190
("on_ignored", ({},)),
185191
("on_failure", ({},)),
@@ -197,10 +203,14 @@ async def callback():
197203
if cb_args is None:
198204
cb_args = default_cb_args
199205
res = cb(*cb_args)
206+
if cb_name == "on_failure":
207+
error = Neo4jError.hydrate(**cb_args[0])
200208
try:
201209
await res # maybe the callback is async
202210
except TypeError:
203211
pass # or maybe it wasn't ;)
212+
if error is not None:
213+
raise error
204214

205215
self.callbacks.append(callback)
206216

tests/unit/async_/work/test_transaction.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818

1919
from unittest.mock import MagicMock
20-
from uuid import uuid4
2120

2221
import pytest
2322

@@ -26,6 +25,11 @@
2625
NotificationMinimumSeverity,
2726
Query,
2827
)
28+
from neo4j.exceptions import (
29+
ClientError,
30+
ResultFailedError,
31+
ServiceUnavailable,
32+
)
2933

3034
from ...._async_compat import mark_async_test
3135

@@ -275,3 +279,51 @@ async def test_transaction_begin_pipelining(
275279
expected_calls.append(("send_all",))
276280
expected_calls.append(("fetch_all",))
277281
assert async_fake_connection.method_calls == expected_calls
282+
283+
284+
@pytest.mark.parametrize("error", ("server", "connection"))
285+
@mark_async_test
286+
async def test_server_error_propagates(async_scripted_connection, error):
287+
connection = async_scripted_connection
288+
script = [
289+
# res 1
290+
("run", {"on_success": ({"fields": ["n"]},), "on_summary": None}),
291+
("pull", {"on_records": ([[1], [2]],),
292+
"on_success": ({"has_more": True},)}),
293+
# res 2
294+
("run", {"on_success": ({"fields": ["n"]},), "on_summary": None}),
295+
("pull", {"on_records": ([[1], [2]],),
296+
"on_success": ({"has_more": True},)}),
297+
]
298+
if error == "server":
299+
script.append(
300+
("pull", {"on_failure": ({"code": "Neo.ClientError.Made.Up"},),
301+
"on_summary": None})
302+
)
303+
expected_error = ClientError
304+
elif error == "connection":
305+
script.append(("pull", ServiceUnavailable()))
306+
expected_error = ServiceUnavailable
307+
else:
308+
raise ValueError(f"Unknown error type {error}")
309+
connection.set_script(script)
310+
311+
tx = AsyncTransaction(
312+
connection, 2, lambda *args, **kwargs: None,
313+
lambda *args, **kwargs: None, lambda *args, **kwargs: None
314+
)
315+
res1 = await tx.run("UNWIND range(1, 1000) AS n RETURN n")
316+
assert await res1.__anext__() == {"n": 1}
317+
318+
res2 = await tx.run("RETURN 'causes error later'")
319+
assert await res2.fetch(2) == [{"n": 1}, {"n": 2}]
320+
with pytest.raises(expected_error) as exc1:
321+
await res2.__anext__()
322+
323+
# can finish the buffer
324+
assert await res1.fetch(1) == [{"n": 2}]
325+
# then fails because the connection was broken by res2
326+
with pytest.raises(ResultFailedError) as exc2:
327+
await res1.__anext__()
328+
329+
assert exc1.value is exc2.value.__cause__

tests/unit/sync/fixtures/fake_connection.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from neo4j._deadline import Deadline
2525
from neo4j._sync.io import Bolt
2626
from neo4j.auth_management import AuthManager
27+
from neo4j.exceptions import Neo4jError
2728

2829

2930
__all__ = [
@@ -154,10 +155,12 @@ def set_script(self, callbacks):
154155
[
155156
("run", {"on_success": ({},), "on_summary": None}),
156157
("pull", {
158+
"on_records": ([some_record],),
157159
"on_success": None,
158160
"on_summary": None,
159-
"on_records":
160161
})
162+
# use any exception to throw it instead of calling handlers
163+
("commit", RuntimeError("oh no!"))
161164
]
162165
```
163166
Note that arguments can be `None`. In this case, ScriptedConnection
@@ -180,6 +183,9 @@ def func(*args, **kwargs):
180183
self._script_pos += 1
181184

182185
def callback():
186+
if isinstance(scripted_callbacks, BaseException):
187+
raise scripted_callbacks
188+
error = None
183189
for cb_name, default_cb_args in (
184190
("on_ignored", ({},)),
185191
("on_failure", ({},)),
@@ -197,10 +203,14 @@ def callback():
197203
if cb_args is None:
198204
cb_args = default_cb_args
199205
res = cb(*cb_args)
206+
if cb_name == "on_failure":
207+
error = Neo4jError.hydrate(**cb_args[0])
200208
try:
201209
res # maybe the callback is async
202210
except TypeError:
203211
pass # or maybe it wasn't ;)
212+
if error is not None:
213+
raise error
204214

205215
self.callbacks.append(callback)
206216

0 commit comments

Comments
 (0)