-
Notifications
You must be signed in to change notification settings - Fork 24
Errors propagate through transaction #247
Errors propagate through transaction #247
Conversation
|
||
def _raise_transaction_closed(self): | ||
errors = self._bidirectional_stream.drain_errors() | ||
errors = self._bidirectional_stream.get_errors() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new name, aligned with client-nodejs
def done(self, request_id: UUID): | ||
self._response_collector.remove(request_id) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we now clean up completed single
/stream
from the response collectors, so we don't propagate and print errors for every old operation the transaction also handled but isn't active anymore!
error = TypeDBClientException.of_rpc(e) | ||
self.close(error) | ||
raise error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed this so that we propagate our own exception instead of the gRPC error throughout the transaction queues
return self._response_collector.get_errors() | ||
|
||
def close(self, error: RpcError = None): | ||
def close(self, error: TypeDBClientException = None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new type: use our own exception everywhere
def get(self) -> T: | ||
return self._stream.fetch(self._request_id) | ||
value = self._stream.fetch(self._request_id) | ||
self._stream.done(self._request_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a Single
can immediately remove itself from the transaction stream when the user retrieves the value via get()
collector.close(error) | ||
|
||
def drain_errors(self) -> [RpcError]: | ||
def get_errors(self) -> [TypeDBClientException]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new error typing
error = collector.get_error() | ||
if error is not None: | ||
errors.append(error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get error from each collector queue
self._error: TypeDBClientException = None | ||
|
||
def get(self, block: bool) -> R: | ||
response = self._response_queue.get(block=block) | ||
if response.message: | ||
if response.is_response(): | ||
return response.message | ||
elif response.error: | ||
raise TypeDBClientException.of_rpc(response.error) | ||
else: | ||
elif response.is_done() and self._error is None: | ||
raise TypeDBClientException.of(TRANSACTION_CLOSED) | ||
elif response.is_done() and self._error is not None: | ||
raise TypeDBClientException.of(TRANSACTION_CLOSED_WITH_ERRORS, self._error) | ||
else: | ||
raise TypeDBClientException.of(ILLEGAL_STATE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as in client-nodejs: we now return the recorded error, and don't put the error in the Done
queue slot
typedb/stream/response_collector.py
Outdated
def is_response(self): | ||
return True | ||
|
||
def is_done(self): | ||
return False | ||
|
||
|
||
class Done: | ||
|
||
def __init__(self, error: Optional[RpcError]): | ||
self.error = error | ||
def __init__(self): | ||
pass | ||
|
||
def is_response(self): | ||
return False | ||
|
||
def is_done(self): | ||
return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the right way to do this in python??
|
||
def __next__(self) -> transaction_proto.Transaction.ResPart: | ||
if not self._has_next(): | ||
self._bidirectional_stream.done(self._request_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we also let the transaction stream know this query stream is done so it can be removed
typedb/stream/response_collector.py
Outdated
def close(self, error: Optional[RpcError]): | ||
def remove(self, request_id: UUID): | ||
with self._collectors_lock: | ||
del self._collectors[request_id] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something feels awkward about this - not the fault of the PR but rather the existing code.
We have a class named ResponseCollector
, which contains an object named _collectors
. This is simply illogical since the type of those collectors is not ResponseCollector
.
Do you have any idea how we might improve the terminology?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a fair point, I think the idea is to see the ResponseCollector
as a single thing - which means the naming inside is off - renamed to _response_queues
Review Summary: |
## What is the goal of this PR? We no longer delete response collectors in a transaction after receiving a response to a "single" request, or receiving a "DONE" message in a stream. This fixes a possible error when loading 50+ answers in one query and then performing a second query. ## What are the changes implemented in this PR? We had previously added code to clean up used response collectors in #247. But this broke in the scenario where we open a transaction, run a query that loads 51 answers (the prefetch size + 1), and then run a second query. The server would respond to the first query with: 50 answers -> CONTINUE -> 1 answer [compensating for latency] -> DONE. The client would respond to CONTINUE with STREAM to keep iterating, and the server would respond to STREAM with a 2nd DONE message. The iterator for query 1 finishes as soon as we see the first DONE message, so we stop reading responses at that point, meaning the second DONE may never be read by the client. But opening the iterator for query 2 causes us to continue reading messages from the transaction stream - note that we have no control over which request is being "currently served"; all responses use the same pipeline, the same gRPC stream. That's why we have the Response Collectors - when we get a response for a request that is different to the request we actually asked for, we need to store it in its respective Collector bucket. We could mitigate the issue by patching the server, but its current behaviour is actually pretty intuitive - if you send it a STREAM request and it has no more answers, it responds with DONE. We could change it to not respond at all, but that would be adding complexity where it is not really necessary to do so. So instead, we're reverting back to the old client behaviour, where the response collectors follow the lifetime of the Transaction, noting that Transactions are typically short-lived so cleanup will be performed in a timely manner anyway.
## What is the goal of this PR? Revert previous changes from #247 and #248, which made query queues and iterators throw the same error idempotently. However, this goes counter to standard usage of iterators and queues, which are not meant to behave idempotently (each item is only returned once, and if they have an error they should no longer be used). ## What are the changes implemented in this PR? * remove idempotent error state of collectors and queues, which back query iterators * note that we still store the error on the transaction bidirectional stream, in case the server throws an exception when there are no query iterators active Note: mirrors change from typedb/typedb-driver#372
What is the goal of this PR?
We align with Client NodeJS version 2.6.1 (mostly the work in typedb/typedb-driver-nodejs#197), which implements a better error propagation mechanism: when an exception occurs, we store it against all the transaction's active transmit queues to retrieve whenever the user tries to perform an operation in the transaction anywhere.
What are the changes implemented in this PR?