diff --git a/CHANGELOG.md b/CHANGELOG.md index c16bccc02..9b9f08594 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,9 @@ - Python 3.10 support added - Python 3.6 support has been dropped. - `Result`, `Session`, and `Transaction` can no longer be imported from - `neo4j.work`. They should've been imported from `neo4j` all along. + `neo4j.work`. They should've been imported from `neo4j` all along. + Remark: It's recommended to import everything needed directly from `noe4j`, + not its submodules or subpackages. - Experimental pipelines feature has been removed. - Experimental async driver has been added. - `ResultSummary.server.version_info` has been removed. @@ -65,6 +67,17 @@ destructor will ever be called. A `ResourceWarning` is emitted instead. Make sure to configure Python to output those warnings when developing your application locally (it does not by default). +- Result scope: + - Records of Results cannot be accessed (`peek`, `single`, `iter`, ...) + after their owning transaction has been closed, committed, or rolled back. + Previously, this would yield undefined behavior. + It now raises a `ResultConsumedError`. + - Records of Results cannot be accessed (`peek`, `single`, `iter`, ...) + after the Result has been consumed (`Result.consume()`). + Previously, this would always yield no records. + It now raises a `ResultConsumedError`. + - New method `Result.closed()` can be used to check for this condition if + necessary. ## Version 4.4 diff --git a/docs/source/api.rst b/docs/source/api.rst index e0a7c58d7..ff7e21733 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -809,6 +809,8 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n .. automethod:: data + .. automethod:: closed + See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping. diff --git a/docs/source/async_api.rst b/docs/source/async_api.rst index 9e2ae8645..e571473c0 100644 --- a/docs/source/async_api.rst +++ b/docs/source/async_api.rst @@ -501,4 +501,6 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla .. automethod:: data + .. automethod:: closed + See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping. diff --git a/neo4j/_async/work/result.py b/neo4j/_async/work/result.py index 783dba497..733a86321 100644 --- a/neo4j/_async/work/result.py +++ b/neo4j/_async/work/result.py @@ -20,11 +20,25 @@ from ..._async_compat.util import AsyncUtil from ...data import DataDehydrator -from ...exceptions import ResultNotSingleError +from ...exceptions import ( + ResultConsumedError, + ResultNotSingleError, +) from ...work import ResultSummary from ..io import ConnectionErrorHandler +_RESULT_OUT_OF_SCOPE_ERROR = ( + "The result is out of scope. The associated transaction " + "has been closed. Results can only be used while the " + "transaction is open." +) +_RESULT_CONSUMED_ERROR = ( + "The result has been consumed. Fetch all needed records before calling " + "Result.consume()." +) + + class AsyncResult: """A handler for the result of Cypher query execution. Instances of this class are typically constructed and returned by @@ -52,7 +66,11 @@ def __init__(self, connection, hydrant, fetch_size, on_closed, # there ar more records available to pull from the server self._has_more = False # the result has been fully iterated or consumed - self._closed = False + self._exhausted = False + # the result has been consumed + self._consumed = False + # the result has been closed as a result of closing the transaction + self._out_of_scope = False @property def _qid(self): @@ -194,7 +212,11 @@ async def __aiter__(self): self._pull() await self._connection.send_all() - self._closed = True + self._exhausted = True + if self._out_of_scope: + raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR) + if self._consumed: + raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR) async def __anext__(self): return await self.__aiter__().__anext__() @@ -203,7 +225,7 @@ async def _attach(self): """Sets the Result object in an attached state by fetching messages from the connection to the buffer. """ - if self._closed is False: + if self._exhausted is False: while self._attached is False: await self._connection.fetch_message() @@ -215,6 +237,10 @@ async def _buffer(self, n=None): Might ent up with fewer records in the buffer if there are not enough records available. """ + if self._out_of_scope: + raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR) + if self._consumed: + raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR) if n is not None and len(self._record_buffer) >= n: return record_buffer = deque() @@ -222,7 +248,7 @@ async def _buffer(self, n=None): record_buffer.append(record) if n is not None and len(record_buffer) >= n: break - self._closed = False + self._exhausted = False if n is None: self._record_buffer = record_buffer else: @@ -260,6 +286,14 @@ def keys(self): """ return self._keys + async def _tx_end(self): + # Handle closure of the associated transaction. + # + # This will consume the result and mark it at out of scope. + # Subsequent calls to `next` will raise a ResultConsumedError. + await self.consume() + self._out_of_scope = True + async def consume(self): """Consume the remainder of this result and return a :class:`neo4j.ResultSummary`. @@ -296,12 +330,14 @@ async def get_two_tx(tx): :returns: The :class:`neo4j.ResultSummary` for this result """ - if self._closed is False: + if self._exhausted is False: self._discarding = True async for _ in self: pass - return self._obtain_summary() + summary = self._obtain_summary() + self._consumed = True + return summary async def single(self): """Obtain the next and only remaining record from this result if available else return None. @@ -311,16 +347,21 @@ async def single(self): the first of these is still returned. :returns: the next :class:`neo4j.AsyncRecord`. - :raises: ResultNotSingleError if not exactly one record is available. + + :raises ResultNotSingleError: if not exactly one record is available. + :raises ResultConsumedError: if the transaction from which this result was + obtained has been closed. """ await self._buffer(2) if not self._record_buffer: raise ResultNotSingleError( + self, "No records found. " "Make sure your query returns exactly one record." ) elif len(self._record_buffer) > 1: raise ResultNotSingleError( + self, "More than one record found. " "Make sure your query returns exactly one record." ) @@ -331,6 +372,10 @@ async def peek(self): This leaves the record in the buffer for further processing. :returns: the next :class:`.Record` or :const:`None` if none remain + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. """ await self._buffer(1) if self._record_buffer: @@ -343,6 +388,10 @@ async def graph(self): :returns: a result graph :rtype: :class:`neo4j.graph.Graph` + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. """ await self._buffer_all() return self._hydrant.graph @@ -354,8 +403,13 @@ async def value(self, key=0, default=None): :param key: field to return for each remaining record. Obtain a single value from the record by index or key. :param default: default value, used if the index of key is unavailable + :returns: list of individual values :rtype: list + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. """ return [record.value(key, default) async for record in self] @@ -365,8 +419,13 @@ async def values(self, *keys): See :class:`neo4j.AsyncRecord.values` :param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key. + :returns: list of values lists :rtype: list + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. """ return [record.values(*keys) async for record in self] @@ -376,7 +435,28 @@ async def data(self, *keys): See :class:`neo4j.AsyncRecord.data` :param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key. + :returns: list of dictionaries :rtype: list + + :raises ResultConsumedError: if the transaction from which this result was + obtained has been closed. """ return [record.data(*keys) async for record in self] + + def closed(self): + """Return True if the result has been closed. + + When a result gets consumed :meth:`consume` or the transaction that + owns the result gets closed (committed, rolled back, closed), the + result cannot be used to acquire further records. + + In such case, all methods that need to access the Result's records, + will raise a :exc:`ResultConsumedError` when called. + + :returns: whether the result is closed. + :rtype: bool + + .. versionadded:: 5.0 + """ + return self._out_of_scope or self._consumed diff --git a/neo4j/_async/work/transaction.py b/neo4j/_async/work/transaction.py index a2ae32f56..83408da41 100644 --- a/neo4j/_async/work/transaction.py +++ b/neo4j/_async/work/transaction.py @@ -78,7 +78,7 @@ async def _error_handler(self, exc): async def _consume_results(self): for result in self._results: - await result.consume() + await result._tx_end() self._results = [] async def run(self, query, parameters=None, **kwparameters): diff --git a/neo4j/_sync/work/result.py b/neo4j/_sync/work/result.py index 82836505e..9970c165b 100644 --- a/neo4j/_sync/work/result.py +++ b/neo4j/_sync/work/result.py @@ -20,11 +20,25 @@ from ..._async_compat.util import Util from ...data import DataDehydrator -from ...exceptions import ResultNotSingleError +from ...exceptions import ( + ResultConsumedError, + ResultNotSingleError, +) from ...work import ResultSummary from ..io import ConnectionErrorHandler +_RESULT_OUT_OF_SCOPE_ERROR = ( + "The result is out of scope. The associated transaction " + "has been closed. Results can only be used while the " + "transaction is open." +) +_RESULT_CONSUMED_ERROR = ( + "The result has been consumed. Fetch all needed records before calling " + "Result.consume()." +) + + class Result: """A handler for the result of Cypher query execution. Instances of this class are typically constructed and returned by @@ -52,7 +66,11 @@ def __init__(self, connection, hydrant, fetch_size, on_closed, # there ar more records available to pull from the server self._has_more = False # the result has been fully iterated or consumed - self._closed = False + self._exhausted = False + # the result has been consumed + self._consumed = False + # the result has been closed as a result of closing the transaction + self._out_of_scope = False @property def _qid(self): @@ -194,7 +212,11 @@ def __iter__(self): self._pull() self._connection.send_all() - self._closed = True + self._exhausted = True + if self._out_of_scope: + raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR) + if self._consumed: + raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR) def __next__(self): return self.__iter__().__next__() @@ -203,7 +225,7 @@ def _attach(self): """Sets the Result object in an attached state by fetching messages from the connection to the buffer. """ - if self._closed is False: + if self._exhausted is False: while self._attached is False: self._connection.fetch_message() @@ -215,6 +237,10 @@ def _buffer(self, n=None): Might ent up with fewer records in the buffer if there are not enough records available. """ + if self._out_of_scope: + raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR) + if self._consumed: + raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR) if n is not None and len(self._record_buffer) >= n: return record_buffer = deque() @@ -222,7 +248,7 @@ def _buffer(self, n=None): record_buffer.append(record) if n is not None and len(record_buffer) >= n: break - self._closed = False + self._exhausted = False if n is None: self._record_buffer = record_buffer else: @@ -260,6 +286,14 @@ def keys(self): """ return self._keys + def _tx_end(self): + # Handle closure of the associated transaction. + # + # This will consume the result and mark it at out of scope. + # Subsequent calls to `next` will raise a ResultConsumedError. + self.consume() + self._out_of_scope = True + def consume(self): """Consume the remainder of this result and return a :class:`neo4j.ResultSummary`. @@ -296,12 +330,14 @@ def get_two_tx(tx): :returns: The :class:`neo4j.ResultSummary` for this result """ - if self._closed is False: + if self._exhausted is False: self._discarding = True for _ in self: pass - return self._obtain_summary() + summary = self._obtain_summary() + self._consumed = True + return summary def single(self): """Obtain the next and only remaining record from this result if available else return None. @@ -311,16 +347,21 @@ def single(self): the first of these is still returned. :returns: the next :class:`neo4j.Record`. - :raises: ResultNotSingleError if not exactly one record is available. + + :raises ResultNotSingleError: if not exactly one record is available. + :raises ResultConsumedError: if the transaction from which this result was + obtained has been closed. """ self._buffer(2) if not self._record_buffer: raise ResultNotSingleError( + self, "No records found. " "Make sure your query returns exactly one record." ) elif len(self._record_buffer) > 1: raise ResultNotSingleError( + self, "More than one record found. " "Make sure your query returns exactly one record." ) @@ -331,6 +372,10 @@ def peek(self): This leaves the record in the buffer for further processing. :returns: the next :class:`.Record` or :const:`None` if none remain + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. """ self._buffer(1) if self._record_buffer: @@ -343,6 +388,10 @@ def graph(self): :returns: a result graph :rtype: :class:`neo4j.graph.Graph` + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. """ self._buffer_all() return self._hydrant.graph @@ -354,8 +403,13 @@ def value(self, key=0, default=None): :param key: field to return for each remaining record. Obtain a single value from the record by index or key. :param default: default value, used if the index of key is unavailable + :returns: list of individual values :rtype: list + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. """ return [record.value(key, default) for record in self] @@ -365,8 +419,13 @@ def values(self, *keys): See :class:`neo4j.Record.values` :param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key. + :returns: list of values lists :rtype: list + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. """ return [record.values(*keys) for record in self] @@ -376,7 +435,28 @@ def data(self, *keys): See :class:`neo4j.Record.data` :param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key. + :returns: list of dictionaries :rtype: list + + :raises ResultConsumedError: if the transaction from which this result was + obtained has been closed. """ return [record.data(*keys) for record in self] + + def closed(self): + """Return True if the result has been closed. + + When a result gets consumed :meth:`consume` or the transaction that + owns the result gets closed (committed, rolled back, closed), the + result cannot be used to acquire further records. + + In such case, all methods that need to access the Result's records, + will raise a :exc:`ResultConsumedError` when called. + + :returns: whether the result is closed. + :rtype: bool + + .. versionadded:: 5.0 + """ + return self._out_of_scope or self._consumed diff --git a/neo4j/_sync/work/transaction.py b/neo4j/_sync/work/transaction.py index 73d082388..916621893 100644 --- a/neo4j/_sync/work/transaction.py +++ b/neo4j/_sync/work/transaction.py @@ -78,7 +78,7 @@ def _error_handler(self, exc): def _consume_results(self): for result in self._results: - result.consume() + result._tx_end() self._results = [] def run(self, query, parameters=None, **kwparameters): diff --git a/neo4j/exceptions.py b/neo4j/exceptions.py index ee481c60a..7da2b2a7e 100644 --- a/neo4j/exceptions.py +++ b/neo4j/exceptions.py @@ -38,6 +38,9 @@ + DriverError + TransactionError + TransactionNestingError + + ResultError + + ResultConsumedError + + ResultNotSingleError + SessionExpired + ServiceUnavailable + RoutingServiceUnavailable @@ -47,7 +50,6 @@ + ConfigurationError + AuthConfigurationError + CertificateConfigurationError - + ResultConsumedError Connector API Errors ==================== @@ -293,6 +295,22 @@ def __init__(self, transaction, *args, **kwargs): self.transaction = transaction +class ResultError(DriverError): + """Raised when an error occurs while using a result object.""" + + def __init__(self, result, *args, **kwargs): + super(ResultError, self).__init__(*args, **kwargs) + self.result = result + + +class ResultConsumedError(ResultError): + """Raised when trying to access records of a consumed result.""" + + +class ResultNotSingleError(ResultError): + """Raised when result.single() detects not exactly one record in result.""" + + class ServiceUnavailable(DriverError): """ Raised when no database service is available. """ @@ -323,15 +341,6 @@ class IncompleteCommit(ServiceUnavailable): """ -class ResultConsumedError(DriverError): - """ Raised when trying to access records after the records have been consumed. - """ - - -class ResultNotSingleError(DriverError): - """Raised when result.single() detects not exactly one record in result.""" - - class ConfigurationError(DriverError): """ Raised when there is an error concerning a configuration. """ diff --git a/tests/unit/async_/work/test_result.py b/tests/unit/async_/work/test_result.py index 3e7ecc294..c30631a21 100644 --- a/tests/unit/async_/work/test_result.py +++ b/tests/unit/async_/work/test_result.py @@ -196,7 +196,7 @@ async def fetch_and_compare_all_records( if limit is not None and len(received_records) == limit: break if limit is None: - assert result._closed + assert result._exhausted elif method == "next": n = len(expected_records) if limit is None else limit for _ in range(n): @@ -205,7 +205,7 @@ async def fetch_and_compare_all_records( if limit is None: with pytest.raises(StopAsyncIteration): await AsyncUtil.next(result) - assert result._closed + assert result._exhausted elif method == "one iter": iter_ = AsyncUtil.iter(result) n = len(expected_records) if limit is None else limit @@ -215,7 +215,7 @@ async def fetch_and_compare_all_records( if limit is None: with pytest.raises(StopAsyncIteration): await AsyncUtil.next(iter_) - assert result._closed + assert result._exhausted elif method == "new iter": n = len(expected_records) if limit is None else limit for _ in range(n): @@ -226,7 +226,7 @@ async def fetch_and_compare_all_records( iter_ = AsyncUtil.iter(result) with pytest.raises(StopAsyncIteration): await AsyncUtil.next(iter_) - assert result._closed + assert result._exhausted else: raise ValueError() assert received_records == expected_records diff --git a/tests/unit/sync/work/test_result.py b/tests/unit/sync/work/test_result.py index 8401db1db..4d3157e9d 100644 --- a/tests/unit/sync/work/test_result.py +++ b/tests/unit/sync/work/test_result.py @@ -196,7 +196,7 @@ def fetch_and_compare_all_records( if limit is not None and len(received_records) == limit: break if limit is None: - assert result._closed + assert result._exhausted elif method == "next": n = len(expected_records) if limit is None else limit for _ in range(n): @@ -205,7 +205,7 @@ def fetch_and_compare_all_records( if limit is None: with pytest.raises(StopIteration): Util.next(result) - assert result._closed + assert result._exhausted elif method == "one iter": iter_ = Util.iter(result) n = len(expected_records) if limit is None else limit @@ -215,7 +215,7 @@ def fetch_and_compare_all_records( if limit is None: with pytest.raises(StopIteration): Util.next(iter_) - assert result._closed + assert result._exhausted elif method == "new iter": n = len(expected_records) if limit is None else limit for _ in range(n): @@ -226,7 +226,7 @@ def fetch_and_compare_all_records( iter_ = Util.iter(result) with pytest.raises(StopIteration): Util.next(iter_) - assert result._closed + assert result._exhausted else: raise ValueError() assert received_records == expected_records