2020
2121from ..._async_compat .util import AsyncUtil
2222from ...data import DataDehydrator
23- from ...exceptions import ResultNotSingleError
23+ from ...exceptions import (
24+ ResultConsumedError ,
25+ ResultNotSingleError ,
26+ )
2427from ...work import ResultSummary
2528from ..io import ConnectionErrorHandler
2629
2730
31+ _RESULT_OUT_OF_SCOPE_ERROR = (
32+ "The result is out of scope. The associated transaction "
33+ "has been closed. Results can only be used while the "
34+ "transaction is open."
35+ )
36+ _RESULT_CONSUMED_ERROR = (
37+ "The result has been consumed. Fetch all needed records before calling "
38+ "Result.consume()."
39+ )
40+
41+
2842class AsyncResult :
2943 """A handler for the result of Cypher query execution. Instances
3044 of this class are typically constructed and returned by
@@ -52,7 +66,11 @@ def __init__(self, connection, hydrant, fetch_size, on_closed,
5266 # there ar more records available to pull from the server
5367 self ._has_more = False
5468 # the result has been fully iterated or consumed
55- self ._closed = False
69+ self ._exhausted = False
70+ # the result has been consumed
71+ self ._consumed = False
72+ # the result has been closed as a result of closing the transaction
73+ self ._out_of_scope = False
5674
5775 @property
5876 def _qid (self ):
@@ -194,7 +212,11 @@ async def __aiter__(self):
194212 self ._pull ()
195213 await self ._connection .send_all ()
196214
197- self ._closed = True
215+ self ._exhausted = True
216+ if self ._out_of_scope :
217+ raise ResultConsumedError (self , _RESULT_OUT_OF_SCOPE_ERROR )
218+ if self ._consumed :
219+ raise ResultConsumedError (self , _RESULT_CONSUMED_ERROR )
198220
199221 async def __anext__ (self ):
200222 return await self .__aiter__ ().__anext__ ()
@@ -203,7 +225,7 @@ async def _attach(self):
203225 """Sets the Result object in an attached state by fetching messages from
204226 the connection to the buffer.
205227 """
206- if self ._closed is False :
228+ if self ._exhausted is False :
207229 while self ._attached is False :
208230 await self ._connection .fetch_message ()
209231
@@ -215,14 +237,18 @@ async def _buffer(self, n=None):
215237 Might ent up with fewer records in the buffer if there are not enough
216238 records available.
217239 """
240+ if self ._out_of_scope :
241+ raise ResultConsumedError (self , _RESULT_OUT_OF_SCOPE_ERROR )
242+ if self ._consumed :
243+ raise ResultConsumedError (self , _RESULT_CONSUMED_ERROR )
218244 if n is not None and len (self ._record_buffer ) >= n :
219245 return
220246 record_buffer = deque ()
221247 async for record in self :
222248 record_buffer .append (record )
223249 if n is not None and len (record_buffer ) >= n :
224250 break
225- self ._closed = False
251+ self ._exhausted = False
226252 if n is None :
227253 self ._record_buffer = record_buffer
228254 else :
@@ -260,6 +286,14 @@ def keys(self):
260286 """
261287 return self ._keys
262288
289+ async def _tx_end (self ):
290+ # Handle closure of the associated transaction.
291+ #
292+ # This will consume the result and mark it at out of scope.
293+ # Subsequent calls to `next` will raise a ResultConsumedError.
294+ await self .consume ()
295+ self ._out_of_scope = True
296+
263297 async def consume (self ):
264298 """Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.
265299
@@ -296,12 +330,14 @@ async def get_two_tx(tx):
296330
297331 :returns: The :class:`neo4j.ResultSummary` for this result
298332 """
299- if self ._closed is False :
333+ if self ._exhausted is False :
300334 self ._discarding = True
301335 async for _ in self :
302336 pass
303337
304- return self ._obtain_summary ()
338+ summary = self ._obtain_summary ()
339+ self ._consumed = True
340+ return summary
305341
306342 async def single (self ):
307343 """Obtain the next and only remaining record from this result if available else return None.
@@ -311,16 +347,21 @@ async def single(self):
311347 the first of these is still returned.
312348
313349 :returns: the next :class:`neo4j.AsyncRecord`.
314- :raises: ResultNotSingleError if not exactly one record is available.
350+
351+ :raises ResultNotSingleError: if not exactly one record is available.
352+ :raises ResultConsumedError: if the transaction from which this result was
353+ obtained has been closed.
315354 """
316355 await self ._buffer (2 )
317356 if not self ._record_buffer :
318357 raise ResultNotSingleError (
358+ self ,
319359 "No records found. "
320360 "Make sure your query returns exactly one record."
321361 )
322362 elif len (self ._record_buffer ) > 1 :
323363 raise ResultNotSingleError (
364+ self ,
324365 "More than one record found. "
325366 "Make sure your query returns exactly one record."
326367 )
@@ -331,6 +372,10 @@ async def peek(self):
331372 This leaves the record in the buffer for further processing.
332373
333374 :returns: the next :class:`.Record` or :const:`None` if none remain
375+
376+ :raises ResultConsumedError: if the transaction from which this result
377+ was obtained has been closed or the Result has been explicitly
378+ consumed.
334379 """
335380 await self ._buffer (1 )
336381 if self ._record_buffer :
@@ -343,6 +388,10 @@ async def graph(self):
343388
344389 :returns: a result graph
345390 :rtype: :class:`neo4j.graph.Graph`
391+
392+ :raises ResultConsumedError: if the transaction from which this result
393+ was obtained has been closed or the Result has been explicitly
394+ consumed.
346395 """
347396 await self ._buffer_all ()
348397 return self ._hydrant .graph
@@ -354,8 +403,13 @@ async def value(self, key=0, default=None):
354403
355404 :param key: field to return for each remaining record. Obtain a single value from the record by index or key.
356405 :param default: default value, used if the index of key is unavailable
406+
357407 :returns: list of individual values
358408 :rtype: list
409+
410+ :raises ResultConsumedError: if the transaction from which this result
411+ was obtained has been closed or the Result has been explicitly
412+ consumed.
359413 """
360414 return [record .value (key , default ) async for record in self ]
361415
@@ -365,8 +419,13 @@ async def values(self, *keys):
365419 See :class:`neo4j.AsyncRecord.values`
366420
367421 :param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
422+
368423 :returns: list of values lists
369424 :rtype: list
425+
426+ :raises ResultConsumedError: if the transaction from which this result
427+ was obtained has been closed or the Result has been explicitly
428+ consumed.
370429 """
371430 return [record .values (* keys ) async for record in self ]
372431
@@ -376,7 +435,28 @@ async def data(self, *keys):
376435 See :class:`neo4j.AsyncRecord.data`
377436
378437 :param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
438+
379439 :returns: list of dictionaries
380440 :rtype: list
441+
442+ :raises ResultConsumedError: if the transaction from which this result was
443+ obtained has been closed.
381444 """
382445 return [record .data (* keys ) async for record in self ]
446+
447+ def closed (self ):
448+ """Return True if the result has been closed.
449+
450+ When a result gets consumed :meth:`consume` or the transaction that
451+ owns the result gets closed (committed, rolled back, closed), the
452+ result cannot be used to acquire further records.
453+
454+ In such case, all methods that need to access the Result's records,
455+ will raise a :exc:`ResultConsumedError` when called.
456+
457+ :returns: whether the result is closed.
458+ :rtype: bool
459+
460+ .. versionadded:: 5.0
461+ """
462+ return self ._out_of_scope or self ._consumed
0 commit comments