Skip to content

Commit d80ae9a

Browse files
committed
Send RESET when putting a connection back into the pool
1 parent bfa2542 commit d80ae9a

File tree

5 files changed

+47
-21
lines changed

5 files changed

+47
-21
lines changed

neo4j/io/__init__.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
ConfigurationError,
8484
DriverError,
8585
IncompleteCommit,
86+
Neo4jError,
8687
ReadServiceUnavailable,
8788
ServiceUnavailable,
8889
SessionExpired,
@@ -121,6 +122,9 @@ class Bolt(abc.ABC):
121122

122123
PROTOCOL_VERSION = None
123124

125+
# flag if connection needs RESET to go back to READY state
126+
_is_reset = True
127+
124128
# The socket
125129
in_use = False
126130

@@ -144,7 +148,6 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
144148
self.responses = deque()
145149
self._max_connection_lifetime = max_connection_lifetime
146150
self._creation_timestamp = perf_counter()
147-
self._is_reset = True
148151
self.routing_context = routing_context
149152

150153
# Determine the user agent
@@ -447,6 +450,10 @@ def rollback(self, **handlers):
447450
""" Appends a ROLLBACK message to the output queue."""
448451
pass
449452

453+
@property
454+
def is_reset(self):
455+
return self._is_reset
456+
450457
@abc.abstractmethod
451458
def reset(self):
452459
""" Appends a RESET message to the outgoing queue, sends it and consumes
@@ -564,23 +571,26 @@ def _set_defunct(self, message, error=None, silent=False):
564571
def stale(self):
565572
return (self._stale
566573
or (0 <= self._max_connection_lifetime
567-
<= perf_counter()- self._creation_timestamp))
574+
<= perf_counter() - self._creation_timestamp))
568575

569576
_stale = False
570577

571578
def set_stale(self):
572579
self._stale = True
573580

581+
@abc.abstractmethod
574582
def close(self):
575583
""" Close the connection.
576584
"""
577-
raise NotImplementedError
585+
pass
578586

587+
@abc.abstractmethod
579588
def closed(self):
580-
raise NotImplementedError
589+
pass
581590

591+
@abc.abstractmethod
582592
def defunct(self):
583-
raise NotImplementedError
593+
pass
584594

585595

586596
class IOPool:
@@ -682,6 +692,13 @@ def release(self, *connections):
682692
"""
683693
with self.lock:
684694
for connection in connections:
695+
if not connection.is_reset:
696+
try:
697+
connection.reset()
698+
except (Neo4jError, DriverError, BoltError) as e:
699+
log.debug(
700+
"Reset on IOPool.release failed: {}".format(e)
701+
)
685702
connection.in_use = False
686703
self.cond.notify_all()
687704

neo4j/io/_bolt4.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,6 @@ class Bolt4x0(Bolt):
6161

6262
PROTOCOL_VERSION = Version(4, 0)
6363

64-
# The socket
65-
in_use = False
66-
67-
# The socket
68-
_closed = False
69-
70-
# The socket
71-
_defunct = False
72-
73-
#: The pool of which this connection is a member
74-
pool = None
75-
7664
supports_multiple_results = True
7765

7866
supports_multiple_databases = True

neo4j/work/transaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def rollback(self):
160160
raise TransactionError("Transaction closed")
161161
metadata = {}
162162
try:
163-
if not self._connection._is_reset:
163+
if not self._connection.is_reset:
164164
# DISCARD pending records then do a rollback.
165165
self._consume_results()
166166
self._connection.rollback(on_success=metadata.update)

testkitbackend/skipped_tests.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@
6565
"Driver uses custom resolver for each connection, not only initial seeding",
6666
"stub.retry.TestRetryClustering.test_retry_ForbiddenOnReadOnlyDatabase_ChangingWriter":
6767
"Test makes assumptions about how verify_connectivity is implemented",
68-
"stub.disconnected.SessionRunDisconnected.test_fail_on_reset":
69-
"It is not reseting connection when putting back to pool",
7068
"stub.authorization.AuthorizationTests.test_should_retry_on_auth_expired_on_begin_using_tx_function":
7169
"Flaky: test requires the driver to contact servers in a specific order",
7270
"stub.authorization.AuthorizationTestsV3.test_should_retry_on_auth_expired_on_begin_using_tx_function":

tests/unit/io/test_direct.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
# limitations under the License.
2020

2121

22-
from unittest import TestCase
22+
from unittest import (
23+
mock,
24+
TestCase,
25+
)
2326
import pytest
2427
from threading import (
2528
Condition,
@@ -277,6 +280,26 @@ def test_multithread(self):
277280
# The pool size is still 5, but all are free
278281
self.assert_pool_size(address, 0, 5, pool)
279282

283+
def test_reset_when_released(self):
284+
def test(is_reset):
285+
with mock.patch(__name__ + ".QuickConnection.is_reset",
286+
new_callable=mock.PropertyMock) as is_reset_mock:
287+
with mock.patch(__name__ + ".QuickConnection.reset",
288+
new_callable=mock.MagicMock) as reset_mock:
289+
is_reset_mock.return_value = is_reset
290+
connection = self.pool._acquire(address, timeout=3)
291+
self.assertIsInstance(connection, QuickConnection)
292+
self.assertEqual(is_reset_mock.call_count, 0)
293+
self.assertEqual(reset_mock.call_count, 0)
294+
self.pool.release(connection)
295+
self.assertEqual(is_reset_mock.call_count, 1)
296+
self.assertEqual(reset_mock.call_count, int(not is_reset))
297+
298+
address = ("127.0.0.1", 7687)
299+
for is_reset in (True, False):
300+
with self.subTest():
301+
test(is_reset)
302+
280303

281304
def acquire_release_conn(pool, address, acquired_counter, release_event):
282305
conn = pool._acquire(address, timeout=3)

0 commit comments

Comments
 (0)