Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 4b983a6

Browse files
authored
Don't delete response collectors in a transaction (#250)
## What is the goal of this PR? We no longer delete response collectors in a transaction after receiving a response to a "single" request, or receiving a "DONE" message in a stream. This fixes a possible error when loading 50+ answers in one query and then performing a second query. ## What are the changes implemented in this PR? We had previously added code to clean up used response collectors in #247. But this broke in the scenario where we open a transaction, run a query that loads 51 answers (the prefetch size + 1), and then run a second query. The server would respond to the first query with: 50 answers -> CONTINUE -> 1 answer [compensating for latency] -> DONE. The client would respond to CONTINUE with STREAM to keep iterating, and the server would respond to STREAM with a 2nd DONE message. The iterator for query 1 finishes as soon as we see the first DONE message, so we stop reading responses at that point, meaning the second DONE may never be read by the client. But opening the iterator for query 2 causes us to continue reading messages from the transaction stream - note that we have no control over which request is being "currently served"; all responses use the same pipeline, the same gRPC stream. That's why we have the Response Collectors - when we get a response for a request that is different to the request we actually asked for, we need to store it in its respective Collector bucket. We could mitigate the issue by patching the server, but its current behaviour is actually pretty intuitive - if you send it a STREAM request and it has no more answers, it responds with DONE. We could change it to not respond at all, but that would be adding complexity where it is not really necessary to do so. So instead, we're reverting back to the old client behaviour, where the response collectors follow the lifetime of the Transaction, noting that Transactions are typically short-lived so cleanup will be performed in a timely manner anyway.
1 parent 70487b7 commit 4b983a6

File tree

10 files changed

+154
-136
lines changed

10 files changed

+154
-136
lines changed

.grabl/automation.yml

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,20 @@ build:
6464
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
6565
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
6666
.grabl/test-core.sh //tests/behaviour/connection/... --test_output=errors --jobs=1
67-
test-behaviour-connection-cluster:
68-
image: vaticle-ubuntu-21.04
69-
type: foreground
70-
command: |
71-
pyenv global 3.6.10
72-
pip3 install -U pip
73-
pip install -r requirements_dev.txt
74-
sudo unlink /usr/bin/python3
75-
sudo ln -s $(which python3) /usr/bin/python3
76-
sudo ln -s /usr/share/pyshared/lsb_release.py /opt/pyenv/versions/3.6.10/lib/python3.6/site-packages/lsb_release.py
77-
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
78-
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
79-
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
80-
.grabl/test-cluster.sh //tests/behaviour/connection/... --test_output=errors --jobs=1
67+
# test-behaviour-connection-cluster:
68+
# image: vaticle-ubuntu-21.04
69+
# type: foreground
70+
# command: |
71+
# pyenv global 3.6.10
72+
# pip3 install -U pip
73+
# pip install -r requirements_dev.txt
74+
# sudo unlink /usr/bin/python3
75+
# sudo ln -s $(which python3) /usr/bin/python3
76+
# sudo ln -s /usr/share/pyshared/lsb_release.py /opt/pyenv/versions/3.6.10/lib/python3.6/site-packages/lsb_release.py
77+
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
78+
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
79+
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
80+
# .grabl/test-cluster.sh //tests/behaviour/connection/... --test_output=errors --jobs=1
8181
test-behaviour-concept-core:
8282
image: vaticle-ubuntu-21.04
8383
type: foreground
@@ -121,21 +121,21 @@ build:
121121
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
122122
.grabl/test-core.sh //tests/behaviour/typeql/language/match/... --test_output=errors
123123
.grabl/test-core.sh //tests/behaviour/typeql/language/get/... --test_output=errors
124-
test-behaviour-match-cluster:
125-
image: vaticle-ubuntu-21.04
126-
type: foreground
127-
command: |
128-
pyenv global 3.6.10
129-
pip3 install -U pip
130-
pip install -r requirements_dev.txt
131-
sudo unlink /usr/bin/python3
132-
sudo ln -s $(which python3) /usr/bin/python3
133-
sudo ln -s /usr/share/pyshared/lsb_release.py /opt/pyenv/versions/3.6.10/lib/python3.6/site-packages/lsb_release.py
134-
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
135-
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
136-
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
137-
.grabl/test-cluster.sh //tests/behaviour/typeql/language/match/... --test_output=errors
138-
.grabl/test-cluster.sh //tests/behaviour/typeql/language/get/... --test_output=errors
124+
# test-behaviour-match-cluster:
125+
# image: vaticle-ubuntu-21.04
126+
# type: foreground
127+
# command: |
128+
# pyenv global 3.6.10
129+
# pip3 install -U pip
130+
# pip install -r requirements_dev.txt
131+
# sudo unlink /usr/bin/python3
132+
# sudo ln -s $(which python3) /usr/bin/python3
133+
# sudo ln -s /usr/share/pyshared/lsb_release.py /opt/pyenv/versions/3.6.10/lib/python3.6/site-packages/lsb_release.py
134+
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
135+
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
136+
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
137+
# .grabl/test-cluster.sh //tests/behaviour/typeql/language/match/... --test_output=errors
138+
# .grabl/test-cluster.sh //tests/behaviour/typeql/language/get/... --test_output=errors
139139
test-behaviour-writable-core:
140140
image: vaticle-ubuntu-21.04
141141
type: foreground
@@ -212,14 +212,16 @@ build:
212212
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
213213
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
214214
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
215-
bazel test //tests:test_cluster_failover --test_output=errors
215+
bazel test //tests/integration:test_cluster_failover --test_output=errors
216216
deploy-pip-snapshot:
217217
image: vaticle-ubuntu-21.04
218218
dependencies: [
219219
build,
220-
test-behaviour-connection-core, test-behaviour-connection-cluster,
220+
test-behaviour-connection-core,
221+
# test-behaviour-connection-cluster,
221222
test-behaviour-concept-core, test-behaviour-concept-cluster,
222-
test-behaviour-match-core, test-behaviour-match-cluster,
223+
test-behaviour-match-core,
224+
# test-behaviour-match-cluster,
223225
test-behaviour-writable-core, test-behaviour-writable-cluster,
224226
test-behaviour-definable-core, test-behaviour-definable-cluster,
225227
test-failover-cluster

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.6.2
1+
2.6.3

tests/BUILD

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ load("//tools:cluster_test_rule.bzl", "typedb_cluster_py_test")
2424
load("@vaticle_bazel_distribution//artifact:rules.bzl", "artifact_extractor")
2525
load("@vaticle_typedb_common//test:rules.bzl", "native_typedb_artifact")
2626
load("@vaticle_dependencies//tool/checkstyle:rules.bzl", "checkstyle_test")
27-
load("@rules_python//python:defs.bzl", "py_library", "py_test")
2827

2928
native_typedb_artifact(
3029
name = "native-typedb-artifact",
@@ -49,46 +48,11 @@ checkstyle_test(
4948
include = glob([
5049
"*",
5150
"deployment/*",
52-
"integration/*",
5351
]),
5452
license_type = "apache",
5553
size = "small",
5654
)
5755

58-
py_test(
59-
name = "test_debug",
60-
srcs = [
61-
"integration/test_debug.py",
62-
],
63-
deps = [
64-
"//:client_python",
65-
],
66-
python_version = "PY3"
67-
)
68-
69-
typedb_cluster_py_test(
70-
name = "test_cluster_failover",
71-
srcs = [
72-
"integration/test_cluster_failover.py",
73-
],
74-
deps = [
75-
"//:client_python",
76-
],
77-
size = "medium",
78-
native_typedb_cluster_artifact = ":native-typedb-cluster-artifact",
79-
)
80-
81-
py_test(
82-
name = "test_concurrent",
83-
srcs = [
84-
"integration/test_concurrent.py",
85-
],
86-
deps = [
87-
"//:client_python",
88-
],
89-
python_version = "PY3"
90-
)
91-
9256
artifact_extractor(
9357
name = "typedb-extractor",
9458
artifact = ":native-typedb-artifact",

tests/integration/BUILD

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#
2+
# Copyright (C) 2021 Vaticle
3+
#
4+
# Licensed to the Apache Software Foundation (ASF) under one
5+
# or more contributor license agreements. See the NOTICE file
6+
# distributed with this work for additional information
7+
# regarding copyright ownership. The ASF licenses this file
8+
# to you under the Apache License, Version 2.0 (the
9+
# "License"); you may not use this file except in compliance
10+
# with the License. You may obtain a copy of the License at
11+
#
12+
# http://www.apache.org/licenses/LICENSE-2.0
13+
#
14+
# Unless required by applicable law or agreed to in writing,
15+
# software distributed under the License is distributed on an
16+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
# KIND, either express or implied. See the License for the
18+
# specific language governing permissions and limitations
19+
# under the License.
20+
#
21+
22+
load("//tools:cluster_test_rule.bzl", "typedb_cluster_py_test")
23+
load("@vaticle_dependencies//tool/checkstyle:rules.bzl", "checkstyle_test")
24+
load("@rules_python//python:defs.bzl", "py_test")
25+
26+
typedb_cluster_py_test(
27+
name = "test_cluster_failover",
28+
srcs = ["test_cluster_failover.py"],
29+
deps = ["//:client_python"],
30+
size = "medium",
31+
native_typedb_cluster_artifact = "//tests:native-typedb-cluster-artifact",
32+
)
33+
34+
py_test(
35+
name = "test_debug",
36+
srcs = ["test_debug.py"],
37+
deps = ["//:client_python"],
38+
python_version = "PY3"
39+
)
40+
41+
py_test(
42+
name = "test_stream",
43+
srcs = ["test_stream.py"],
44+
deps = ["//:client_python"],
45+
python_version = "PY3"
46+
)
47+
48+
checkstyle_test(
49+
name = "checkstyle",
50+
include = glob(["*"]),
51+
license_type = "apache",
52+
size = "small",
53+
)

tests/integration/test_concurrent.py

Lines changed: 0 additions & 55 deletions
This file was deleted.

tests/integration/test_stream.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#
2+
# Copyright (C) 2021 Vaticle
3+
#
4+
# Licensed to the Apache Software Foundation (ASF) under one
5+
# or more contributor license agreements. See the NOTICE file
6+
# distributed with this work for additional information
7+
# regarding copyright ownership. The ASF licenses this file
8+
# to you under the Apache License, Version 2.0 (the
9+
# "License"); you may not use this file except in compliance
10+
# with the License. You may obtain a copy of the License at
11+
#
12+
# http://www.apache.org/licenses/LICENSE-2.0
13+
#
14+
# Unless required by applicable law or agreed to in writing,
15+
# software distributed under the License is distributed on an
16+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
# KIND, either express or implied. See the License for the
18+
# specific language governing permissions and limitations
19+
# under the License.
20+
#
21+
22+
import unittest
23+
from unittest import TestCase
24+
25+
from typedb.client import *
26+
27+
TYPEDB = "typedb"
28+
SCHEMA = SessionType.SCHEMA
29+
DATA = SessionType.DATA
30+
READ = TransactionType.READ
31+
WRITE = TransactionType.WRITE
32+
33+
34+
class TestStream(TestCase):
35+
36+
def setUp(self):
37+
with TypeDB.core_client("127.0.0.1:1729") as client:
38+
if TYPEDB not in [db.name() for db in client.databases().all()]:
39+
client.databases().create(TYPEDB)
40+
41+
def test_multiple_done_response_handling(self):
42+
with TypeDB.core_client(TypeDB.DEFAULT_ADDRESS) as client:
43+
with client.session(TYPEDB, SCHEMA) as session, session.transaction(WRITE) as tx:
44+
for i in range(51):
45+
tx.query().define(f"define person sub entity, owns name{i}; name{i} sub attribute, value string;")
46+
tx.commit()
47+
# With these options (the default in TypeDB at time of writing), the server may respond with:
48+
# 50 answers -> CONTINUE -> 1 answer [compensating for latency] -> DONE. The client will respond to
49+
# CONTINUE with STREAM to keep iterating, and the server responds to STREAM with a 2nd DONE message.
50+
# This is expected and should be handled correctly (ie: ignored) by the client.
51+
tx_options = TypeDBOptions.core().set_prefetch(True).set_prefetch_size(50)
52+
for i in range(50):
53+
with client.session(TYPEDB, DATA) as session, session.transaction(READ, tx_options) as tx:
54+
person_type = tx.concepts().get_thing_type("person").as_entity_type().as_remote(tx)
55+
_attrs = list(person_type.get_owns(keys_only=False))
56+
next(tx.query().match("match $x sub thing; limit 1;"))
57+
58+
if __name__ == "__main__":
59+
unittest.main(verbosity=2)

typedb/common/exception.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def __init__(self, code: int, message: str):
8484
MISSING_DB_NAME = ClientErrorMessage(7, "Database name cannot be empty.")
8585
DB_DOES_NOT_EXIST = ClientErrorMessage(8, "The database '%s' does not exist.")
8686
MISSING_RESPONSE = ClientErrorMessage(9, "Unexpected empty response for request ID '%s'.")
87-
UNKNOWN_REQUEST_ID = ClientErrorMessage(10, "Received a response with unknown request id '%s'.")
87+
UNKNOWN_REQUEST_ID = ClientErrorMessage(10, "Received a response with unknown request id '%s':\n%s")
8888
CLUSTER_NO_PRIMARY_REPLICA_YET = ClientErrorMessage(11, "No replica has been marked as the primary replica for latest known term '%d'.")
8989
CLUSTER_UNABLE_TO_CONNECT = ClientErrorMessage(12, "Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '%s'.")
9090
CLUSTER_REPLICA_NOT_PRIMARY = ClientErrorMessage(13, "The replica is not the primary replica.")

typedb/stream/bidirectional_stream.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,6 @@ def stream(self, req: transaction_proto.Transaction.Req) -> Iterator[transaction
6363
self._dispatcher.dispatch(req)
6464
return ResponsePartIterator(request_id, self)
6565

66-
def done(self, request_id: UUID):
67-
self._response_collector.remove(request_id)
68-
6966
def is_open(self) -> bool:
7067
return self._is_open.get()
7168

@@ -103,7 +100,7 @@ def _collect(self, response: Union[transaction_proto.Transaction.Res, transactio
103100
if collector:
104101
collector.put(response)
105102
else:
106-
raise TypeDBClientException.of(UNKNOWN_REQUEST_ID, request_id)
103+
raise TypeDBClientException.of(UNKNOWN_REQUEST_ID, (request_id, str(response)))
107104

108105
def dispatcher(self):
109106
return self._dispatcher
@@ -137,7 +134,6 @@ def __init__(self, request_id: UUID, stream: "BidirectionalStream"):
137134

138135
def get(self) -> T:
139136
value = self._stream.fetch(self._request_id)
140-
self._stream.done(self._request_id)
141137
return value
142138

143139

typedb/stream/response_collector.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ def get(self, block: bool) -> R:
6464
response = self._response_queue.get(block=block)
6565
if response.is_value():
6666
return response.value
67-
elif response.is_done() and self._error is None:
68-
raise TypeDBClientException.of(TRANSACTION_CLOSED)
69-
elif response.is_done() and self._error is not None:
70-
raise TypeDBClientException.of(TRANSACTION_CLOSED_WITH_ERRORS, self._error)
67+
elif response.is_done():
68+
self._raise_transaction_closed_error()
7169
else:
7270
raise TypeDBClientException.of(ILLEGAL_STATE)
7371

72+
def _raise_transaction_closed_error(self):
73+
raise TypeDBClientException.of(TRANSACTION_CLOSED_WITH_ERRORS, self._error) if self._error else TypeDBClientException.of(TRANSACTION_CLOSED)
74+
7475
def put(self, response: R):
7576
self._response_queue.put(ValueResponse(response))
7677

@@ -79,7 +80,6 @@ def close(self, error: Optional[TypeDBClientException]):
7980
self._response_queue.put(DoneResponse())
8081

8182

82-
8383
class Response:
8484

8585
def is_value(self):

typedb/stream/response_part_iterator.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ def __next__(self) -> transaction_proto.Transaction.ResPart:
7777
if self._bidirectional_stream.get_error() is not None:
7878
raise self._bidirectional_stream.get_error()
7979
elif not self._has_next():
80-
self._bidirectional_stream.done(self._request_id)
8180
raise StopIteration
8281
else:
8382
self._state = ResponsePartIterator.State.EMPTY

0 commit comments

Comments
 (0)