Skip to content

Commit fad6357

Browse files
authored
Discard transaction on disconnect (#518)
* Turn internal BoltIncompleteCommitError into public IncompleteCommit and let it propagate for the user code to handle it. * Session discards transaction on network errors * Workspace/Session releases connection properly * Adjust test for session releasing connection on fail When a network error occurs, the session should release the current connection as it's dead and acquire a new one for the next transaction.
1 parent cdf8c83 commit fad6357

File tree

9 files changed

+214
-148
lines changed

9 files changed

+214
-148
lines changed

neo4j/io/__init__.py

Lines changed: 25 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -38,60 +38,52 @@
3838
from logging import getLogger
3939
from random import choice
4040
from select import select
41-
from time import perf_counter
42-
4341
from socket import (
42+
AF_INET,
43+
AF_INET6,
44+
SHUT_RDWR,
45+
SO_KEEPALIVE,
4446
socket,
4547
SOL_SOCKET,
46-
SO_KEEPALIVE,
47-
SHUT_RDWR,
4848
timeout as SocketTimeout,
49-
AF_INET,
50-
AF_INET6,
5149
)
52-
5350
from ssl import (
5451
HAS_SNI,
5552
SSLError,
5653
)
57-
58-
from struct import (
59-
pack as struct_pack,
60-
)
61-
6254
from threading import (
55+
Condition,
6356
Lock,
6457
RLock,
65-
Condition,
6658
)
59+
from time import perf_counter
6760

68-
from neo4j.addressing import Address
69-
from neo4j.conf import PoolConfig
7061
from neo4j._exceptions import (
62+
BoltHandshakeError,
63+
BoltProtocolError,
7164
BoltRoutingError,
7265
BoltSecurityError,
73-
BoltProtocolError,
74-
BoltHandshakeError,
7566
)
76-
from neo4j.exceptions import (
77-
ServiceUnavailable,
78-
ClientError,
79-
SessionExpired,
80-
ReadServiceUnavailable,
81-
WriteServiceUnavailable,
82-
ConfigurationError,
83-
UnsupportedServerProduct,
67+
from neo4j.addressing import Address
68+
from neo4j.api import (
69+
READ_ACCESS,
70+
Version,
71+
WRITE_ACCESS,
8472
)
85-
from neo4j.routing import RoutingTable
8673
from neo4j.conf import (
8774
PoolConfig,
8875
WorkspaceConfig,
8976
)
90-
from neo4j.api import (
91-
READ_ACCESS,
92-
WRITE_ACCESS,
93-
Version,
77+
from neo4j.exceptions import (
78+
ClientError,
79+
ConfigurationError,
80+
ReadServiceUnavailable,
81+
ServiceUnavailable,
82+
SessionExpired,
83+
UnsupportedServerProduct,
84+
WriteServiceUnavailable,
9485
)
86+
from neo4j.routing import RoutingTable
9587

9688
# Set up logger
9789
log = getLogger("neo4j")
@@ -258,7 +250,7 @@ def open(cls, address, *, auth=None, timeout=None, routing_context=None, **pool_
258250
except Exception as error:
259251
log.debug("[#%04X] C: <CLOSE> %s", s.getsockname()[1], str(error))
260252
_close_socket(s)
261-
raise error
253+
raise
262254

263255
return connection
264256

@@ -522,7 +514,7 @@ def deactivate(self, address):
522514
connections.remove(conn)
523515
try:
524516
conn.close()
525-
except IOError:
517+
except OSError:
526518
pass
527519
if not connections:
528520
self.remove(address)
@@ -538,7 +530,7 @@ def remove(self, address):
538530
for connection in self.connections.pop(address, ()):
539531
try:
540532
connection.close()
541-
except IOError:
533+
except OSError:
542534
pass
543535

544536
def close(self):

neo4j/io/_bolt3.py

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,44 +19,49 @@
1919
# limitations under the License.
2020

2121
from collections import deque
22+
from logging import getLogger
2223
from ssl import SSLSocket
2324
from time import perf_counter
25+
26+
from neo4j._exceptions import (
27+
BoltError,
28+
BoltProtocolError,
29+
)
30+
from neo4j.addressing import Address
2431
from neo4j.api import (
25-
Version,
2632
READ_ACCESS,
33+
ServerInfo,
34+
Version,
2735
)
28-
from neo4j.io._common import (
29-
Inbox,
30-
Outbox,
31-
Response,
32-
InitResponse,
33-
CommitResponse,
34-
)
35-
from neo4j.meta import get_user_agent
3636
from neo4j.exceptions import (
3737
AuthError,
38-
DatabaseUnavailable,
3938
ConfigurationError,
39+
DatabaseUnavailable,
40+
DriverError,
4041
ForbiddenOnReadOnlyDatabase,
4142
IncompleteCommit,
4243
NotALeader,
4344
ServiceUnavailable,
4445
SessionExpired,
4546
)
46-
from neo4j._exceptions import BoltProtocolError
47-
from neo4j.packstream import (
48-
Unpacker,
49-
Packer,
50-
)
5147
from neo4j.io import (
48+
check_supported_server_product,
5249
Bolt,
5350
BoltPool,
54-
check_supported_server_product,
5551
)
56-
from neo4j.api import ServerInfo
57-
from neo4j.addressing import Address
52+
from neo4j.io._common import (
53+
CommitResponse,
54+
Inbox,
55+
InitResponse,
56+
Outbox,
57+
Response,
58+
)
59+
from neo4j.meta import get_user_agent
60+
from neo4j.packstream import (
61+
Packer,
62+
Unpacker,
63+
)
5864

59-
from logging import getLogger
6065
log = getLogger("neo4j")
6166

6267

@@ -85,7 +90,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
8590
self.socket = sock
8691
self.server_info = ServerInfo(Address(sock.getpeername()), self.PROTOCOL_VERSION)
8792
self.outbox = Outbox()
88-
self.inbox = Inbox(self.socket, on_error=self._set_defunct)
93+
self.inbox = Inbox(self.socket, on_error=self._set_defunct_read)
8994
self.packer = Packer(self.outbox)
9095
self.unpacker = Unpacker(self.inbox)
9196
self.responses = deque()
@@ -135,7 +140,7 @@ def der_encoded_server_certificate(self):
135140
def local_port(self):
136141
try:
137142
return self.socket.getsockname()[1]
138-
except IOError:
143+
except OSError:
139144
return 0
140145

141146
def get_base_headers(self):
@@ -292,7 +297,10 @@ def fail(metadata):
292297
def _send_all(self):
293298
data = self.outbox.view()
294299
if data:
295-
self.socket.sendall(data)
300+
try:
301+
self.socket.sendall(data)
302+
except OSError as error:
303+
self._set_defunct_write(error)
296304
self.outbox.clear()
297305

298306
def send_all(self):
@@ -306,17 +314,7 @@ def send_all(self):
306314
raise ServiceUnavailable("Failed to write to defunct connection {!r} ({!r})".format(
307315
self.unresolved_address, self.server_info.address))
308316

309-
try:
310-
self._send_all()
311-
except (IOError, OSError) as error:
312-
log.error("Failed to write data to connection "
313-
"{!r} ({!r}); ({!r})".
314-
format(self.unresolved_address,
315-
self.server_info.address,
316-
"; ".join(map(repr, error.args))))
317-
if self.pool:
318-
self.pool.deactivate(address=self.unresolved_address)
319-
raise
317+
self._send_all()
320318

321319
def fetch_message(self):
322320
""" Receive at least one message from the server, if available.
@@ -336,17 +334,7 @@ def fetch_message(self):
336334
return 0, 0
337335

338336
# Receive exactly one message
339-
try:
340-
details, summary_signature, summary_metadata = next(self.inbox)
341-
except (IOError, OSError) as error:
342-
log.error("Failed to read data from connection "
343-
"{!r} ({!r}); ({!r})".
344-
format(self.unresolved_address,
345-
self.server_info.address,
346-
"; ".join(map(repr, error.args))))
347-
if self.pool:
348-
self.pool.deactivate(address=self.unresolved_address)
349-
raise
337+
details, summary_signature, summary_metadata = next(self.inbox)
350338

351339
if details:
352340
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) # Do not log any data
@@ -380,11 +368,20 @@ def fetch_message(self):
380368

381369
return len(details), 1
382370

383-
def _set_defunct(self, error=None):
384-
direct_driver = isinstance(self.pool, BoltPool)
371+
def _set_defunct_read(self, error=None):
372+
message = "Failed to read from defunct connection {!r} ({!r})".format(
373+
self.unresolved_address, self.server_info.address
374+
)
375+
self._set_defunct(message, error=error)
385376

386-
message = ("Failed to read from defunct connection {!r} ({!r})".format(
387-
self.unresolved_address, self.server_info.address))
377+
def _set_defunct_write(self, error=None):
378+
message = "Failed to write data to connection {!r} ({!r})".format(
379+
self.unresolved_address, self.server_info.address
380+
)
381+
self._set_defunct(message, error=error)
382+
383+
def _set_defunct(self, message, error=None):
384+
direct_driver = isinstance(self.pool, BoltPool)
388385

389386
if error:
390387
log.error(str(error))
@@ -445,12 +442,12 @@ def close(self):
445442
self._append(b"\x02", ())
446443
try:
447444
self._send_all()
448-
except:
445+
except (OSError, BoltError, DriverError):
449446
pass
450447
log.debug("[#%04X] C: <CLOSE>", self.local_port)
451448
try:
452449
self.socket.close()
453-
except IOError:
450+
except OSError:
454451
pass
455452
finally:
456453
self._closed = True

0 commit comments

Comments
 (0)