Skip to content

Commit 9d9cfc5

Browse files
authored
Don't close stale connections while in use (#631)
The pool will close connections marked as stale when trying to pick them up from the pool. It should not do so while the connection is in use (i.e., already borrowed from the pool). In concurrent systems, this would lead to all sorts of errors caused by the connection being managed from different threads: * the thread trying to acquire a connection will close the stale connection on encounter * while the thread that borrowed it might still be using it, e.g., to fetch results or run a query In fact, in corner cases it is also possible to hit this issue without concurrency by having a workload that relies on multiple sessions to perform work simultaneously.
1 parent 6c785c7 commit 9d9cfc5

File tree

2 files changed

+45
-5
lines changed

2 files changed

+45
-5
lines changed

neo4j/io/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
defaultdict,
4040
deque,
4141
)
42+
import logging
4243
from logging import getLogger
4344
from random import choice
4445
import selectors
@@ -652,11 +653,19 @@ def time_remaining():
652653
# try to find a free connection in pool
653654
for connection in list(self.connections.get(address, [])):
654655
if (connection.closed() or connection.defunct()
655-
or connection.stale()):
656+
or (connection.stale() and not connection.in_use)):
656657
# `close` is a noop on already closed connections.
657658
# This is to make sure that the connection is gracefully
658659
# closed, e.g. if it's just marked as `stale` but still
659660
# alive.
661+
if log.isEnabledFor(logging.DEBUG):
662+
log.debug(
663+
"[#%04X] C: <POOL> removing old connection "
664+
"(closed=%s, defunct=%s, stale=%s, in_use=%s)",
665+
connection.local_port,
666+
connection.closed(), connection.defunct(),
667+
connection.stale(), connection.in_use
668+
)
660669
connection.close()
661670
try:
662671
self.connections.get(address, []).remove(connection)

tests/unit/io/test_neo4j_pool.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
# See the License for the specific language governing permissions and
1919
# limitations under the License.
2020

21-
import inspect
21+
2222
from unittest.mock import Mock
2323

2424
import pytest
@@ -121,7 +121,7 @@ def test_chooses_right_connection_type(opener, type_):
121121
cx1 = pool.acquire(READ_ACCESS if type_ == "r" else WRITE_ACCESS,
122122
30, "test_db", None)
123123
pool.release(cx1)
124-
if type_ == "r":
124+
if type_ == "r":
125125
assert cx1.addr == READER_ADDRESS
126126
else:
127127
assert cx1.addr == WRITER_ADDRESS
@@ -147,7 +147,7 @@ def break_connection():
147147
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
148148
pool.release(cx1)
149149
assert cx1 in pool.connections[cx1.addr]
150-
# simulate connection going stale (e.g. exceeding) and than breaking when
150+
# simulate connection going stale (e.g. exceeding) and then breaking when
151151
# the pool tries to close the connection
152152
cx1.stale.return_value = True
153153
cx_close_mock = cx1.close
@@ -156,13 +156,44 @@ def break_connection():
156156
cx_close_mock.side_effect = break_connection
157157
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
158158
pool.release(cx2)
159-
assert cx1.close.called_once()
159+
if break_on_close:
160+
cx1.close.assert_called()
161+
else:
162+
cx1.close.assert_called_once()
160163
assert cx2 is not cx1
161164
assert cx2.addr == cx1.addr
162165
assert cx1 not in pool.connections[cx1.addr]
163166
assert cx2 in pool.connections[cx2.addr]
164167

165168

169+
def test_does_not_close_stale_connections_in_use(opener):
170+
pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS)
171+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
172+
assert cx1 in pool.connections[cx1.addr]
173+
# simulate connection going stale (e.g. exceeding) while being in use
174+
cx1.stale.return_value = True
175+
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
176+
pool.release(cx2)
177+
cx1.close.assert_not_called()
178+
assert cx2 is not cx1
179+
assert cx2.addr == cx1.addr
180+
assert cx1 in pool.connections[cx1.addr]
181+
assert cx2 in pool.connections[cx2.addr]
182+
183+
pool.release(cx1)
184+
# now that cx1 is back in the pool and still stale,
185+
# it should be closed when trying to acquire the next connection
186+
cx1.close.assert_not_called()
187+
188+
cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None)
189+
pool.release(cx3)
190+
cx1.close.assert_called_once()
191+
assert cx2 is cx3
192+
assert cx3.addr == cx1.addr
193+
assert cx1 not in pool.connections[cx1.addr]
194+
assert cx3 in pool.connections[cx2.addr]
195+
196+
166197
def test_release_resets_connections(opener):
167198
pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS)
168199
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)

0 commit comments

Comments
 (0)