Skip to content

Commit f2b9057

Browse files
committed
Don't remove broken connections multiple times from the pool
Fixes `ValueError: deque.remove(x): x not in deque` when multiple connections to the same server brake more or less simultaneously.
1 parent 5de87f5 commit f2b9057

File tree

2 files changed

+40
-7
lines changed

2 files changed

+40
-7
lines changed

neo4j/io/__init__.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -772,13 +772,16 @@ def deactivate(self, address):
772772
connections = self.connections[address]
773773
except KeyError: # already removed from the connection pool
774774
return
775-
for conn in list(connections):
776-
if not conn.in_use:
777-
connections.remove(conn)
778-
try:
779-
conn.close()
780-
except OSError:
781-
pass
775+
closable_connections = [
776+
conn for conn in connections if not conn.in_use
777+
]
778+
# First remove all connections in question, then try to close them.
779+
# If closing of a connection fails, we will end up in this method
780+
# again.
781+
for conn in closable_connections:
782+
connections.remove(conn)
783+
for conn in closable_connections:
784+
conn.close()
782785
if not self.connections[address]:
783786
del self.connections[address]
784787

tests/unit/io/test_neo4j_pool.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,36 @@ def test_release_does_not_resets_defunct_connections(opener):
232232
cx1.reset.asset_not_called()
233233

234234

235+
def test_multiple_broken_connections_on_close(opener):
236+
def mock_connection_breaks_on_close(cx):
237+
def close_side_effect():
238+
cx.closed.return_value = True
239+
cx.defunct.return_value = True
240+
pool.deactivate(READER_ADDRESS)
241+
242+
cx.attach_mock(Mock(side_effect=close_side_effect), "close")
243+
244+
# create pool with 2 idle connections
245+
pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS)
246+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
247+
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
248+
pool.release(cx1)
249+
pool.release(cx2)
250+
251+
# both will loose connection
252+
mock_connection_breaks_on_close(cx1)
253+
mock_connection_breaks_on_close(cx2)
254+
255+
# force pool to close cx1, which will make it realize that the server is
256+
# unreachable
257+
cx1.stale.return_value = True
258+
259+
cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None)
260+
261+
assert cx3 is not cx1
262+
assert cx3 is not cx2
263+
264+
235265
def test_failing_opener_leaves_connections_in_use_alone(opener):
236266
pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS)
237267
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)

0 commit comments

Comments
 (0)