2626from ..io import ConnectionErrorHandler
2727
2828
29+ _RESULT_OUT_OF_SCOPE_ERROR = (
30+ "The result is out of scope. The associated transaction "
31+ "has been closed. Results can only be used while the "
32+ "transaction is open."
33+ )
34+ _RESULT_CONSUMED_ERROR = (
35+ "The result has been consumed. Fetch all needed records before calling "
36+ "Result.consume()."
37+ )
38+
39+
2940class AsyncResult :
3041 """A handler for the result of Cypher query execution. Instances
3142 of this class are typically constructed and returned by
@@ -54,6 +65,10 @@ def __init__(self, connection, hydrant, fetch_size, on_closed,
5465 self ._has_more = False
5566 # the result has been fully iterated or consumed
5667 self ._closed = False
68+ # the result has been consumed
69+ self ._consumed = False
70+ # the result has been closed as a result of closing the transaction
71+ self ._out_of_scope = False
5772
5873 @property
5974 def _qid (self ):
@@ -196,6 +211,10 @@ async def __aiter__(self):
196211 await self ._connection .send_all ()
197212
198213 self ._closed = True
214+ if self ._out_of_scope :
215+ raise RuntimeError (_RESULT_OUT_OF_SCOPE_ERROR )
216+ if self ._consumed :
217+ raise RuntimeError (_RESULT_CONSUMED_ERROR )
199218
200219 async def __anext__ (self ):
201220 return await self .__aiter__ ().__anext__ ()
@@ -216,6 +235,10 @@ async def _buffer(self, n=None):
216235 Might ent up with fewer records in the buffer if there are not enough
217236 records available.
218237 """
238+ if self ._out_of_scope :
239+ raise RuntimeError (_RESULT_OUT_OF_SCOPE_ERROR )
240+ if self ._consumed :
241+ raise RuntimeError (_RESULT_CONSUMED_ERROR )
219242 if n is not None and len (self ._record_buffer ) >= n :
220243 return
221244 record_buffer = deque ()
@@ -261,6 +284,14 @@ def keys(self):
261284 """
262285 return self ._keys
263286
287+ async def _tx_end (self ):
288+ # Handle closure of the associated transaction.
289+ #
290+ # This will consume the result and mark it at out of scope.
291+ # Subsequent calls to `next` will raise a RuntimeError.
292+ await self .consume ()
293+ self ._out_of_scope = True
294+
264295 async def consume (self ):
265296 """Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.
266297
@@ -302,7 +333,9 @@ async def get_two_tx(tx):
302333 async for _ in self :
303334 pass
304335
305- return self ._obtain_summary ()
336+ summary = self ._obtain_summary ()
337+ self ._consumed = True
338+ return summary
306339
307340 async def single (self ):
308341 """Obtain the next and only remaining record from this result if available else return None.
@@ -312,7 +345,10 @@ async def single(self):
312345 the first of these is still returned.
313346
314347 :returns: the next :class:`neo4j.AsyncRecord`.
315- :raises: ResultNotSingleError if not exactly one record is available.
348+
349+ :raises ResultNotSingleError: if not exactly one record is available.
350+ :raises RuntimeError: if the transaction from which this result was
351+ obtained has been closed.
316352 """
317353 await self ._buffer (2 )
318354 if not self ._record_buffer :
@@ -332,6 +368,10 @@ async def peek(self):
332368 This leaves the record in the buffer for further processing.
333369
334370 :returns: the next :class:`.Record` or :const:`None` if none remain
371+
372+ :raises RuntimeError: if the transaction from which this result was
373+ obtained has been closed or the Result has been explicitly
374+ consumed.
335375 """
336376 await self ._buffer (1 )
337377 if self ._record_buffer :
@@ -344,6 +384,10 @@ async def graph(self):
344384
345385 :returns: a result graph
346386 :rtype: :class:`neo4j.graph.Graph`
387+
388+ :raises RuntimeError: if the transaction from which this result was
389+ obtained has been closed or the Result has been explicitly
390+ consumed.
347391 """
348392 await self ._buffer_all ()
349393 return self ._hydrant .graph
@@ -355,8 +399,13 @@ async def value(self, key=0, default=None):
355399
356400 :param key: field to return for each remaining record. Obtain a single value from the record by index or key.
357401 :param default: default value, used if the index of key is unavailable
402+
358403 :returns: list of individual values
359404 :rtype: list
405+
406+ :raises RuntimeError: if the transaction from which this result was
407+ obtained has been closed or the Result has been explicitly
408+ consumed.
360409 """
361410 return [record .value (key , default ) async for record in self ]
362411
@@ -366,8 +415,13 @@ async def values(self, *keys):
366415 See :class:`neo4j.AsyncRecord.values`
367416
368417 :param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
418+
369419 :returns: list of values lists
370420 :rtype: list
421+
422+ :raises RuntimeError: if the transaction from which this result was
423+ obtained has been closed or the Result has been explicitly
424+ consumed.
371425 """
372426 return [record .values (* keys ) async for record in self ]
373427
@@ -377,7 +431,26 @@ async def data(self, *keys):
377431 See :class:`neo4j.AsyncRecord.data`
378432
379433 :param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
434+
380435 :returns: list of dictionaries
381436 :rtype: list
437+
438+ :raises RuntimeError: if the transaction from which this result was
439+ obtained has been closed.
382440 """
383441 return [record .data (* keys ) async for record in self ]
442+
443+ def closed (self ):
444+ """Return True if the result is still valid (not closed).
445+
446+ When a result gets consumed :meth:`consume` or the transaction that
447+ owns the result gets closed (committed, rolled back, closed), the
448+ result cannot be used to acquire further records.
449+
450+ In such case, all methods that need to access the Result's records,
451+ will raise a :exc:`RuntimeError` when called.
452+
453+ :returns: whether the result is closed.
454+ :rtype: bool
455+ """
456+ return self ._out_of_scope or self ._consumed
0 commit comments