Skip to content

Commit 755b30d

Browse files
authored
Catch-up with TestKit (#593)
* Add TestKit backend message TransactionClose * add temporary TestKit feature flags * implement fetchSize parameter in NewDriver * establish alphabetical order in `testkitbackend/test_config.json` * Fix: only acquire RT once per connection acquisition attempt
1 parent 4427536 commit 755b30d

File tree

3 files changed

+47
-22
lines changed

3 files changed

+47
-22
lines changed

neo4j/io/__init__.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
)
5959
from threading import (
6060
Condition,
61-
Lock,
6261
RLock,
6362
)
6463
from time import perf_counter
@@ -875,7 +874,7 @@ def __init__(self, opener, pool_config, workspace_config, address):
875874
log.debug("[#0000] C: <NEO4J POOL> routing address %r", address)
876875
self.address = address
877876
self.routing_tables = {workspace_config.database: RoutingTable(database=workspace_config.database, routers=[address])}
878-
self.refresh_lock = Lock()
877+
self.refresh_lock = RLock()
879878

880879
def __repr__(self):
881880
""" The representation shows the initial routing addresses.
@@ -1109,23 +1108,25 @@ def _select_address(self, *, access_mode, database, bookmarks):
11091108
from neo4j.api import READ_ACCESS
11101109
""" Selects the address with the fewest in-use connections.
11111110
"""
1112-
self.create_routing_table(database)
1113-
self.ensure_routing_table_is_fresh(
1114-
access_mode=access_mode, database=database, bookmarks=bookmarks
1115-
)
1116-
log.debug("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r", self.routing_tables)
1117-
if access_mode == READ_ACCESS:
1118-
addresses = self.routing_tables[database].readers
1119-
else:
1120-
addresses = self.routing_tables[database].writers
1121-
addresses_by_usage = {}
1122-
for address in addresses:
1123-
addresses_by_usage.setdefault(self.in_use_connection_count(address), []).append(address)
1111+
with self.refresh_lock:
1112+
if access_mode == READ_ACCESS:
1113+
addresses = self.routing_tables[database].readers
1114+
else:
1115+
addresses = self.routing_tables[database].writers
1116+
addresses_by_usage = {}
1117+
for address in addresses:
1118+
addresses_by_usage.setdefault(
1119+
self.in_use_connection_count(address), []
1120+
).append(address)
11241121
if not addresses_by_usage:
11251122
if access_mode == READ_ACCESS:
1126-
raise ReadServiceUnavailable("No read service currently available")
1123+
raise ReadServiceUnavailable(
1124+
"No read service currently available"
1125+
)
11271126
else:
1128-
raise WriteServiceUnavailable("No write service currently available")
1127+
raise WriteServiceUnavailable(
1128+
"No write service currently available"
1129+
)
11291130
return choice(addresses_by_usage[min(addresses_by_usage)])
11301131

11311132
def acquire(self, access_mode=None, timeout=None, database=None,
@@ -1137,17 +1138,24 @@ def acquire(self, access_mode=None, timeout=None, database=None,
11371138

11381139
from neo4j.api import check_access_mode
11391140
access_mode = check_access_mode(access_mode)
1141+
with self.refresh_lock:
1142+
self.create_routing_table(database)
1143+
log.debug("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r", self.routing_tables)
1144+
self.ensure_routing_table_is_fresh(
1145+
access_mode=access_mode, database=database, bookmarks=bookmarks
1146+
)
1147+
11401148
while True:
11411149
try:
11421150
# Get an address for a connection that have the fewest in-use connections.
11431151
address = self._select_address(
11441152
access_mode=access_mode, database=database,
11451153
bookmarks=bookmarks
11461154
)
1147-
log.debug("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r", database, address)
11481155
except (ReadServiceUnavailable, WriteServiceUnavailable) as err:
11491156
raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err
11501157
try:
1158+
log.debug("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r", database, address)
11511159
connection = self._acquire(address, timeout=timeout) # should always be a resolved address
11521160
except ServiceUnavailable:
11531161
self.deactivate(address=address)

testkitbackend/requests.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,18 @@ def NewDriver(backend, data):
7575
if data["resolverRegistered"] or data["domainNameResolverRegistered"]:
7676
resolver = resolution_func(backend, data["resolverRegistered"],
7777
data["domainNameResolverRegistered"])
78-
connection_timeout = data.get("connectionTimeoutMs", None)
78+
connection_timeout = data.get("connectionTimeoutMs")
7979
if connection_timeout is not None:
8080
connection_timeout /= 1000
81+
max_transaction_retry_time = data.get("maxTxRetryTimeMs")
82+
if max_transaction_retry_time is not None:
83+
max_transaction_retry_time /= 1000
8184
data.mark_item_as_read("domainNameResolverRegistered")
8285
driver = neo4j.GraphDatabase.driver(
8386
data["uri"], auth=auth, user_agent=data["userAgent"],
84-
resolver=resolver, connection_timeout=connection_timeout
87+
resolver=resolver, connection_timeout=connection_timeout,
88+
fetch_size=data.get("fetchSize"),
89+
max_transaction_retry_time=max_transaction_retry_time,
8590
)
8691
key = backend.next_key()
8792
backend.drivers[key] = driver
@@ -304,6 +309,13 @@ def TransactionRollback(backend, data):
304309
backend.send_response("Transaction", {"id": key})
305310

306311

312+
def TransactionClose(backend, data):
313+
key = data["txId"]
314+
tx = backend.transactions[key]
315+
tx.close()
316+
backend.send_response("Transaction", {"id": key})
317+
318+
307319
def ResultNext(backend, data):
308320
result = backend.results[data["resultId"]]
309321
try:

testkitbackend/test_config.json

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,18 @@
3232
"Feature:Auth:Custom": true,
3333
"Feature:Auth:Kerberos": true,
3434
"AuthorizationExpiredTreatment": true,
35+
"Optimization:ConnectionReuse": true,
36+
"Optimization:EagerTransactionBegin": true,
3537
"Optimization:ImplicitDefaultArguments": true,
3638
"Optimization:MinimalResets": true,
37-
"Optimization:ConnectionReuse": true,
3839
"Optimization:PullPipelining": true,
3940
"ConfHint:connection.recv_timeout_seconds": true,
40-
"Temporary:ResultKeys": true,
41+
"Temporary:CypherPathAndRelationship": true,
42+
"Temporary:DriverFetchSize": true,
43+
"Temporary:DriverMaxTxRetryTime": true,
4144
"Temporary:FullSummary": true,
42-
"Temporary:CypherPathAndRelationship": true
45+
"Temporary:ResultKeys": true,
46+
"Temporary:ResultList": "requires further specification/discussion in the team",
47+
"Temporary:TransactionClose": true
4348
}
4449
}

0 commit comments

Comments
 (0)