From 315077fbc9041096bd564f79c04a91459b6d92ce Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Tue, 3 Aug 2021 09:37:49 +0200 Subject: [PATCH 1/4] Don't send RESET on READY (clean) connections --- neo4j/io/__init__.py | 6 +-- neo4j/io/_bolt3.py | 78 ++++++++++++++++++++++++++++----- neo4j/io/_bolt4.py | 57 +++++++++++++++++++----- neo4j/io/_common.py | 3 +- testkitbackend/test_config.json | 2 +- 5 files changed, 117 insertions(+), 29 deletions(-) diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index ccde6da01..2ed57bc78 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -123,7 +123,7 @@ class Bolt(abc.ABC): PROTOCOL_VERSION = None # flag if connection needs RESET to go back to READY state - _is_reset = True + is_reset = False # The socket in_use = False @@ -460,10 +460,6 @@ def rollback(self, **handlers): """ Appends a ROLLBACK message to the output queue.""" pass - @property - def is_reset(self): - return self._is_reset - @abc.abstractmethod def reset(self): """ Appends a RESET message to the outgoing queue, sends it and consumes diff --git a/neo4j/io/_bolt3.py b/neo4j/io/_bolt3.py index 53d1109a1..24a65368c 100644 --- a/neo4j/io/_bolt3.py +++ b/neo4j/io/_bolt3.py @@ -18,6 +18,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from enum import Enum from logging import getLogger from ssl import SSLSocket @@ -52,6 +53,38 @@ log = getLogger("neo4j") +class ServerStates(Enum): + CONNECTED = "CONNECTED" + READY = "READY" + STREAMING = "STREAMING" + TX_READY_OR_TX_STREAMING = "TX_READY||TX_STREAMING" + FAILED = "FAILED" + + +STATE_TRANSITIONS = { + ServerStates.CONNECTED: { + "hello": ServerStates.READY, + }, + ServerStates.READY: { + "run": ServerStates.STREAMING, + "begin": ServerStates.TX_READY_OR_TX_STREAMING, + }, + ServerStates.STREAMING: { + "pull": ServerStates.READY, + "discard": ServerStates.READY, + "reset": ServerStates.READY, + }, + ServerStates.TX_READY_OR_TX_STREAMING: { + "commit": ServerStates.READY, + "rollback": ServerStates.READY, + "reset": ServerStates.READY, + }, + ServerStates.FAILED: { + "reset": ServerStates.READY, + } +} + + class Bolt3(Bolt): """ Protocol handler for Bolt 3. @@ -64,6 +97,16 @@ class Bolt3(Bolt): supports_multiple_databases = False + _server_state = ServerStates.CONNECTED + + @property + def is_reset(self): + if self.responses: + # we can't be sure of the server's state as there are still pending + # responses. + return False + return self._server_state == ServerStates.READY + @property def encrypted(self): return isinstance(self.socket, SSLSocket) @@ -92,7 +135,8 @@ def hello(self): logged_headers["credentials"] = "*******" log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers) self._append(b"\x01", (headers,), - response=InitResponse(self, on_success=self.server_info.update)) + response=InitResponse(self, "hello", + on_success=self.server_info.update)) self.send_all() self.fetch_all() check_supported_server_product(self.server_info.agent) @@ -155,20 +199,20 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, fields = (query, parameters, extra) log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields))) if query.upper() == u"COMMIT": - self._append(b"\x10", fields, CommitResponse(self, **handlers)) + self._append(b"\x10", fields, CommitResponse(self, "run", + **handlers)) else: - self._append(b"\x10", fields, Response(self, **handlers)) - self._is_reset = False + self._append(b"\x10", fields, Response(self, "run", **handlers)) def discard(self, n=-1, qid=-1, **handlers): # Just ignore n and qid, it is not supported in the Bolt 3 Protocol. log.debug("[#%04X] C: DISCARD_ALL", self.local_port) - self._append(b"\x2F", (), Response(self, **handlers)) + self._append(b"\x2F", (), Response(self, "discard", **handlers)) def pull(self, n=-1, qid=-1, **handlers): # Just ignore n and qid, it is not supported in the Bolt 3 Protocol. log.debug("[#%04X] C: PULL_ALL", self.local_port) - self._append(b"\x3F", (), Response(self, **handlers)) + self._append(b"\x3F", (), Response(self, "pull", **handlers)) self._is_reset = False def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers): @@ -193,16 +237,16 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, except TypeError: raise TypeError("Timeout must be specified as a number of seconds") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) - self._append(b"\x11", (extra,), Response(self, **handlers)) + self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) self._is_reset = False def commit(self, **handlers): log.debug("[#%04X] C: COMMIT", self.local_port) - self._append(b"\x12", (), CommitResponse(self, **handlers)) + self._append(b"\x12", (), CommitResponse(self, "commit", **handlers)) def rollback(self, **handlers): log.debug("[#%04X] C: ROLLBACK", self.local_port) - self._append(b"\x13", (), Response(self, **handlers)) + self._append(b"\x13", (), Response(self, "rollback", **handlers)) def reset(self): """ Add a RESET message to the outgoing queue, send @@ -213,11 +257,22 @@ def fail(metadata): raise BoltProtocolError("RESET failed %r" % metadata, address=self.unresolved_address) log.debug("[#%04X] C: RESET", self.local_port) - self._append(b"\x0F", response=Response(self, on_failure=fail)) + self._append(b"\x0F", response=Response(self, "reset", on_failure=fail)) self.send_all() self.fetch_all() self._is_reset = True + def _update_server_state_on_success(self, metadata, message): + if metadata.get("has_more"): + return + state_before = self._server_state + self._server_state = STATE_TRANSITIONS\ + .get(self._server_state, {})\ + .get(message, self._server_state) + if state_before != self._server_state: + log.debug("[#%04X] State: %s", self.local_port, + self._server_state.name) + def fetch_message(self): """ Receive at most one message from the server, if available. @@ -249,12 +304,15 @@ def fetch_message(self): response.complete = True if summary_signature == b"\x70": log.debug("[#%04X] S: SUCCESS %r", self.local_port, summary_metadata) + self._update_server_state_on_success(summary_metadata, + response.message) response.on_success(summary_metadata or {}) elif summary_signature == b"\x7E": log.debug("[#%04X] S: IGNORED", self.local_port) response.on_ignored(summary_metadata or {}) elif summary_signature == b"\x7F": log.debug("[#%04X] S: FAILURE %r", self.local_port, summary_metadata) + self._server_state = ServerStates.FAILED try: response.on_failure(summary_metadata or {}) except (ServiceUnavailable, DatabaseUnavailable): diff --git a/neo4j/io/_bolt4.py b/neo4j/io/_bolt4.py index 8f123e337..a9d476da2 100644 --- a/neo4j/io/_bolt4.py +++ b/neo4j/io/_bolt4.py @@ -18,6 +18,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from enum import Enum from logging import getLogger from ssl import SSLSocket @@ -37,7 +38,6 @@ Neo4jError, NotALeader, ServiceUnavailable, - SessionExpired, ) from neo4j.io import ( Bolt, @@ -48,6 +48,10 @@ InitResponse, Response, ) +from neo4j.io._bolt3 import ( + ServerStates, + STATE_TRANSITIONS, +) log = getLogger("neo4j") @@ -65,6 +69,16 @@ class Bolt4x0(Bolt): supports_multiple_databases = True + _server_state = ServerStates.CONNECTED + + @property + def is_reset(self): + if self.responses: + # we can't be sure of the server's state as there are still pending + # responses. + return False + return self._server_state == ServerStates.READY + @property def encrypted(self): return isinstance(self.socket, SSLSocket) @@ -93,7 +107,8 @@ def hello(self): logged_headers["credentials"] = "*******" log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers) self._append(b"\x01", (headers,), - response=InitResponse(self, on_success=self.server_info.update)) + response=InitResponse(self, "hello", + on_success=self.server_info.update)) self.send_all() self.fetch_all() check_supported_server_product(self.server_info.agent) @@ -162,9 +177,10 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, fields = (query, parameters, extra) log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields))) if query.upper() == u"COMMIT": - self._append(b"\x10", fields, CommitResponse(self, **handlers)) + self._append(b"\x10", fields, CommitResponse(self, "run", + **handlers)) else: - self._append(b"\x10", fields, Response(self, **handlers)) + self._append(b"\x10", fields, Response(self, "run", **handlers)) self._is_reset = False def discard(self, n=-1, qid=-1, **handlers): @@ -172,14 +188,14 @@ def discard(self, n=-1, qid=-1, **handlers): if qid != -1: extra["qid"] = qid log.debug("[#%04X] C: DISCARD %r", self.local_port, extra) - self._append(b"\x2F", (extra,), Response(self, **handlers)) + self._append(b"\x2F", (extra,), Response(self, "discard", **handlers)) def pull(self, n=-1, qid=-1, **handlers): extra = {"n": n} if qid != -1: extra["qid"] = qid log.debug("[#%04X] C: PULL %r", self.local_port, extra) - self._append(b"\x3F", (extra,), Response(self, **handlers)) + self._append(b"\x3F", (extra,), Response(self, "pull", **handlers)) self._is_reset = False def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, @@ -205,16 +221,16 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, except TypeError: raise TypeError("Timeout must be specified as a number of seconds") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) - self._append(b"\x11", (extra,), Response(self, **handlers)) + self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) self._is_reset = False def commit(self, **handlers): log.debug("[#%04X] C: COMMIT", self.local_port) - self._append(b"\x12", (), CommitResponse(self, **handlers)) + self._append(b"\x12", (), CommitResponse(self, "commit", **handlers)) def rollback(self, **handlers): log.debug("[#%04X] C: ROLLBACK", self.local_port) - self._append(b"\x13", (), Response(self, **handlers)) + self._append(b"\x13", (), Response(self, "rollback", **handlers)) def reset(self): """ Add a RESET message to the outgoing queue, send @@ -225,11 +241,22 @@ def fail(metadata): raise BoltProtocolError("RESET failed %r" % metadata, self.unresolved_address) log.debug("[#%04X] C: RESET", self.local_port) - self._append(b"\x0F", response=Response(self, on_failure=fail)) + self._append(b"\x0F", response=Response(self, "reset", on_failure=fail)) self.send_all() self.fetch_all() self._is_reset = True + def _update_server_state_on_success(self, metadata, message): + if metadata.get("has_more"): + return + state_before = self._server_state + self._server_state = STATE_TRANSITIONS\ + .get(self._server_state, {})\ + .get(message, self._server_state) + if state_before != self._server_state: + log.debug("[#%04X] [%s]", self.local_port, + self._server_state.name) + def fetch_message(self): """ Receive at most one message from the server, if available. @@ -261,12 +288,15 @@ def fetch_message(self): response.complete = True if summary_signature == b"\x70": log.debug("[#%04X] S: SUCCESS %r", self.local_port, summary_metadata) + self._update_server_state_on_success(summary_metadata, + response.message) response.on_success(summary_metadata or {}) elif summary_signature == b"\x7E": log.debug("[#%04X] S: IGNORED", self.local_port) response.on_ignored(summary_metadata or {}) elif summary_signature == b"\x7F": log.debug("[#%04X] S: FAILURE %r", self.local_port, summary_metadata) + self._server_state = ServerStates.FAILED try: response.on_failure(summary_metadata or {}) except (ServiceUnavailable, DatabaseUnavailable): @@ -372,7 +402,9 @@ def fail(md): else: bookmarks = list(bookmarks) self._append(b"\x66", (routing_context, bookmarks, database), - response=Response(self, on_success=metadata.update, on_failure=fail)) + response=Response(self, "route", + on_success=metadata.update, + on_failure=fail)) self.send_all() self.fetch_all() return [metadata.get("rt")] @@ -400,7 +432,8 @@ def on_success(metadata): logged_headers["credentials"] = "*******" log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers) self._append(b"\x01", (headers,), - response=InitResponse(self, on_success=on_success)) + response=InitResponse(self, "hello", + on_success=on_success)) self.send_all() self.fetch_all() check_supported_server_product(self.server_info.agent) diff --git a/neo4j/io/_common.py b/neo4j/io/_common.py index fc543499e..0dc8b2a3b 100644 --- a/neo4j/io/_common.py +++ b/neo4j/io/_common.py @@ -144,9 +144,10 @@ class Response: more detail messages followed by one summary message). """ - def __init__(self, connection, **handlers): + def __init__(self, connection, message, **handlers): self.connection = connection self.handlers = handlers + self.message = message self.complete = False def on_records(self, records): diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index c4637eb4e..8e4450e8b 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -34,7 +34,7 @@ "features": { "AuthorizationExpiredTreatment": true, "Optimization:ImplicitDefaultArguments": true, - "Optimization:MinimalResets": "Driver resets some clean connections when put back into pool", + "Optimization:MinimalResets": true, "Optimization:ConnectionReuse": true, "Optimization:PullPipelining": true, "ConfHint:connection.recv_timeout_seconds": true, From 74d346e5c541d964d6c95986d1c4dd715cc18f68 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Tue, 3 Aug 2021 12:09:54 +0200 Subject: [PATCH 2/4] Code clean-up --- neo4j/io/_bolt3.py | 89 +++++++++++++++++++++++++--------------------- neo4j/io/_bolt4.py | 35 ++++++++---------- 2 files changed, 63 insertions(+), 61 deletions(-) diff --git a/neo4j/io/_bolt3.py b/neo4j/io/_bolt3.py index 24a65368c..0a9cf512d 100644 --- a/neo4j/io/_bolt3.py +++ b/neo4j/io/_bolt3.py @@ -61,28 +61,43 @@ class ServerStates(Enum): FAILED = "FAILED" -STATE_TRANSITIONS = { - ServerStates.CONNECTED: { - "hello": ServerStates.READY, - }, - ServerStates.READY: { - "run": ServerStates.STREAMING, - "begin": ServerStates.TX_READY_OR_TX_STREAMING, - }, - ServerStates.STREAMING: { - "pull": ServerStates.READY, - "discard": ServerStates.READY, - "reset": ServerStates.READY, - }, - ServerStates.TX_READY_OR_TX_STREAMING: { - "commit": ServerStates.READY, - "rollback": ServerStates.READY, - "reset": ServerStates.READY, - }, - ServerStates.FAILED: { - "reset": ServerStates.READY, +class ServerStateManager: + _STATE_TRANSITIONS = { + ServerStates.CONNECTED: { + "hello": ServerStates.READY, + }, + ServerStates.READY: { + "run": ServerStates.STREAMING, + "begin": ServerStates.TX_READY_OR_TX_STREAMING, + }, + ServerStates.STREAMING: { + "pull": ServerStates.READY, + "discard": ServerStates.READY, + "reset": ServerStates.READY, + }, + ServerStates.TX_READY_OR_TX_STREAMING: { + "commit": ServerStates.READY, + "rollback": ServerStates.READY, + "reset": ServerStates.READY, + }, + ServerStates.FAILED: { + "reset": ServerStates.READY, + } } -} + + def __init__(self, init_state, on_change=None): + self.state = init_state + self._on_change = on_change + + def transition(self, message, metadata): + if metadata.get("has_more"): + return + state_before = self.state + self.state = self._STATE_TRANSITIONS\ + .get(self.state, {})\ + .get(message, self.state) + if state_before != self.state and callable(self._on_change): + self._on_change(state_before, self.state) class Bolt3(Bolt): @@ -97,7 +112,15 @@ class Bolt3(Bolt): supports_multiple_databases = False - _server_state = ServerStates.CONNECTED + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._server_state_manager = ServerStateManager( + ServerStates.CONNECTED, on_change=self._on_server_state_change + ) + + def _on_server_state_change(self, old_state, new_state): + log.debug("[#%04X] State: %s > %s", self.local_port, + old_state.name, new_state.name) @property def is_reset(self): @@ -105,7 +128,7 @@ def is_reset(self): # we can't be sure of the server's state as there are still pending # responses. return False - return self._server_state == ServerStates.READY + return self._server_state_manager.state == ServerStates.READY @property def encrypted(self): @@ -213,7 +236,6 @@ def pull(self, n=-1, qid=-1, **handlers): # Just ignore n and qid, it is not supported in the Bolt 3 Protocol. log.debug("[#%04X] C: PULL_ALL", self.local_port) self._append(b"\x3F", (), Response(self, "pull", **handlers)) - self._is_reset = False def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers): if db is not None: @@ -238,7 +260,6 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, raise TypeError("Timeout must be specified as a number of seconds") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) - self._is_reset = False def commit(self, **handlers): log.debug("[#%04X] C: COMMIT", self.local_port) @@ -260,18 +281,6 @@ def fail(metadata): self._append(b"\x0F", response=Response(self, "reset", on_failure=fail)) self.send_all() self.fetch_all() - self._is_reset = True - - def _update_server_state_on_success(self, metadata, message): - if metadata.get("has_more"): - return - state_before = self._server_state - self._server_state = STATE_TRANSITIONS\ - .get(self._server_state, {})\ - .get(message, self._server_state) - if state_before != self._server_state: - log.debug("[#%04X] State: %s", self.local_port, - self._server_state.name) def fetch_message(self): """ Receive at most one message from the server, if available. @@ -304,15 +313,15 @@ def fetch_message(self): response.complete = True if summary_signature == b"\x70": log.debug("[#%04X] S: SUCCESS %r", self.local_port, summary_metadata) - self._update_server_state_on_success(summary_metadata, - response.message) + self._server_state_manager.transition(response.message, + summary_metadata) response.on_success(summary_metadata or {}) elif summary_signature == b"\x7E": log.debug("[#%04X] S: IGNORED", self.local_port) response.on_ignored(summary_metadata or {}) elif summary_signature == b"\x7F": log.debug("[#%04X] S: FAILURE %r", self.local_port, summary_metadata) - self._server_state = ServerStates.FAILED + self._server_state_manager.state = ServerStates.FAILED try: response.on_failure(summary_metadata or {}) except (ServiceUnavailable, DatabaseUnavailable): diff --git a/neo4j/io/_bolt4.py b/neo4j/io/_bolt4.py index a9d476da2..d966a6972 100644 --- a/neo4j/io/_bolt4.py +++ b/neo4j/io/_bolt4.py @@ -49,8 +49,8 @@ Response, ) from neo4j.io._bolt3 import ( + ServerStateManager, ServerStates, - STATE_TRANSITIONS, ) @@ -69,7 +69,15 @@ class Bolt4x0(Bolt): supports_multiple_databases = True - _server_state = ServerStates.CONNECTED + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._server_state_manager = ServerStateManager( + ServerStates.CONNECTED, on_change=self._on_server_state_change + ) + + def _on_server_state_change(self, old_state, new_state): + log.debug("[#%04X] State: %s > %s", self.local_port, + old_state.name, new_state.name) @property def is_reset(self): @@ -77,7 +85,7 @@ def is_reset(self): # we can't be sure of the server's state as there are still pending # responses. return False - return self._server_state == ServerStates.READY + return self._server_state_manager.state == ServerStates.READY @property def encrypted(self): @@ -181,7 +189,6 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, **handlers)) else: self._append(b"\x10", fields, Response(self, "run", **handlers)) - self._is_reset = False def discard(self, n=-1, qid=-1, **handlers): extra = {"n": n} @@ -196,7 +203,6 @@ def pull(self, n=-1, qid=-1, **handlers): extra["qid"] = qid log.debug("[#%04X] C: PULL %r", self.local_port, extra) self._append(b"\x3F", (extra,), Response(self, "pull", **handlers)) - self._is_reset = False def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers): @@ -222,7 +228,6 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, raise TypeError("Timeout must be specified as a number of seconds") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) - self._is_reset = False def commit(self, **handlers): log.debug("[#%04X] C: COMMIT", self.local_port) @@ -244,18 +249,6 @@ def fail(metadata): self._append(b"\x0F", response=Response(self, "reset", on_failure=fail)) self.send_all() self.fetch_all() - self._is_reset = True - - def _update_server_state_on_success(self, metadata, message): - if metadata.get("has_more"): - return - state_before = self._server_state - self._server_state = STATE_TRANSITIONS\ - .get(self._server_state, {})\ - .get(message, self._server_state) - if state_before != self._server_state: - log.debug("[#%04X] [%s]", self.local_port, - self._server_state.name) def fetch_message(self): """ Receive at most one message from the server, if available. @@ -288,15 +281,15 @@ def fetch_message(self): response.complete = True if summary_signature == b"\x70": log.debug("[#%04X] S: SUCCESS %r", self.local_port, summary_metadata) - self._update_server_state_on_success(summary_metadata, - response.message) + self._server_state_manager.transition(response.message, + summary_metadata) response.on_success(summary_metadata or {}) elif summary_signature == b"\x7E": log.debug("[#%04X] S: IGNORED", self.local_port) response.on_ignored(summary_metadata or {}) elif summary_signature == b"\x7F": log.debug("[#%04X] S: FAILURE %r", self.local_port, summary_metadata) - self._server_state = ServerStates.FAILED + self._server_state_manager.state = ServerStates.FAILED try: response.on_failure(summary_metadata or {}) except (ServiceUnavailable, DatabaseUnavailable): From 82018c10d101968beae94a22ccb710c6370dca02 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Wed, 4 Aug 2021 10:11:44 +0200 Subject: [PATCH 3/4] Fix double reset when pipelining. --- neo4j/io/_bolt3.py | 7 ++++--- neo4j/io/_bolt4.py | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/neo4j/io/_bolt3.py b/neo4j/io/_bolt3.py index 0a9cf512d..fe7608b0b 100644 --- a/neo4j/io/_bolt3.py +++ b/neo4j/io/_bolt3.py @@ -125,9 +125,10 @@ def _on_server_state_change(self, old_state, new_state): @property def is_reset(self): if self.responses: - # we can't be sure of the server's state as there are still pending - # responses. - return False + # We can't be sure of the server's state as there are still pending + # responses. Unless the last message we sent was RESET. In that case + # the server state will always be READY when we're done. + return self.responses[-1].message == "reset" return self._server_state_manager.state == ServerStates.READY @property diff --git a/neo4j/io/_bolt4.py b/neo4j/io/_bolt4.py index d966a6972..4b7c2045c 100644 --- a/neo4j/io/_bolt4.py +++ b/neo4j/io/_bolt4.py @@ -82,9 +82,10 @@ def _on_server_state_change(self, old_state, new_state): @property def is_reset(self): if self.responses: - # we can't be sure of the server's state as there are still pending - # responses. - return False + # We can't be sure of the server's state as there are still pending + # responses. Unless the last message we sent was RESET. In that case + # the server state will always be READY when we're done. + return self.responses[-1].message == "reset" return self._server_state_manager.state == ServerStates.READY @property From a59b8f23321aa0d23ee817d955be4a0c9cf66e76 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Wed, 4 Aug 2021 10:15:55 +0200 Subject: [PATCH 4/4] Enable TestKit test --- testkitbackend/test_config.json | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index 21ce3db3c..8e4450e8b 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -29,9 +29,7 @@ "stub.session_run_parameters.test_session_run_parameters.TestSessionRunParameters.test_empty_query": "Driver rejects empty queries before sending it to the server", "tls.tlsversions.TestTlsVersions.test_1_1": - "TLSv1.1 and below are disabled in the driver", - "stub.disconnects.test_disconnects.TestDisconnects.test_fail_on_reset": - "Driver silently ignores all errors on releasing connections back into the pool." + "TLSv1.1 and below are disabled in the driver" }, "features": { "AuthorizationExpiredTreatment": true,