Skip to content

Commit 10ac871

Browse files
committed
Bookmarking
1 parent f7b74e4 commit 10ac871

File tree

4 files changed

+94
-15
lines changed

4 files changed

+94
-15
lines changed

README.rst

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ Example Usage
3333
with session.begin_transaction() as write_tx:
3434
write_tx.run("CREATE (a:Person {name:{name},age:{age}})", name="Alice", age=33)
3535
write_tx.run("CREATE (a:Person {name:{name},age:{age}})", name="Bob", age=44)
36-
write_tx.success = True
3736
3837
with session.begin_transaction() as read_tx:
3938
result = read_tx.run("MATCH (a:Person) RETURN a.name AS name, a.age AS age")

neo4j/v1/bolt.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,14 @@ class Connection(object):
211211
.. note:: logs at INFO level
212212
"""
213213

214+
in_use = False
215+
216+
closed = False
217+
218+
defunct = False
219+
220+
server_version = None
221+
214222
def __init__(self, sock, **config):
215223
self.socket = sock
216224
self.buffering_socket = BufferingSocket(sock)
@@ -219,9 +227,6 @@ def __init__(self, sock, **config):
219227
self.packer = Packer(self.channel)
220228
self.unpacker = Unpacker()
221229
self.responses = deque()
222-
self.in_use = False
223-
self.closed = False
224-
self.defunct = False
225230

226231
# Determine the user agent and ensure it is a Unicode value
227232
user_agent = config.get("user_agent", DEFAULT_USER_AGENT)
@@ -238,19 +243,21 @@ def __init__(self, sock, **config):
238243
# Pick up the server certificate, if any
239244
self.der_encoded_server_certificate = config.get("der_encoded_server_certificate")
240245

246+
def on_success(metadata):
247+
self.server_version = metadata.get("server")
248+
241249
def on_failure(metadata):
242250
code = metadata.get("code")
243251
error = (Unauthorized if code == "Neo.ClientError.Security.Unauthorized" else
244252
ServiceUnavailable)
245253
raise error(metadata.get("message", "INIT failed"))
246254

247255
response = Response(self)
256+
response.on_success = on_success
248257
response.on_failure = on_failure
249258

250259
self.append(INIT, (self.user_agent, self.auth_dict), response=response)
251-
self.send()
252-
while not response.complete:
253-
self.fetch()
260+
self.sync()
254261

255262
def __del__(self):
256263
self.close()
@@ -359,7 +366,10 @@ def fetch(self):
359366
else:
360367
raise ProtocolError("Unexpected response message with signature %02X" % signature)
361368

362-
def fetch_all(self):
369+
def sync(self):
370+
""" Send and fetch all outstanding messages.
371+
"""
372+
self.send()
363373
while self.responses:
364374
response = self.responses[0]
365375
while not response.complete:

neo4j/v1/session.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,8 @@ class Session(object):
311311

312312
transaction = None
313313

314+
last_bookmark = None
315+
314316
def __init__(self, driver, connection, access_mode=None):
315317
self.driver = driver
316318
self.connection = connection
@@ -336,6 +338,8 @@ def run(self, statement, parameters=None, **kwparameters):
336338
:return: Cypher result
337339
:rtype: :class:`.StatementResult`
338340
"""
341+
self.last_bookmark = None
342+
339343
statement = _norm_statement(statement)
340344
parameters = _norm_parameters(parameters, **kwparameters)
341345

@@ -372,11 +376,11 @@ def close(self):
372376
self.transaction.close()
373377
if self.connection:
374378
if not self.connection.closed:
375-
self.connection.fetch_all()
379+
self.connection.sync()
376380
self.connection.in_use = False
377381
self.connection = None
378382

379-
def begin_transaction(self):
383+
def begin_transaction(self, bookmark=None):
380384
""" Create a new :class:`.Transaction` within this session.
381385
382386
:return: new :class:`.Transaction` instance.
@@ -387,15 +391,23 @@ def begin_transaction(self):
387391
def clear_transaction():
388392
self.transaction = None
389393

390-
self.run("BEGIN")
394+
parameters = {}
395+
if bookmark is not None:
396+
parameters["bookmark"] = bookmark
397+
398+
self.run("BEGIN", parameters)
391399
self.transaction = Transaction(self, on_close=clear_transaction)
392400
return self.transaction
393401

394402
def commit_transaction(self):
395-
self.run("COMMIT")
403+
result = self.run("COMMIT")
404+
self.connection.sync()
405+
summary = result.summary()
406+
self.last_bookmark = summary.metadata.get("bookmark")
396407

397408
def rollback_transaction(self):
398409
self.run("ROLLBACK")
410+
self.connection.sync()
399411

400412

401413
class Transaction(object):
@@ -413,7 +425,7 @@ class Transaction(object):
413425
#: and rolled back otherwise. This attribute can be set in user code
414426
#: multiple times before a transaction completes with only the final
415427
#: value taking effect.
416-
success = False
428+
success = None
417429

418430
#: Indicator to show whether the transaction has been closed, either
419431
#: with commit or rollback.
@@ -427,8 +439,8 @@ def __enter__(self):
427439
return self
428440

429441
def __exit__(self, exc_type, exc_value, traceback):
430-
if exc_value:
431-
self.success = False
442+
if self.success is None:
443+
self.success = not bool(exc_type)
432444
self.close()
433445

434446
def run(self, statement, parameters=None, **kwparameters):

test/test_session.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
# limitations under the License.
2020

2121
from mock import patch
22+
from unittest import skipUnless
23+
from uuid import uuid4
2224

25+
from neo4j.v1 import READ_ACCESS, WRITE_ACCESS
2326
from neo4j.v1.exceptions import CypherError, ResultError
2427
from neo4j.v1.session import GraphDatabase, basic_auth, Record
2528
from neo4j.v1.types import Node, Relationship, Path
@@ -31,6 +34,21 @@
3134
AUTH_TOKEN = basic_auth("neotest", "neotest")
3235

3336

37+
def get_server_version():
38+
driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False)
39+
with driver.session() as session:
40+
full_version = session.connection.server_version
41+
if full_version is None:
42+
return "Neo4j", (3, 0), ()
43+
product, _, tagged_version = full_version.partition("/")
44+
tags = tagged_version.split("-")
45+
version = map(int, tags[0].split("."))
46+
return product, tuple(version), tuple(tags[1:])
47+
48+
49+
SERVER_PRODUCT, SERVER_VERSION, SERVER_TAGS = get_server_version()
50+
51+
3452
class AutoCommitTransactionTestCase(ServerTestCase):
3553

3654
def setUp(self):
@@ -415,12 +433,52 @@ def test_can_rollback_transaction_using_with_block(self):
415433
tx.run("MATCH (a) WHERE id(a) = {n} "
416434
"SET a.foo = {foo}", {"n": node_id, "foo": "bar"})
417435

436+
tx.success = False
437+
418438
# Check the property value
419439
result = session.run("MATCH (a) WHERE id(a) = {n} "
420440
"RETURN a.foo", {"n": node_id})
421441
assert len(list(result)) == 0
422442

423443

444+
class BookmarkingTestCase(ServerTestCase):
445+
446+
def setUp(self):
447+
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False)
448+
449+
def tearDown(self):
450+
self.driver.close()
451+
452+
@skipUnless(SERVER_VERSION >= (3, 1), "Bookmarking is not supported by this version of Neo4j")
453+
def test_can_obtain_bookmark_after_commit(self):
454+
with self.driver.session() as session:
455+
with session.begin_transaction() as tx:
456+
tx.run("RETURN 1")
457+
assert session.last_bookmark is not None
458+
459+
@skipUnless(SERVER_VERSION >= (3, 1), "Bookmarking is not supported by this version of Neo4j")
460+
def test_can_pass_bookmark_into_next_transaction(self):
461+
unique_id = uuid4().hex
462+
463+
with self.driver.session(WRITE_ACCESS) as session:
464+
with session.begin_transaction() as tx:
465+
tx.run("CREATE (a:Thing {uuid:$uuid})", uuid=unique_id)
466+
bookmark = session.last_bookmark
467+
468+
assert bookmark is not None
469+
470+
with self.driver.session(READ_ACCESS) as session:
471+
with session.begin_transaction(bookmark) as tx:
472+
result = tx.run("MATCH (a:Thing {uuid:$uuid}) RETURN a", uuid=unique_id)
473+
record_list = list(result)
474+
assert len(record_list) == 1
475+
record = record_list[0]
476+
assert len(record) == 1
477+
thing = record[0]
478+
assert isinstance(thing, Node)
479+
assert thing["uuid"] == unique_id
480+
481+
424482
class ResultConsumptionTestCase(ServerTestCase):
425483

426484
def setUp(self):

0 commit comments

Comments
 (0)