Skip to content

Bookmarking #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ Example Usage
with session.begin_transaction() as write_tx:
write_tx.run("CREATE (a:Person {name:{name},age:{age}})", name="Alice", age=33)
write_tx.run("CREATE (a:Person {name:{name},age:{age}})", name="Bob", age=44)
write_tx.success = True

with session.begin_transaction() as read_tx:
result = read_tx.run("MATCH (a:Person) RETURN a.name AS name, a.age AS age")
Expand Down
24 changes: 17 additions & 7 deletions neo4j/v1/bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ class Connection(object):
.. note:: logs at INFO level
"""

in_use = False

closed = False

defunct = False

server_version = None # TODO: remove this when PR#108 is merged

#: The pool of which this connection is a member
pool = None

Expand All @@ -227,9 +235,6 @@ def __init__(self, sock, **config):
self.packer = Packer(self.channel)
self.unpacker = Unpacker()
self.responses = deque()
self.in_use = False
self.closed = False
self.defunct = False

# Determine the user agent and ensure it is a Unicode value
user_agent = config.get("user_agent", DEFAULT_USER_AGENT)
Expand All @@ -246,19 +251,21 @@ def __init__(self, sock, **config):
# Pick up the server certificate, if any
self.der_encoded_server_certificate = config.get("der_encoded_server_certificate")

def on_success(metadata):
self.server_version = metadata.get("server")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

server_version field is initialized here but where is it read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just used in a test right now. It will conflict with #108 so I'll adjust it properly for that.


def on_failure(metadata):
code = metadata.get("code")
error = (Unauthorized if code == "Neo.ClientError.Security.Unauthorized" else
ServiceUnavailable)
raise error(metadata.get("message", "INIT failed"))

response = Response(self)
response.on_success = on_success
response.on_failure = on_failure

self.append(INIT, (self.user_agent, self.auth_dict), response=response)
self.send()
while not response.complete:
self.fetch()
self.sync()

def __del__(self):
self.close()
Expand Down Expand Up @@ -367,7 +374,10 @@ def fetch(self):
else:
raise ProtocolError("Unexpected response message with signature %02X" % signature)

def fetch_all(self):
def sync(self):
""" Send and fetch all outstanding messages.
"""
self.send()
while self.responses:
response = self.responses[0]
while not response.complete:
Expand Down
28 changes: 21 additions & 7 deletions neo4j/v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class Session(object):

transaction = None

last_bookmark = None

def __init__(self, connection, access_mode=None):
self.connection = connection
self.access_mode = access_mode
Expand All @@ -265,6 +267,8 @@ def run(self, statement, parameters=None, **kwparameters):
:return: Cypher result
:rtype: :class:`.StatementResult`
"""
self.last_bookmark = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do this initialization together with initialization in line 314?

Copy link
Contributor Author

@technige technige Dec 14, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes we do. This clears the bookmark before a run, it's not initialisation.


statement = _norm_statement(statement)
parameters = _norm_parameters(parameters, **kwparameters)

Expand Down Expand Up @@ -301,13 +305,15 @@ def close(self):
self.transaction.close()
if self.connection:
if not self.connection.closed:
self.connection.fetch_all()
self.connection.sync()
self.connection.in_use = False
self.connection = None

def begin_transaction(self):
def begin_transaction(self, bookmark=None):
""" Create a new :class:`.Transaction` within this session.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should docstring also say something about the bookmark parameter?


:param bookmark: a bookmark to which the server should
synchronise before beginning the transaction
:return: new :class:`.Transaction` instance.
"""
if self.transaction:
Expand All @@ -316,15 +322,23 @@ def begin_transaction(self):
def clear_transaction():
self.transaction = None

self.run("BEGIN")
parameters = {}
if bookmark is not None:
parameters["bookmark"] = bookmark

self.run("BEGIN", parameters)
self.transaction = Transaction(self, on_close=clear_transaction)
return self.transaction

def commit_transaction(self):
self.run("COMMIT")
result = self.run("COMMIT")
self.connection.sync()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to test that we send commit and rollback messages eagerly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests added for that.

summary = result.summary()
self.last_bookmark = summary.metadata.get("bookmark")

def rollback_transaction(self):
self.run("ROLLBACK")
self.connection.sync()


class Transaction(object):
Expand All @@ -342,7 +356,7 @@ class Transaction(object):
#: and rolled back otherwise. This attribute can be set in user code
#: multiple times before a transaction completes with only the final
#: value taking effect.
success = False
success = None

#: Indicator to show whether the transaction has been closed, either
#: with commit or rollback.
Expand All @@ -356,8 +370,8 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
if exc_value:
self.success = False
if self.success is None:
self.success = not bool(exc_type)
self.close()

def run(self, statement, parameters=None, **kwparameters):
Expand Down
97 changes: 97 additions & 0 deletions test/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
# limitations under the License.

from mock import patch
from unittest import skipUnless
from uuid import uuid4

from neo4j.v1 import READ_ACCESS, WRITE_ACCESS
from neo4j.v1.exceptions import CypherError, ResultError
from neo4j.v1.session import GraphDatabase, basic_auth, Record
from neo4j.v1.types import Node, Relationship, Path
Expand All @@ -31,6 +34,21 @@
AUTH_TOKEN = basic_auth("neotest", "neotest")


def get_server_version():
driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False)
with driver.session() as session:
full_version = session.connection.server_version
if full_version is None:
return "Neo4j", (3, 0), ()
product, _, tagged_version = full_version.partition("/")
tags = tagged_version.split("-")
version = map(int, tags[0].split("."))
return product, tuple(version), tuple(tags[1:])


SERVER_PRODUCT, SERVER_VERSION, SERVER_TAGS = get_server_version()


class AutoCommitTransactionTestCase(ServerTestCase):

def setUp(self):
Expand Down Expand Up @@ -415,17 +433,60 @@ def test_can_rollback_transaction_using_with_block(self):
tx.run("MATCH (a) WHERE id(a) = {n} "
"SET a.foo = {foo}", {"n": node_id, "foo": "bar"})

tx.success = False

# Check the property value
result = session.run("MATCH (a) WHERE id(a) = {n} "
"RETURN a.foo", {"n": node_id})
assert len(list(result)) == 0


class BookmarkingTestCase(ServerTestCase):

def setUp(self):
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False)

def tearDown(self):
self.driver.close()

@skipUnless(SERVER_VERSION >= (3, 1), "Bookmarking is not supported by this version of Neo4j")
def test_can_obtain_bookmark_after_commit(self):
with self.driver.session() as session:
with session.begin_transaction() as tx:
tx.run("RETURN 1")
assert session.last_bookmark is not None

@skipUnless(SERVER_VERSION >= (3, 1), "Bookmarking is not supported by this version of Neo4j")
def test_can_pass_bookmark_into_next_transaction(self):
unique_id = uuid4().hex

with self.driver.session(WRITE_ACCESS) as session:
with session.begin_transaction() as tx:
tx.run("CREATE (a:Thing {uuid:$uuid})", uuid=unique_id)
bookmark = session.last_bookmark

assert bookmark is not None

with self.driver.session(READ_ACCESS) as session:
with session.begin_transaction(bookmark) as tx:
result = tx.run("MATCH (a:Thing {uuid:$uuid}) RETURN a", uuid=unique_id)
record_list = list(result)
assert len(record_list) == 1
record = record_list[0]
assert len(record) == 1
thing = record[0]
assert isinstance(thing, Node)
assert thing["uuid"] == unique_id


class ResultConsumptionTestCase(ServerTestCase):

def setUp(self):
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN, encrypted=False)

def tearDown(self):
self.driver.close()

def test_can_consume_result_immediately(self):
session = self.driver.session()
tx = session.begin_transaction()
Expand Down Expand Up @@ -564,3 +625,39 @@ def test_peek_at_different_stages(self):
# ...when none should follow
with self.assertRaises(ResultError):
result.peek()


class SessionCommitTestCase(ServerTestCase):

def setUp(self):
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN)

def tearDown(self):
self.driver.close()

def test_should_sync_after_commit(self):
with self.driver.session() as session:
tx = session.begin_transaction()
result = tx.run("RETURN 1")
tx.commit()
buffer = result._buffer
assert len(buffer) == 1
assert buffer[0][0] == 1


class SessionRollbackTestCase(ServerTestCase):

def setUp(self):
self.driver = GraphDatabase.driver(BOLT_URI, auth=AUTH_TOKEN)

def tearDown(self):
self.driver.close()

def test_should_sync_after_rollback(self):
with self.driver.session() as session:
tx = session.begin_transaction()
result = tx.run("RETURN 1")
tx.rollback()
buffer = result._buffer
assert len(buffer) == 1
assert buffer[0][0] == 1