Skip to content

Commit ad928c5

Browse files
authored
Merge pull request #197 from zhenlineo/1.5-purge-idle
Do not purge inUse connections
2 parents 9aa31dc + 97c8ccd commit ad928c5

File tree

6 files changed

+127
-16
lines changed

6 files changed

+127
-16
lines changed

neo4j/bolt/connection.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,25 @@ def in_use_connection_count(self, address):
492492
else:
493493
return sum(1 if connection.in_use else 0 for connection in connections)
494494

495+
def deactivate(self, address):
496+
""" Deactivate an address from the connection pool, if present, closing
497+
all idle connection to that address
498+
"""
499+
with self.lock:
500+
try:
501+
connections = self.connections[address]
502+
except KeyError: # already removed from the connection pool
503+
return
504+
for conn in list(connections):
505+
if not conn.in_use:
506+
connections.remove(conn)
507+
try:
508+
conn.close()
509+
except IOError:
510+
pass
511+
if not connections:
512+
self.remove(address)
513+
495514
def remove(self, address):
496515
""" Remove an address from the connection pool, if present, closing
497516
all connections to that address.

neo4j/v1/routing.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ def update(self, new_routing_table):
140140
self.last_updated_time = self.timer()
141141
self.ttl = new_routing_table.ttl
142142

143+
def servers(self):
144+
return set(self.routers) | set(self.writers) | set(self.readers)
145+
143146

144147
class RoutingSession(BoltSession):
145148

@@ -249,9 +252,9 @@ class RoutingConnectionErrorHandler(ConnectionErrorHandler):
249252

250253
def __init__(self, pool):
251254
super(RoutingConnectionErrorHandler, self).__init__({
252-
SessionExpired: lambda address: pool.remove(address),
253-
ServiceUnavailable: lambda address: pool.remove(address),
254-
DatabaseUnavailableError: lambda address: pool.remove(address),
255+
SessionExpired: lambda address: pool.deactivate(address),
256+
ServiceUnavailable: lambda address: pool.deactivate(address),
257+
DatabaseUnavailableError: lambda address: pool.deactivate(address),
255258
NotALeaderError: lambda address: pool.remove_writer(address),
256259
ForbiddenOnReadOnlyDatabaseError: lambda address: pool.remove_writer(address)
257260
})
@@ -288,7 +291,7 @@ def fetch_routing_info(self, address):
288291
else:
289292
raise ServiceUnavailable("Routing support broken on server {!r}".format(address))
290293
except ServiceUnavailable:
291-
self.remove(address)
294+
self.deactivate(address)
292295
return None
293296

294297
def fetch_routing_table(self, address):
@@ -365,6 +368,12 @@ def update_routing_table(self):
365368
# None of the routers have been successful, so just fail
366369
raise ServiceUnavailable("Unable to retrieve routing information")
367370

371+
def update_connection_pool(self):
372+
servers = self.routing_table.servers()
373+
for address in list(self.connections):
374+
if address not in servers:
375+
super(RoutingConnectionPool, self).deactivate(address)
376+
368377
def ensure_routing_table_is_fresh(self, access_mode):
369378
""" Update the routing table if stale.
370379
@@ -387,6 +396,7 @@ def ensure_routing_table_is_fresh(self, access_mode):
387396
self.missing_writer = not self.routing_table.is_fresh(WRITE_ACCESS)
388397
return False
389398
self.update_routing_table()
399+
self.update_connection_pool()
390400
return True
391401

392402
def acquire(self, access_mode=None):
@@ -410,21 +420,22 @@ def acquire(self, access_mode=None):
410420
connection = self.acquire_direct(address) # should always be a resolved address
411421
connection.Error = SessionExpired
412422
except ServiceUnavailable:
413-
self.remove(address)
423+
self.deactivate(address)
414424
else:
415425
return connection
416426
raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode)
417427

418-
def remove(self, address):
419-
""" Remove an address from the connection pool, if present, closing
420-
all connections to that address. Also remove from the routing table.
428+
def deactivate(self, address):
429+
""" Deactivate an address from the connection pool,
430+
if present, remove from the routing table and also closing
431+
all idle connections to that address.
421432
"""
422433
# We use `discard` instead of `remove` here since the former
423434
# will not fail if the address has already been removed.
424435
self.routing_table.routers.discard(address)
425436
self.routing_table.readers.discard(address)
426437
self.routing_table.writers.discard(address)
427-
super(RoutingConnectionPool, self).remove(address)
438+
super(RoutingConnectionPool, self).deactivate(address)
428439

429440
def remove_writer(self, address):
430441
""" Remove a writer address from the routing table, if present.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
4+
C: RUN "CALL dbms.cluster.routing.getServers" {}
5+
PULL_ALL
6+
S: SUCCESS {"fields": ["ttl", "servers"]}
7+
RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002"]},{"role":"READ","addresses":["127.0.0.1:9001","127.0.0.1:9003"]},{"role":"WRITE","addresses":["127.0.0.1:9004"]}]]
8+
SUCCESS {}

test/stub/test_routing.py

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,59 @@ def test_should_flag_reading_without_writer(self):
303303
pool.ensure_routing_table_is_fresh(READ_ACCESS)
304304
assert pool.missing_writer
305305

306+
def test_should_purge_idle_connections_from_connection_pool(self):
307+
with StubCluster({9006: "router.script", 9001: "router_with_multiple_servers.script"}):
308+
address = ("127.0.0.1", 9006)
309+
with RoutingPool(address) as pool:
310+
# close the acquired connection with init router and then set it to be idle
311+
conn = pool.acquire(WRITE_ACCESS)
312+
conn.close()
313+
conn.in_use = False
314+
315+
table = pool.routing_table
316+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002),
317+
("127.0.0.1", 9003)}
318+
assert table.readers == {("127.0.0.1", 9004), ("127.0.0.1", 9005)}
319+
assert table.writers == {("127.0.0.1", 9006)}
320+
assert set(pool.connections.keys()) == {("127.0.0.1", 9006)}
321+
322+
# immediately expire the routing table to enforce update a new routing table
323+
pool.routing_table.ttl = 0
324+
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
325+
table = pool.routing_table
326+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002)}
327+
assert table.readers == {("127.0.0.1", 9001), ("127.0.0.1", 9003)}
328+
assert table.writers == {("127.0.0.1", 9004)}
329+
330+
assert set(pool.connections.keys()) == {("127.0.0.1", 9001)}
331+
332+
def test_should_not_purge_idle_connections_from_connection_pool(self):
333+
with StubCluster({9006: "router.script", 9001: "router_with_multiple_servers.script"}):
334+
address = ("127.0.0.1", 9006)
335+
with RoutingPool(address) as pool:
336+
# close the acquired connection with init router and then set it to be inUse
337+
conn = pool.acquire(WRITE_ACCESS)
338+
conn.close()
339+
conn.in_use = True
340+
341+
table = pool.routing_table
342+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002),
343+
("127.0.0.1", 9003)}
344+
assert table.readers == {("127.0.0.1", 9004), ("127.0.0.1", 9005)}
345+
assert table.writers == {("127.0.0.1", 9006)}
346+
assert set(pool.connections.keys()) == {("127.0.0.1", 9006)}
347+
348+
# immediately expire the routing table to enforce update a new routing table
349+
pool.routing_table.ttl = 0
350+
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
351+
table = pool.routing_table
352+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002)}
353+
assert table.readers == {("127.0.0.1", 9001), ("127.0.0.1", 9003)}
354+
assert table.writers == {("127.0.0.1", 9004)}
355+
356+
assert set(pool.connections.keys()) == {("127.0.0.1", 9001), ("127.0.0.1", 9006)}
357+
358+
306359
# TODO: fix flaky test
307360
# def test_concurrent_refreshes_should_not_block_if_fresh(self):
308361
# address = ("127.0.0.1", 9001)
@@ -481,15 +534,15 @@ def test_should_error_to_writer_in_absent_of_reader(self):
481534
assert not pool.missing_writer
482535

483536

484-
class RoutingConnectionPoolRemoveTestCase(StubTestCase):
537+
class RoutingConnectionPoolDeactivateTestCase(StubTestCase):
485538
def test_should_remove_router_from_routing_table_if_present(self):
486539
with StubCluster({9001: "router.script"}):
487540
address = ("127.0.0.1", 9001)
488541
with RoutingPool(address) as pool:
489542
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
490543
target = ("127.0.0.1", 9001)
491544
assert target in pool.routing_table.routers
492-
pool.remove(target)
545+
pool.deactivate(target)
493546
assert target not in pool.routing_table.routers
494547

495548
def test_should_remove_reader_from_routing_table_if_present(self):
@@ -499,7 +552,7 @@ def test_should_remove_reader_from_routing_table_if_present(self):
499552
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
500553
target = ("127.0.0.1", 9004)
501554
assert target in pool.routing_table.readers
502-
pool.remove(target)
555+
pool.deactivate(target)
503556
assert target not in pool.routing_table.readers
504557

505558
def test_should_remove_writer_from_routing_table_if_present(self):
@@ -509,7 +562,7 @@ def test_should_remove_writer_from_routing_table_if_present(self):
509562
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
510563
target = ("127.0.0.1", 9006)
511564
assert target in pool.routing_table.writers
512-
pool.remove(target)
565+
pool.deactivate(target)
513566
assert target not in pool.routing_table.writers
514567

515568
def test_should_not_fail_if_absent(self):
@@ -518,4 +571,4 @@ def test_should_not_fail_if_absent(self):
518571
with RoutingPool(address) as pool:
519572
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
520573
target = ("127.0.0.1", 9007)
521-
pool.remove(target)
574+
pool.deactivate(target)

test/stub/test_routingdriver.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,13 +289,19 @@ def test_forgets_address_on_service_unavailable_error(self):
289289
pool = driver._pool
290290
table = pool.routing_table
291291

292-
# address should not have connections in the pool, it has failed
293-
assert ('127.0.0.1', 9004) not in pool.connections
292+
# address should have connections in the pool but be inactive, it has failed
293+
assert ('127.0.0.1', 9004) in pool.connections
294+
conns = pool.connections[('127.0.0.1', 9004)]
295+
conn = conns[0]
296+
assert conn._closed == True
297+
assert conn.in_use == True
294298
assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)}
295299
# reader 127.0.0.1:9004 should've been forgotten because of an error
296300
assert table.readers == {('127.0.0.1', 9005)}
297301
assert table.writers == {('127.0.0.1', 9006)}
298302

303+
assert conn.in_use == False
304+
299305
def test_forgets_address_on_database_unavailable_error(self):
300306
with StubCluster({9001: "router.script", 9004: "database_unavailable.script"}):
301307
uri = "bolt+routing://127.0.0.1:9001"

test/unit/test_routing.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,20 @@ def test_should_fail_on_multiple_records(self):
169169
_ = RoutingTable.parse_routing_info([VALID_ROUTING_RECORD, VALID_ROUTING_RECORD])
170170

171171

172+
class RoutingTableServersTestCase(TestCase):
173+
def test_should_return_all_distinct_servers_in_routing_table(self):
174+
routing_table = {
175+
"ttl": 300,
176+
"servers": [
177+
{"role": "ROUTE", "addresses": ["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]},
178+
{"role": "READ", "addresses": ["127.0.0.1:9001", "127.0.0.1:9005"]},
179+
{"role": "WRITE", "addresses": ["127.0.0.1:9002"]},
180+
],
181+
}
182+
table = RoutingTable.parse_routing_info([routing_table])
183+
assert table.servers() == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003), ('127.0.0.1', 9005)}
184+
185+
172186
class RoutingTableFreshnessTestCase(TestCase):
173187
def test_should_be_fresh_after_update(self):
174188
table = RoutingTable.parse_routing_info([VALID_ROUTING_RECORD])

0 commit comments

Comments
 (0)