Skip to content

Commit 0f0a75c

Browse files
committed
Send BEGIN eagerly on transaction begin
1 parent 752bcd3 commit 0f0a75c

File tree

4 files changed

+52
-51
lines changed

4 files changed

+52
-51
lines changed

neo4j/io/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
__all__ = [
3030
"Bolt",
3131
"BoltPool",
32+
"ConnectionErrorHandler",
3233
"Neo4jPool",
3334
"check_supported_server_product",
3435
]
@@ -92,6 +93,7 @@
9293
)
9394
from neo4j.io._common import (
9495
CommitResponse,
96+
ConnectionErrorHandler,
9597
Inbox,
9698
InitResponse,
9799
Outbox,

neo4j/io/_common.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,43 @@ def view(self):
139139
return memoryview(self._data[:end])
140140

141141

142+
class ConnectionErrorHandler:
143+
"""
144+
Wrapper class for handling connection errors.
145+
146+
The class will wrap each method to invoke a callback if the method raises
147+
Neo4jError, SessionExpired, or ServiceUnavailable.
148+
The error will be re-raised after the callback.
149+
"""
150+
151+
def __init__(self, connection, on_error):
152+
"""
153+
:param connection the connection object to warp
154+
:type connection Bolt
155+
:param on_error the function to be called when a method of
156+
connection raises of of the caught errors.
157+
:type on_error callable
158+
"""
159+
self.__connection = connection
160+
self.__on_error = on_error
161+
162+
def __getattr__(self, item):
163+
connection_attr = getattr(self.__connection, item)
164+
if not callable(connection_attr):
165+
return connection_attr
166+
167+
def outer(func):
168+
def inner(*args, **kwargs):
169+
try:
170+
func(*args, **kwargs)
171+
except (Neo4jError, ServiceUnavailable, SessionExpired) as exc:
172+
self.__on_error(exc)
173+
raise
174+
return inner
175+
176+
return outer(connection_attr)
177+
178+
142179
class Response:
143180
""" Subscriber object for a full response (zero or
144181
more detail messages followed by one summary message).

neo4j/work/result.py

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,51 +23,10 @@
2323
from warnings import warn
2424

2525
from neo4j.data import DataDehydrator
26-
from neo4j.exceptions import (
27-
Neo4jError,
28-
ServiceUnavailable,
29-
SessionExpired,
30-
)
26+
from neo4j.io import ConnectionErrorHandler
3127
from neo4j.work.summary import ResultSummary
3228

3329

34-
class _ConnectionErrorHandler:
35-
"""
36-
Wrapper class for handling connection errors.
37-
38-
The class will wrap each method to invoke a callback if the method raises
39-
SessionExpired or ServiceUnavailable.
40-
The error will be re-raised after the callback.
41-
"""
42-
43-
def __init__(self, connection, on_error):
44-
"""
45-
:param connection the connection object to warp
46-
:type connection Bolt
47-
:param on_error the function to be called when a method of
48-
connection raises of of the caught errors.
49-
:type on_error callable
50-
"""
51-
self.connection = connection
52-
self.on_error = on_error
53-
54-
def __getattr__(self, item):
55-
connection_attr = getattr(self.connection, item)
56-
if not callable(connection_attr):
57-
return connection_attr
58-
59-
def outer(func):
60-
def inner(*args, **kwargs):
61-
try:
62-
func(*args, **kwargs)
63-
except (Neo4jError, ServiceUnavailable, SessionExpired) as exc:
64-
self.on_error(exc)
65-
raise
66-
return inner
67-
68-
return outer(connection_attr)
69-
70-
7130
class Result:
7231
"""A handler for the result of Cypher query execution. Instances
7332
of this class are typically constructed and returned by
@@ -76,7 +35,7 @@ class Result:
7635

7736
def __init__(self, connection, hydrant, fetch_size, on_closed,
7837
on_error):
79-
self._connection = _ConnectionErrorHandler(connection, on_error)
38+
self._connection = ConnectionErrorHandler(connection, on_error)
8039
self._hydrant = hydrant
8140
self._on_closed = on_closed
8241
self._metadata = None
@@ -98,7 +57,7 @@ def __init__(self, connection, hydrant, fetch_size, on_closed,
9857

9958
@property
10059
def _qid(self):
101-
if self._raw_qid == self._connection.connection.most_recent_qid:
60+
if self._raw_qid == self._connection.most_recent_qid:
10261
return -1
10362
else:
10463
return self._raw_qid
@@ -127,7 +86,7 @@ def on_attached(metadata):
12786
# For auto-commit there is no qid and Bolt 3 does not support qid
12887
self._raw_qid = metadata.get("qid", -1)
12988
if self._raw_qid != -1:
130-
self._connection.connection.most_recent_qid = self._raw_qid
89+
self._connection.most_recent_qid = self._raw_qid
13190
self._keys = metadata.get("fields")
13291
self._attached = True
13392

neo4j/work/transaction.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222
from neo4j.work.result import Result
2323
from neo4j.data import DataHydrator
2424
from neo4j.exceptions import (
25-
ServiceUnavailable,
26-
SessionExpired,
2725
TransactionError,
2826
)
27+
from neo4j.io import ConnectionErrorHandler
2928

3029

3130
class Transaction:
@@ -41,6 +40,9 @@ class Transaction:
4140

4241
def __init__(self, connection, fetch_size, on_closed, on_error):
4342
self._connection = connection
43+
self._error_handling_connection = ConnectionErrorHandler(
44+
connection, self._error_handler
45+
)
4446
self._bookmark = None
4547
self._results = []
4648
self._closed = False
@@ -63,14 +65,15 @@ def __exit__(self, exception_type, exception_value, traceback):
6365
def _begin(self, database, bookmarks, access_mode, metadata, timeout):
6466
self._connection.begin(bookmarks=bookmarks, metadata=metadata,
6567
timeout=timeout, mode=access_mode, db=database)
68+
self._error_handling_connection.send_all()
69+
self._error_handling_connection.fetch_all()
6670

6771
def _result_on_closed_handler(self):
6872
pass
6973

70-
def _result_on_error_handler(self, exc):
74+
def _error_handler(self, exc):
7175
self._last_error = exc
72-
if isinstance(exc, (ServiceUnavailable, SessionExpired)):
73-
self._closed = True
76+
self._closed = True
7477
self._on_error(exc)
7578

7679
def _consume_results(self):
@@ -126,7 +129,7 @@ def run(self, query, parameters=None, **kwparameters):
126129
result = Result(
127130
self._connection, DataHydrator(), self._fetch_size,
128131
self._result_on_closed_handler,
129-
self._result_on_error_handler
132+
self._error_handler
130133
)
131134
self._results.append(result)
132135

0 commit comments

Comments
 (0)