Skip to content

Commit 7eb49eb

Browse files
authored
Implement sticky home db in pipeline (#607)
1 parent 49bca20 commit 7eb49eb

File tree

2 files changed

+38
-51
lines changed

2 files changed

+38
-51
lines changed

neo4j/work/__init__.py

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
from neo4j.conf import WorkspaceConfig
2323
from neo4j.exceptions import ServiceUnavailable
24+
from neo4j.io import Neo4jPool
2425

2526

2627
class Workspace:
@@ -31,6 +32,9 @@ def __init__(self, pool, config):
3132
self._config = config
3233
self._connection = None
3334
self._connection_access_mode = None
35+
# Sessions are supposed to cache the database on which to operate.
36+
self._cached_database = False
37+
self._bookmarks = None
3438

3539
def __del__(self):
3640
try:
@@ -44,15 +48,43 @@ def __enter__(self):
4448
def __exit__(self, exc_type, exc_value, traceback):
4549
self.close()
4650

51+
def _set_cached_database(self, database):
52+
self._cached_database = True
53+
self._config.database = database
54+
4755
def _connect(self, access_mode):
4856
if self._connection:
49-
if access_mode == self._connection_access_mode:
50-
return
51-
self._disconnect(sync=True)
52-
self._connection = self._pool.acquire(access_mode=access_mode, timeout=self._config.connection_acquisition_timeout, database=self._config.database)
57+
# TODO: Investigate this
58+
# log.warning("FIXME: should always disconnect before connect")
59+
self._connection.send_all()
60+
self._connection.fetch_all()
61+
self._disconnect()
62+
if not self._cached_database:
63+
if (self._config.database is not None
64+
or not isinstance(self._pool, Neo4jPool)):
65+
self._set_cached_database(self._config.database)
66+
else:
67+
# This is the first time we open a connection to a server in a
68+
# cluster environment for this session without explicitly
69+
# configured database. Hence, we request a routing table update
70+
# to try to fetch the home database. If provided by the server,
71+
# we shall use this database explicitly for all subsequent
72+
# actions within this session.
73+
self._pool.update_routing_table(
74+
database=self._config.database,
75+
imp_user=self._config.impersonated_user,
76+
bookmarks=self._bookmarks,
77+
database_callback=self._set_cached_database
78+
)
79+
self._connection = self._pool.acquire(
80+
access_mode=access_mode,
81+
timeout=self._config.connection_acquisition_timeout,
82+
database=self._config.database,
83+
bookmarks=self._bookmarks
84+
)
5385
self._connection_access_mode = access_mode
5486

55-
def _disconnect(self, sync):
87+
def _disconnect(self, sync=False):
5688
if self._connection:
5789
if sync:
5890
try:

neo4j/work/simple.py

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
TransientError,
4242
TransactionError,
4343
)
44-
from neo4j.io import Neo4jPool
4544
from neo4j.work import Workspace
4645
from neo4j.work.result import Result
4746
from neo4j.work.transaction import Transaction
@@ -78,13 +77,6 @@ class Session(Workspace):
7877
# The current auto-transaction result, if any.
7978
_autoResult = None
8079

81-
# The set of bookmarks after which the next
82-
# :class:`.Transaction` should be carried out.
83-
_bookmarks = None
84-
85-
# Sessions are supposed to cache the database on which to operate.
86-
_cached_database = False
87-
8880
# The state this session is in.
8981
_state_failed = False
9082

@@ -110,47 +102,10 @@ def __exit__(self, exception_type, exception_value, traceback):
110102
self._state_failed = True
111103
self.close()
112104

113-
def _set_cached_database(self, database):
114-
self._cached_database = True
115-
self._config.database = database
116-
117105
def _connect(self, access_mode):
118106
if access_mode is None:
119107
access_mode = self._config.default_access_mode
120-
if self._connection:
121-
# TODO: Investigate this
122-
# log.warning("FIXME: should always disconnect before connect")
123-
self._connection.send_all()
124-
self._connection.fetch_all()
125-
self._disconnect()
126-
if not self._cached_database:
127-
if (self._config.database is not None
128-
or not isinstance(self._pool, Neo4jPool)):
129-
self._set_cached_database(self._config.database)
130-
else:
131-
# This is the first time we open a connection to a server in a
132-
# cluster environment for this session without explicitly
133-
# configured database. Hence, we request a routing table update
134-
# to try to fetch the home database. If provided by the server,
135-
# we shall use this database explicitly for all subsequent
136-
# actions within this session.
137-
self._pool.update_routing_table(
138-
database=self._config.database,
139-
imp_user=self._config.impersonated_user,
140-
bookmarks=self._bookmarks,
141-
database_callback=self._set_cached_database
142-
)
143-
self._connection = self._pool.acquire(
144-
access_mode=access_mode,
145-
timeout=self._config.connection_acquisition_timeout,
146-
database=self._config.database,
147-
bookmarks=self._bookmarks
148-
)
149-
150-
def _disconnect(self):
151-
if self._connection:
152-
self._pool.release(self._connection)
153-
self._connection = None
108+
super()._connect(access_mode)
154109

155110
def _collect_bookmark(self, bookmark):
156111
if bookmark:

0 commit comments

Comments
 (0)