Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions typedb/stream/bidirectional_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# under the License.
#
from queue import Empty, Queue
from typing import TypeVar, Iterator, Union, Generic, List
from typing import TypeVar, Iterator, Union
from uuid import uuid4, UUID

import typedb_protocol.common.transaction_pb2 as transaction_proto
Expand Down Expand Up @@ -133,8 +133,7 @@ def __init__(self, request_id: UUID, stream: "BidirectionalStream"):
self._stream = stream

def get(self) -> T:
value = self._stream.fetch(self._request_id)
return value
Comment on lines -136 to -137
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline var

return self._stream.fetch(self._request_id)


class RequestIterator(Iterator[Union[transaction_proto.Transaction.Req, StopIteration]]):
Expand Down
21 changes: 9 additions & 12 deletions typedb/stream/response_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from typing import Generic, TypeVar, Dict, Optional
from uuid import UUID

from typedb.common.exception import TypeDBClientException, TRANSACTION_CLOSED, ILLEGAL_STATE, \
TRANSACTION_CLOSED_WITH_ERRORS
from grpc import RpcError
from typedb.common.exception import TypeDBClientException, TRANSACTION_CLOSED, ILLEGAL_STATE

R = TypeVar('R')

Expand Down Expand Up @@ -54,26 +54,23 @@ class Queue(Generic[R]):

def __init__(self):
self._response_queue: queue.Queue[Response] = queue.Queue()
self._error: TypeDBClientException = None

def get(self, block: bool) -> R:
response = self._response_queue.get(block=block)
if response.is_value():
return response.value
elif response.is_done():
self._raise_transaction_closed_error()
elif response.is_done() and response.error is None:
raise TypeDBClientException.of(TRANSACTION_CLOSED)
elif response.is_done() and response.error is not None:
raise TypeDBClientException.of_rpc(response.error)
Comment on lines +62 to +65
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go back to the old behaviour of retrieving the error from the Done message stored in the queue, which will be thrown exactly once. After that, this queue will always throw transaction closed.

else:
raise TypeDBClientException.of(ILLEGAL_STATE)

def _raise_transaction_closed_error(self):
raise TypeDBClientException.of(TRANSACTION_CLOSED_WITH_ERRORS, self._error) if self._error else TypeDBClientException.of(TRANSACTION_CLOSED)

def put(self, response: R):
self._response_queue.put(ValueResponse(response))

def close(self, error: Optional[TypeDBClientException]):
self._error = error
self._response_queue.put(DoneResponse())
self._response_queue.put(DoneResponse(error))


class Response:
Expand All @@ -96,8 +93,8 @@ def is_value(self):

class DoneResponse(Response):

def __init__(self):
pass
def __init__(self, error: Optional[RpcError]):
self.error = error

def is_done(self):
return True
4 changes: 1 addition & 3 deletions typedb/stream/response_part_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ def _has_next(self) -> bool:
raise TypeDBClientException.of(ILLEGAL_STATE)

def __next__(self) -> transaction_proto.Transaction.ResPart:
if self._bidirectional_stream.get_error() is not None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the error will come from the backing Queue, rather than the bidirectional stream.

raise self._bidirectional_stream.get_error()
elif not self._has_next():
if not self._has_next():
raise StopIteration
else:
self._state = ResponsePartIterator.State.EMPTY
Expand Down