Skip to content

Raise on using Results out of transaction scope #652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
- Python 3.10 support added
- Python 3.6 support has been dropped.
- `Result`, `Session`, and `Transaction` can no longer be imported from
`neo4j.work`. They should've been imported from `neo4j` all along.
`neo4j.work`. They should've been imported from `neo4j` all along.
Remark: It's recommended to import everything needed directly from `noe4j`,
not its submodules or subpackages.
- Experimental pipelines feature has been removed.
- Experimental async driver has been added.
- `ResultSummary.server.version_info` has been removed.
Expand Down Expand Up @@ -65,6 +67,17 @@
destructor will ever be called. A `ResourceWarning` is emitted instead.
Make sure to configure Python to output those warnings when developing your
application locally (it does not by default).
- Result scope:
- Records of Results cannot be accessed (`peek`, `single`, `iter`, ...)
after their owning transaction has been closed, committed, or rolled back.
Previously, this would yield undefined behavior.
It now raises a `ResultConsumedError`.
- Records of Results cannot be accessed (`peek`, `single`, `iter`, ...)
after the Result has been consumed (`Result.consume()`).
Previously, this would always yield no records.
It now raises a `ResultConsumedError`.
- New method `Result.closed()` can be used to check for this condition if
necessary.


## Version 4.4
Expand Down
2 changes: 2 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,8 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n

.. automethod:: data

.. automethod:: closed

See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.


Expand Down
2 changes: 2 additions & 0 deletions docs/source/async_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -501,4 +501,6 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla

.. automethod:: data

.. automethod:: closed

See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
96 changes: 88 additions & 8 deletions neo4j/_async/work/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,25 @@

from ..._async_compat.util import AsyncUtil
from ...data import DataDehydrator
from ...exceptions import ResultNotSingleError
from ...exceptions import (
ResultConsumedError,
ResultNotSingleError,
)
from ...work import ResultSummary
from ..io import ConnectionErrorHandler


_RESULT_OUT_OF_SCOPE_ERROR = (
"The result is out of scope. The associated transaction "
"has been closed. Results can only be used while the "
"transaction is open."
)
_RESULT_CONSUMED_ERROR = (
"The result has been consumed. Fetch all needed records before calling "
"Result.consume()."
)


class AsyncResult:
"""A handler for the result of Cypher query execution. Instances
of this class are typically constructed and returned by
Expand Down Expand Up @@ -52,7 +66,11 @@ def __init__(self, connection, hydrant, fetch_size, on_closed,
# there ar more records available to pull from the server
self._has_more = False
# the result has been fully iterated or consumed
self._closed = False
self._exhausted = False
# the result has been consumed
self._consumed = False
# the result has been closed as a result of closing the transaction
self._out_of_scope = False

@property
def _qid(self):
Expand Down Expand Up @@ -194,7 +212,11 @@ async def __aiter__(self):
self._pull()
await self._connection.send_all()

self._closed = True
self._exhausted = True
if self._out_of_scope:
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
if self._consumed:
raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR)

async def __anext__(self):
return await self.__aiter__().__anext__()
Expand All @@ -203,7 +225,7 @@ async def _attach(self):
"""Sets the Result object in an attached state by fetching messages from
the connection to the buffer.
"""
if self._closed is False:
if self._exhausted is False:
while self._attached is False:
await self._connection.fetch_message()

Expand All @@ -215,14 +237,18 @@ async def _buffer(self, n=None):
Might ent up with fewer records in the buffer if there are not enough
records available.
"""
if self._out_of_scope:
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
if self._consumed:
raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR)
if n is not None and len(self._record_buffer) >= n:
return
record_buffer = deque()
async for record in self:
record_buffer.append(record)
if n is not None and len(record_buffer) >= n:
break
self._closed = False
self._exhausted = False
if n is None:
self._record_buffer = record_buffer
else:
Expand Down Expand Up @@ -260,6 +286,14 @@ def keys(self):
"""
return self._keys

async def _tx_end(self):
# Handle closure of the associated transaction.
#
# This will consume the result and mark it at out of scope.
# Subsequent calls to `next` will raise a ResultConsumedError.
await self.consume()
self._out_of_scope = True

async def consume(self):
"""Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.

Expand Down Expand Up @@ -296,12 +330,14 @@ async def get_two_tx(tx):

:returns: The :class:`neo4j.ResultSummary` for this result
"""
if self._closed is False:
if self._exhausted is False:
self._discarding = True
async for _ in self:
pass

return self._obtain_summary()
summary = self._obtain_summary()
self._consumed = True
return summary

async def single(self):
"""Obtain the next and only remaining record from this result if available else return None.
Expand All @@ -311,16 +347,21 @@ async def single(self):
the first of these is still returned.

:returns: the next :class:`neo4j.AsyncRecord`.
:raises: ResultNotSingleError if not exactly one record is available.

:raises ResultNotSingleError: if not exactly one record is available.
:raises ResultConsumedError: if the transaction from which this result was
obtained has been closed.
"""
await self._buffer(2)
if not self._record_buffer:
raise ResultNotSingleError(
self,
"No records found. "
"Make sure your query returns exactly one record."
)
elif len(self._record_buffer) > 1:
raise ResultNotSingleError(
self,
"More than one record found. "
"Make sure your query returns exactly one record."
)
Expand All @@ -331,6 +372,10 @@ async def peek(self):
This leaves the record in the buffer for further processing.

:returns: the next :class:`.Record` or :const:`None` if none remain

:raises ResultConsumedError: if the transaction from which this result
was obtained has been closed or the Result has been explicitly
consumed.
"""
await self._buffer(1)
if self._record_buffer:
Expand All @@ -343,6 +388,10 @@ async def graph(self):

:returns: a result graph
:rtype: :class:`neo4j.graph.Graph`

:raises ResultConsumedError: if the transaction from which this result
was obtained has been closed or the Result has been explicitly
consumed.
"""
await self._buffer_all()
return self._hydrant.graph
Expand All @@ -354,8 +403,13 @@ async def value(self, key=0, default=None):

:param key: field to return for each remaining record. Obtain a single value from the record by index or key.
:param default: default value, used if the index of key is unavailable

:returns: list of individual values
:rtype: list

:raises ResultConsumedError: if the transaction from which this result
was obtained has been closed or the Result has been explicitly
consumed.
"""
return [record.value(key, default) async for record in self]

Expand All @@ -365,8 +419,13 @@ async def values(self, *keys):
See :class:`neo4j.AsyncRecord.values`

:param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.

:returns: list of values lists
:rtype: list

:raises ResultConsumedError: if the transaction from which this result
was obtained has been closed or the Result has been explicitly
consumed.
"""
return [record.values(*keys) async for record in self]

Expand All @@ -376,7 +435,28 @@ async def data(self, *keys):
See :class:`neo4j.AsyncRecord.data`

:param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.

:returns: list of dictionaries
:rtype: list

:raises ResultConsumedError: if the transaction from which this result was
obtained has been closed.
"""
return [record.data(*keys) async for record in self]

def closed(self):
"""Return True if the result has been closed.

When a result gets consumed :meth:`consume` or the transaction that
owns the result gets closed (committed, rolled back, closed), the
result cannot be used to acquire further records.

In such case, all methods that need to access the Result's records,
will raise a :exc:`ResultConsumedError` when called.

:returns: whether the result is closed.
:rtype: bool

.. versionadded:: 5.0
"""
return self._out_of_scope or self._consumed
2 changes: 1 addition & 1 deletion neo4j/_async/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def _error_handler(self, exc):

async def _consume_results(self):
for result in self._results:
await result.consume()
await result._tx_end()
self._results = []

async def run(self, query, parameters=None, **kwparameters):
Expand Down
Loading