Skip to content

Commit a39c181

Browse files
authored
Merge pull request #104 from neo4j/1.1-bookmarking
Bookmarking
2 parents 0f6d749 + f705004 commit a39c181

File tree

4 files changed

+135
-15
lines changed

4 files changed

+135
-15
lines changed

README.rst

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ Example Usage
3333
with session.begin_transaction() as write_tx:
3434
write_tx.run("CREATE (a:Person {name:{name},age:{age}})", name="Alice", age=33)
3535
write_tx.run("CREATE (a:Person {name:{name},age:{age}})", name="Bob", age=44)
36-
write_tx.success = True
3736
3837
with session.begin_transaction() as read_tx:
3938
result = read_tx.run("MATCH (a:Person) RETURN a.name AS name, a.age AS age")

neo4j/v1/bolt.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ class Connection(object):
216216
.. note:: logs at INFO level
217217
"""
218218

219+
in_use = False
220+
221+
closed = False
222+
223+
defunct = False
224+
225+
server_version = None # TODO: remove this when PR#108 is merged
226+
219227
#: The pool of which this connection is a member
220228
pool = None
221229

@@ -227,9 +235,6 @@ def __init__(self, sock, **config):
227235
self.packer = Packer(self.channel)
228236
self.unpacker = Unpacker()
229237
self.responses = deque()
230-
self.in_use = False
231-
self.closed = False
232-
self.defunct = False
233238

234239
# Determine the user agent and ensure it is a Unicode value
235240
user_agent = config.get("user_agent", DEFAULT_USER_AGENT)
@@ -246,19 +251,21 @@ def __init__(self, sock, **config):
246251
# Pick up the server certificate, if any
247252
self.der_encoded_server_certificate = config.get("der_encoded_server_certificate")
248253

254+
def on_success(metadata):
255+
self.server_version = metadata.get("server")
256+
249257
def on_failure(metadata):
250258
code = metadata.get("code")
251259
error = (Unauthorized if code == "Neo.ClientError.Security.Unauthorized" else
252260
ServiceUnavailable)
253261
raise error(metadata.get("message", "INIT failed"))
254262

255263
response = Response(self)
264+
response.on_success = on_success
256265
response.on_failure = on_failure
257266

258267
self.append(INIT, (self.user_agent, self.auth_dict), response=response)
259-
self.send()
260-
while not response.complete:
261-
self.fetch()
268+
self.sync()
262269

263270
def __del__(self):
264271
self.close()
@@ -367,7 +374,10 @@ def fetch(self):
367374
else:
368375
raise ProtocolError("Unexpected response message with signature %02X" % signature)
369376

370-
def fetch_all(self):
377+
def sync(self):
378+
""" Send and fetch all outstanding messages.
379+
"""
380+
self.send()
371381
while self.responses:
372382
response = self.responses[0]
373383
while not response.complete:

neo4j/v1/session.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ class Session(object):
241241

242242
transaction = None
243243

244+
last_bookmark = None
245+
244246
def __init__(self, connection, access_mode=None):
245247
self.connection = connection
246248
self.access_mode = access_mode
@@ -265,6 +267,8 @@ def run(self, statement, parameters=None, **kwparameters):
265267
:return: Cypher result
266268
:rtype: :class:`.StatementResult`
267269
"""
270+
self.last_bookmark = None
271+
268272
statement = _norm_statement(statement)
269273
parameters = _norm_parameters(parameters, **kwparameters)
270274

@@ -301,13 +305,15 @@ def close(self):
301305
self.transaction.close()
302306
if self.connection:
303307
if not self.connection.closed:
304-
self.connection.fetch_all()
308+
self.connection.sync()
305309
self.connection.in_use = False
306310
self.connection = None
307311

308-
def begin_transaction(self):
312+
def begin_transaction(self, bookmark=None):
309313
""" Create a new :class:`.Transaction` within this session.
310314
315+
:param bookmark: a bookmark to which the server should
316+
synchronise before beginning the transaction
311317
:return: new :class:`.Transaction` instance.
312318
"""
313319
if self.transaction:
@@ -316,15 +322,23 @@ def begin_transaction(self):
316322
def clear_transaction():
317323
self.transaction = None
318324

319-
self.run("BEGIN")
325+
parameters = {}
326+
if bookmark is not None:
327+
parameters["bookmark"] = bookmark
328+
329+
self.run("BEGIN", parameters)
320330
self.transaction = Transaction(self, on_close=clear_transaction)
321331
return self.transaction
322332

323333
def commit_transaction(self):
324-
self.run("COMMIT")
334+
result = self.run("COMMIT")
335+
self.connection.sync()
336+
summary = result.summary()
337+
self.last_bookmark = summary.metadata.get("bookmark")
325338

326339
def rollback_transaction(self):
327340
self.run("ROLLBACK")
341+
self.connection.sync()
328342

329343

330344
class Transaction(object):
@@ -342,7 +356,7 @@ class Transaction(object):
342356
#: and rolled back otherwise. This attribute can be set in user code
343357
#: multiple times before a transaction completes with only the final
344358
#: value taking effect.
345-
success = False
359+
success = None
346360

347361
#: Indicator to show whether the transaction has been closed, either
348362
#: with commit or rollback.
@@ -356,8 +370,8 @@ def __enter__(self):
356370
return self
357371

358372
def __exit__(self, exc_type, exc_value, traceback):
359-
if exc_value:
360-
self.success = False
373+
if self.success is None:
374+
self.success = not bool(exc_type)
361375
self.close()
362376

363377
def run(self, statement, parameters=None, **kwparameters):

test/test_session.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
# limitations under the License.
2020

2121
from mock import patch
22+
from unittest import skipUnless
23+
from uuid import uuid4
2224

25+
from neo4j.v1 import READ_ACCESS, WRITE_ACCESS
2326
from neo4j.v1.exceptions import CypherError, ResultError
2427
from neo4j.v1.session import GraphDatabase, basic_auth, Record
2528
from neo4j.v1.types import Node, Relationship, Path
@@ -31,6 +34,21 @@
3134
AUTH_TOKEN = basic_auth("neotest", "neotest")
3235

3336

37+
def get_server_version():
38+
driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False)
39+
with driver.session() as session:
40+
full_version = session.connection.server_version
41+
if full_version is None:
42+
return "Neo4j", (3, 0), ()
43+
product, _, tagged_version = full_version.partition("/")
44+
tags = tagged_version.split("-")
45+
version = map(int, tags[0].split("."))
46+
return product, tuple(version), tuple(tags[1:])
47+
48+
49+
SERVER_PRODUCT, SERVER_VERSION, SERVER_TAGS = get_server_version()
50+
51+
3452
class AutoCommitTransactionTestCase(ServerTestCase):
3553

3654
def setUp(self):
@@ -415,17 +433,60 @@ def test_can_rollback_transaction_using_with_block(self):
415433
tx.run("MATCH (a) WHERE id(a) = {n} "
416434
"SET a.foo = {foo}", {"n": node_id, "foo": "bar"})
417435

436+
tx.success = False
437+
418438
# Check the property value
419439
result = session.run("MATCH (a) WHERE id(a) = {n} "
420440
"RETURN a.foo", {"n": node_id})
421441
assert len(list(result)) == 0
422442

423443

444+
class BookmarkingTestCase(ServerTestCase):
445+
446+
def setUp(self):
447+
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False)
448+
449+
def tearDown(self):
450+
self.driver.close()
451+
452+
@skipUnless(SERVER_VERSION >= (3, 1), "Bookmarking is not supported by this version of Neo4j")
453+
def test_can_obtain_bookmark_after_commit(self):
454+
with self.driver.session() as session:
455+
with session.begin_transaction() as tx:
456+
tx.run("RETURN 1")
457+
assert session.last_bookmark is not None
458+
459+
@skipUnless(SERVER_VERSION >= (3, 1), "Bookmarking is not supported by this version of Neo4j")
460+
def test_can_pass_bookmark_into_next_transaction(self):
461+
unique_id = uuid4().hex
462+
463+
with self.driver.session(WRITE_ACCESS) as session:
464+
with session.begin_transaction() as tx:
465+
tx.run("CREATE (a:Thing {uuid:$uuid})", uuid=unique_id)
466+
bookmark = session.last_bookmark
467+
468+
assert bookmark is not None
469+
470+
with self.driver.session(READ_ACCESS) as session:
471+
with session.begin_transaction(bookmark) as tx:
472+
result = tx.run("MATCH (a:Thing {uuid:$uuid}) RETURN a", uuid=unique_id)
473+
record_list = list(result)
474+
assert len(record_list) == 1
475+
record = record_list[0]
476+
assert len(record) == 1
477+
thing = record[0]
478+
assert isinstance(thing, Node)
479+
assert thing["uuid"] == unique_id
480+
481+
424482
class ResultConsumptionTestCase(ServerTestCase):
425483

426484
def setUp(self):
427485
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False)
428486

487+
def tearDown(self):
488+
self.driver.close()
489+
429490
def test_can_consume_result_immediately(self):
430491
session = self.driver.session()
431492
tx = session.begin_transaction()
@@ -564,3 +625,39 @@ def test_peek_at_different_stages(self):
564625
# ...when none should follow
565626
with self.assertRaises(ResultError):
566627
result.peek()
628+
629+
630+
class SessionCommitTestCase(ServerTestCase):
631+
632+
def setUp(self):
633+
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN)
634+
635+
def tearDown(self):
636+
self.driver.close()
637+
638+
def test_should_sync_after_commit(self):
639+
with self.driver.session() as session:
640+
tx = session.begin_transaction()
641+
result = tx.run("RETURN 1")
642+
tx.commit()
643+
buffer = result._buffer
644+
assert len(buffer) == 1
645+
assert buffer[0][0] == 1
646+
647+
648+
class SessionRollbackTestCase(ServerTestCase):
649+
650+
def setUp(self):
651+
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN)
652+
653+
def tearDown(self):
654+
self.driver.close()
655+
656+
def test_should_sync_after_rollback(self):
657+
with self.driver.session() as session:
658+
tx = session.begin_transaction()
659+
result = tx.run("RETURN 1")
660+
tx.rollback()
661+
buffer = result._buffer
662+
assert len(buffer) == 1
663+
assert buffer[0][0] == 1

0 commit comments

Comments
 (0)