From b9706d86467116abf0fa0127f62db901fc579b4b Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Mon, 5 Dec 2016 10:53:59 +0000 Subject: [PATCH 1/4] Bookmarking --- README.rst | 1 - neo4j/v1/bolt.py | 24 ++++++++++++------ neo4j/v1/session.py | 26 ++++++++++++++------ test/test_session.py | 58 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 15 deletions(-) diff --git a/README.rst b/README.rst index c894fb85d..8d88676ba 100644 --- a/README.rst +++ b/README.rst @@ -33,7 +33,6 @@ Example Usage with session.begin_transaction() as write_tx: write_tx.run("CREATE (a:Person {name:{name},age:{age}})", name="Alice", age=33) write_tx.run("CREATE (a:Person {name:{name},age:{age}})", name="Bob", age=44) - write_tx.success = True with session.begin_transaction() as read_tx: result = read_tx.run("MATCH (a:Person) RETURN a.name AS name, a.age AS age") diff --git a/neo4j/v1/bolt.py b/neo4j/v1/bolt.py index f237ed564..85ec15975 100644 --- a/neo4j/v1/bolt.py +++ b/neo4j/v1/bolt.py @@ -216,6 +216,14 @@ class Connection(object): .. note:: logs at INFO level """ + in_use = False + + closed = False + + defunct = False + + server_version = None + #: The pool of which this connection is a member pool = None @@ -227,9 +235,6 @@ def __init__(self, sock, **config): self.packer = Packer(self.channel) self.unpacker = Unpacker() self.responses = deque() - self.in_use = False - self.closed = False - self.defunct = False # Determine the user agent and ensure it is a Unicode value user_agent = config.get("user_agent", DEFAULT_USER_AGENT) @@ -246,6 +251,9 @@ def __init__(self, sock, **config): # Pick up the server certificate, if any self.der_encoded_server_certificate = config.get("der_encoded_server_certificate") + def on_success(metadata): + self.server_version = metadata.get("server") + def on_failure(metadata): code = metadata.get("code") error = (Unauthorized if code == "Neo.ClientError.Security.Unauthorized" else @@ -253,12 +261,11 @@ def on_failure(metadata): raise error(metadata.get("message", "INIT failed")) response = Response(self) + response.on_success = on_success response.on_failure = on_failure self.append(INIT, (self.user_agent, self.auth_dict), response=response) - self.send() - while not response.complete: - self.fetch() + self.sync() def __del__(self): self.close() @@ -367,7 +374,10 @@ def fetch(self): else: raise ProtocolError("Unexpected response message with signature %02X" % signature) - def fetch_all(self): + def sync(self): + """ Send and fetch all outstanding messages. + """ + self.send() while self.responses: response = self.responses[0] while not response.complete: diff --git a/neo4j/v1/session.py b/neo4j/v1/session.py index 2b96c4352..3f4182c5c 100644 --- a/neo4j/v1/session.py +++ b/neo4j/v1/session.py @@ -241,6 +241,8 @@ class Session(object): transaction = None + last_bookmark = None + def __init__(self, connection, access_mode=None): self.connection = connection self.access_mode = access_mode @@ -265,6 +267,8 @@ def run(self, statement, parameters=None, **kwparameters): :return: Cypher result :rtype: :class:`.StatementResult` """ + self.last_bookmark = None + statement = _norm_statement(statement) parameters = _norm_parameters(parameters, **kwparameters) @@ -301,11 +305,11 @@ def close(self): self.transaction.close() if self.connection: if not self.connection.closed: - self.connection.fetch_all() + self.connection.sync() self.connection.in_use = False self.connection = None - def begin_transaction(self): + def begin_transaction(self, bookmark=None): """ Create a new :class:`.Transaction` within this session. :return: new :class:`.Transaction` instance. @@ -316,15 +320,23 @@ def begin_transaction(self): def clear_transaction(): self.transaction = None - self.run("BEGIN") + parameters = {} + if bookmark is not None: + parameters["bookmark"] = bookmark + + self.run("BEGIN", parameters) self.transaction = Transaction(self, on_close=clear_transaction) return self.transaction def commit_transaction(self): - self.run("COMMIT") + result = self.run("COMMIT") + self.connection.sync() + summary = result.summary() + self.last_bookmark = summary.metadata.get("bookmark") def rollback_transaction(self): self.run("ROLLBACK") + self.connection.sync() class Transaction(object): @@ -342,7 +354,7 @@ class Transaction(object): #: and rolled back otherwise. This attribute can be set in user code #: multiple times before a transaction completes with only the final #: value taking effect. - success = False + success = None #: Indicator to show whether the transaction has been closed, either #: with commit or rollback. @@ -356,8 +368,8 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): - if exc_value: - self.success = False + if self.success is None: + self.success = not bool(exc_type) self.close() def run(self, statement, parameters=None, **kwparameters): diff --git a/test/test_session.py b/test/test_session.py index b8d2022c7..2f49fc1f5 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -19,7 +19,10 @@ # limitations under the License. from mock import patch +from unittest import skipUnless +from uuid import uuid4 +from neo4j.v1 import READ_ACCESS, WRITE_ACCESS from neo4j.v1.exceptions import CypherError, ResultError from neo4j.v1.session import GraphDatabase, basic_auth, Record from neo4j.v1.types import Node, Relationship, Path @@ -31,6 +34,21 @@ AUTH_TOKEN = basic_auth("neotest", "neotest") +def get_server_version(): + driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False) + with driver.session() as session: + full_version = session.connection.server_version + if full_version is None: + return "Neo4j", (3, 0), () + product, _, tagged_version = full_version.partition("/") + tags = tagged_version.split("-") + version = map(int, tags[0].split(".")) + return product, tuple(version), tuple(tags[1:]) + + +SERVER_PRODUCT, SERVER_VERSION, SERVER_TAGS = get_server_version() + + class AutoCommitTransactionTestCase(ServerTestCase): def setUp(self): @@ -415,12 +433,52 @@ def test_can_rollback_transaction_using_with_block(self): tx.run("MATCH (a) WHERE id(a) = {n} " "SET a.foo = {foo}", {"n": node_id, "foo": "bar"}) + tx.success = False + # Check the property value result = session.run("MATCH (a) WHERE id(a) = {n} " "RETURN a.foo", {"n": node_id}) assert len(list(result)) == 0 +class BookmarkingTestCase(ServerTestCase): + + def setUp(self): + self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False) + + def tearDown(self): + self.driver.close() + + @skipUnless(SERVER_VERSION >= (3, 1), "Bookmarking is not supported by this version of Neo4j") + def test_can_obtain_bookmark_after_commit(self): + with self.driver.session() as session: + with session.begin_transaction() as tx: + tx.run("RETURN 1") + assert session.last_bookmark is not None + + @skipUnless(SERVER_VERSION >= (3, 1), "Bookmarking is not supported by this version of Neo4j") + def test_can_pass_bookmark_into_next_transaction(self): + unique_id = uuid4().hex + + with self.driver.session(WRITE_ACCESS) as session: + with session.begin_transaction() as tx: + tx.run("CREATE (a:Thing {uuid:$uuid})", uuid=unique_id) + bookmark = session.last_bookmark + + assert bookmark is not None + + with self.driver.session(READ_ACCESS) as session: + with session.begin_transaction(bookmark) as tx: + result = tx.run("MATCH (a:Thing {uuid:$uuid}) RETURN a", uuid=unique_id) + record_list = list(result) + assert len(record_list) == 1 + record = record_list[0] + assert len(record) == 1 + thing = record[0] + assert isinstance(thing, Node) + assert thing["uuid"] == unique_id + + class ResultConsumptionTestCase(ServerTestCase): def setUp(self): From a8314d110f8b70453995cfd5923ebd08ae137dee Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Wed, 14 Dec 2016 09:38:50 +0000 Subject: [PATCH 2/4] Added docstring --- neo4j/v1/session.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/neo4j/v1/session.py b/neo4j/v1/session.py index 3f4182c5c..b92fc2dcd 100644 --- a/neo4j/v1/session.py +++ b/neo4j/v1/session.py @@ -312,6 +312,8 @@ def close(self): def begin_transaction(self, bookmark=None): """ Create a new :class:`.Transaction` within this session. + :param bookmark: a bookmark to which the server should + synchronise before beginning the transaction :return: new :class:`.Transaction` instance. """ if self.transaction: From 4332e68e99056a9204a7f8f3b3493ee584ffa043 Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Wed, 14 Dec 2016 15:02:59 +0000 Subject: [PATCH 3/4] Tests for checking sync after commit/rollback --- test/test_session.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/test/test_session.py b/test/test_session.py index 2f49fc1f5..6fc4fd3fd 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -484,6 +484,9 @@ class ResultConsumptionTestCase(ServerTestCase): def setUp(self): self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False) + def tearDown(self): + self.driver.close() + def test_can_consume_result_immediately(self): session = self.driver.session() tx = session.begin_transaction() @@ -622,3 +625,39 @@ def test_peek_at_different_stages(self): # ...when none should follow with self.assertRaises(ResultError): result.peek() + + +class SessionCommitTestCase(ServerTestCase): + + def setUp(self): + self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN) + + def tearDown(self): + self.driver.close() + + def test_should_sync_after_commit(self): + with self.driver.session() as session: + tx = session.begin_transaction() + result = tx.run("RETURN 1") + tx.commit() + buffer = result._buffer + assert len(buffer) == 1 + assert buffer[0][0] == 1 + + +class SessionRollbackTestCase(ServerTestCase): + + def setUp(self): + self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN) + + def tearDown(self): + self.driver.close() + + def test_should_sync_after_rollback(self): + with self.driver.session() as session: + tx = session.begin_transaction() + result = tx.run("RETURN 1") + tx.rollback() + buffer = result._buffer + assert len(buffer) == 1 + assert buffer[0][0] == 1 From f705004bd090d0665d21c6e3fe4785a2ae78abb1 Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Wed, 14 Dec 2016 15:05:14 +0000 Subject: [PATCH 4/4] added TODO --- neo4j/v1/bolt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neo4j/v1/bolt.py b/neo4j/v1/bolt.py index 85ec15975..417aba0fc 100644 --- a/neo4j/v1/bolt.py +++ b/neo4j/v1/bolt.py @@ -222,7 +222,7 @@ class Connection(object): defunct = False - server_version = None + server_version = None # TODO: remove this when PR#108 is merged #: The pool of which this connection is a member pool = None