From 2960f058577ed850bfb6ed1980a194b998367d6a Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Tue, 11 Jan 2022 17:40:12 +0100 Subject: [PATCH 1/2] Allow tx timeout to be 0 and send it Altering TestKit back end to treat any exceptions raised inside the driver code (using trace back stack analysis) as `DriverError`s --- neo4j/_async/io/_bolt3.py | 12 ++++--- neo4j/_async/io/_bolt4.py | 30 ++++++++++------- neo4j/_sync/io/_bolt3.py | 12 ++++--- neo4j/_sync/io/_bolt4.py | 30 ++++++++++------- testkitbackend/_async/backend.py | 53 +++++++++++++++++++++---------- testkitbackend/_async/requests.py | 8 ++--- testkitbackend/_sync/backend.py | 53 +++++++++++++++++++++---------- testkitbackend/_sync/requests.py | 8 ++--- testkitbackend/fromtestkit.py | 23 ++++++++------ 9 files changed, 147 insertions(+), 82 deletions(-) diff --git a/neo4j/_async/io/_bolt3.py b/neo4j/_async/io/_bolt3.py index 8bf7e9397..febc442ab 100644 --- a/neo4j/_async/io/_bolt3.py +++ b/neo4j/_async/io/_bolt3.py @@ -229,11 +229,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") fields = (query, parameters, extra) log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields))) if query.upper() == u"COMMIT": @@ -281,11 +283,13 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) diff --git a/neo4j/_async/io/_bolt4.py b/neo4j/_async/io/_bolt4.py index 326e7212e..fae37a06c 100644 --- a/neo4j/_async/io/_bolt4.py +++ b/neo4j/_async/io/_bolt4.py @@ -181,11 +181,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") fields = (query, parameters, extra) log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields))) if query.upper() == u"COMMIT": @@ -232,11 +234,13 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) @@ -492,12 +496,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: - raise TypeError("Timeout must be specified as a number of " - "seconds") + raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") fields = (query, parameters, extra) log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields))) @@ -527,11 +532,12 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: - raise TypeError("Timeout must be specified as a number of " - "seconds") + raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) diff --git a/neo4j/_sync/io/_bolt3.py b/neo4j/_sync/io/_bolt3.py index e19bd4c27..8eac53f09 100644 --- a/neo4j/_sync/io/_bolt3.py +++ b/neo4j/_sync/io/_bolt3.py @@ -229,11 +229,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") fields = (query, parameters, extra) log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields))) if query.upper() == u"COMMIT": @@ -281,11 +283,13 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) diff --git a/neo4j/_sync/io/_bolt4.py b/neo4j/_sync/io/_bolt4.py index 332422162..44c807ce6 100644 --- a/neo4j/_sync/io/_bolt4.py +++ b/neo4j/_sync/io/_bolt4.py @@ -181,11 +181,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") fields = (query, parameters, extra) log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields))) if query.upper() == u"COMMIT": @@ -232,11 +234,13 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) @@ -492,12 +496,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: - raise TypeError("Timeout must be specified as a number of " - "seconds") + raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") fields = (query, parameters, extra) log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields))) @@ -527,11 +532,12 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, extra["tx_metadata"] = dict(metadata) except TypeError: raise TypeError("Metadata must be coercible to a dict") - if timeout: + if timeout is not None: try: - extra["tx_timeout"] = int(1000 * timeout) + extra["tx_timeout"] = int(1000 * float(timeout)) except TypeError: - raise TypeError("Timeout must be specified as a number of " - "seconds") + raise TypeError("Timeout must be specified as a number of seconds") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a positive number or 0.") log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) diff --git a/testkitbackend/_async/backend.py b/testkitbackend/_async/backend.py index 014e99e0f..43134759e 100644 --- a/testkitbackend/_async/backend.py +++ b/testkitbackend/_async/backend.py @@ -25,6 +25,7 @@ dumps, loads, ) +from pathlib import Path import traceback from neo4j._exceptions import BoltError @@ -42,6 +43,10 @@ from ..backend import Request +TESTKIT_BACKEND_PATH = Path(__file__).absolute().resolve().parents[1] +DRIVER_PATH = TESTKIT_BACKEND_PATH.parent / "neo4j" + + class AsyncBackend: def __init__(self, rd, wr): self._rd = rd @@ -81,6 +86,30 @@ async def process_request(self): request = request + line return False + @staticmethod + def _exc_stems_from_driver(exc): + stack = traceback.extract_tb(exc.__traceback__) + for frame in stack[-1:1:-1]: + p = Path(frame.filename) + if TESTKIT_BACKEND_PATH in p.parents: + return False + if DRIVER_PATH in p.parents: + return True + + async def _handle_driver_exc(self, exc): + log.debug(traceback.format_exc()) + if isinstance(exc, Neo4jError): + msg = "" if exc.message is None else str(exc.message) + else: + msg = str(exc.args[0]) if exc.args else "" + + key = self.next_key() + self.errors[key] = exc + payload = {"id": key, "errorType": str(type(exc)), "msg": msg} + if isinstance(exc, Neo4jError): + payload["code"] = exc.code + await self.send_response("DriverError", payload) + async def _process(self, request): """ Process a received request by retrieving handler that corresponds to the request name. @@ -104,24 +133,16 @@ async def _process(self, request): ) except (Neo4jError, DriverError, UnsupportedServerProduct, BoltError) as e: - log.debug(traceback.format_exc()) - if isinstance(e, Neo4jError): - msg = "" if e.message is None else str(e.message) - else: - msg = str(e.args[0]) if e.args else "" - - key = self.next_key() - self.errors[key] = e - payload = {"id": key, "errorType": str(type(e)), "msg": msg} - if isinstance(e, Neo4jError): - payload["code"] = e.code - await self.send_response("DriverError", payload) + await self._handle_driver_exc(e) except requests.FrontendError as e: await self.send_response("FrontendError", {"msg": str(e)}) - except Exception: - tb = traceback.format_exc() - log.error(tb) - await self.send_response("BackendError", {"msg": tb}) + except Exception as e: + if self._exc_stems_from_driver(e): + await self._handle_driver_exc(e) + else: + tb = traceback.format_exc() + log.error(tb) + await self.send_response("BackendError", {"msg": tb}) async def send_response(self, name, data): """ Sends a response to backend. diff --git a/testkitbackend/_async/requests.py b/testkitbackend/_async/requests.py index ac84f4573..cd7a2ab6e 100644 --- a/testkitbackend/_async/requests.py +++ b/testkitbackend/_async/requests.py @@ -253,8 +253,8 @@ async def SessionClose(backend, data): async def SessionBeginTransaction(backend, data): key = data["sessionId"] session = backend.sessions[key].session - metadata, timeout = fromtestkit.to_meta_and_timeout(data) - tx = await session.begin_transaction(metadata=metadata, timeout=timeout) + tx_kwargs = fromtestkit.to_tx_kwargs(data) + tx = await session.begin_transaction(**tx_kwargs) key = backend.next_key() backend.transactions[key] = tx await backend.send_response("Transaction", {"id": key}) @@ -272,9 +272,9 @@ async def transactionFunc(backend, data, is_read): key = data["sessionId"] session_tracker = backend.sessions[key] session = session_tracker.session - metadata, timeout = fromtestkit.to_meta_and_timeout(data) + tx_kwargs = fromtestkit.to_tx_kwargs(data) - @neo4j.unit_of_work(metadata=metadata, timeout=timeout) + @neo4j.unit_of_work(**tx_kwargs) async def func(tx): txkey = backend.next_key() backend.transactions[txkey] = tx diff --git a/testkitbackend/_sync/backend.py b/testkitbackend/_sync/backend.py index 9f1bae94c..05c63cee1 100644 --- a/testkitbackend/_sync/backend.py +++ b/testkitbackend/_sync/backend.py @@ -25,6 +25,7 @@ dumps, loads, ) +from pathlib import Path import traceback from neo4j._exceptions import BoltError @@ -42,6 +43,10 @@ from ..backend import Request +TESTKIT_BACKEND_PATH = Path(__file__).absolute().resolve().parents[1] +DRIVER_PATH = TESTKIT_BACKEND_PATH.parent / "neo4j" + + class Backend: def __init__(self, rd, wr): self._rd = rd @@ -81,6 +86,30 @@ def process_request(self): request = request + line return False + @staticmethod + def _exc_stems_from_driver(exc): + stack = traceback.extract_tb(exc.__traceback__) + for frame in stack[-1:1:-1]: + p = Path(frame.filename) + if TESTKIT_BACKEND_PATH in p.parents: + return False + if DRIVER_PATH in p.parents: + return True + + def _handle_driver_exc(self, exc): + log.debug(traceback.format_exc()) + if isinstance(exc, Neo4jError): + msg = "" if exc.message is None else str(exc.message) + else: + msg = str(exc.args[0]) if exc.args else "" + + key = self.next_key() + self.errors[key] = exc + payload = {"id": key, "errorType": str(type(exc)), "msg": msg} + if isinstance(exc, Neo4jError): + payload["code"] = exc.code + self.send_response("DriverError", payload) + def _process(self, request): """ Process a received request by retrieving handler that corresponds to the request name. @@ -104,24 +133,16 @@ def _process(self, request): ) except (Neo4jError, DriverError, UnsupportedServerProduct, BoltError) as e: - log.debug(traceback.format_exc()) - if isinstance(e, Neo4jError): - msg = "" if e.message is None else str(e.message) - else: - msg = str(e.args[0]) if e.args else "" - - key = self.next_key() - self.errors[key] = e - payload = {"id": key, "errorType": str(type(e)), "msg": msg} - if isinstance(e, Neo4jError): - payload["code"] = e.code - self.send_response("DriverError", payload) + self._handle_driver_exc(e) except requests.FrontendError as e: self.send_response("FrontendError", {"msg": str(e)}) - except Exception: - tb = traceback.format_exc() - log.error(tb) - self.send_response("BackendError", {"msg": tb}) + except Exception as e: + if self._exc_stems_from_driver(e): + self._handle_driver_exc(e) + else: + tb = traceback.format_exc() + log.error(tb) + self.send_response("BackendError", {"msg": tb}) def send_response(self, name, data): """ Sends a response to backend. diff --git a/testkitbackend/_sync/requests.py b/testkitbackend/_sync/requests.py index 20a7767af..57300444e 100644 --- a/testkitbackend/_sync/requests.py +++ b/testkitbackend/_sync/requests.py @@ -253,8 +253,8 @@ def SessionClose(backend, data): def SessionBeginTransaction(backend, data): key = data["sessionId"] session = backend.sessions[key].session - metadata, timeout = fromtestkit.to_meta_and_timeout(data) - tx = session.begin_transaction(metadata=metadata, timeout=timeout) + tx_kwargs = fromtestkit.to_tx_kwargs(data) + tx = session.begin_transaction(**tx_kwargs) key = backend.next_key() backend.transactions[key] = tx backend.send_response("Transaction", {"id": key}) @@ -272,9 +272,9 @@ def transactionFunc(backend, data, is_read): key = data["sessionId"] session_tracker = backend.sessions[key] session = session_tracker.session - metadata, timeout = fromtestkit.to_meta_and_timeout(data) + tx_kwargs = fromtestkit.to_tx_kwargs(data) - @neo4j.unit_of_work(metadata=metadata, timeout=timeout) + @neo4j.unit_of_work(**tx_kwargs) def func(tx): txkey = backend.next_key() backend.transactions[txkey] = tx diff --git a/testkitbackend/fromtestkit.py b/testkitbackend/fromtestkit.py index 7cf17b926..39c74aed6 100644 --- a/testkitbackend/fromtestkit.py +++ b/testkitbackend/fromtestkit.py @@ -30,21 +30,24 @@ def to_cypher_and_params(data): return data["cypher"], params_dict -def to_meta_and_timeout(data): +def to_tx_kwargs(data): from .backend import Request - metadata = data.get('txMeta', None) - if isinstance(metadata, Request): - metadata.mark_all_as_read() - timeout = data.get('timeout', None) - if timeout: - timeout = timeout / 1000 - return metadata, timeout + kwargs = {} + if "txMeta" in data: + kwargs["metadata"] = data["txMeta"] + if isinstance(kwargs["metadata"], Request): + kwargs["metadata"].mark_all_as_read() + if "timeout" in data: + kwargs["timeout"] = data["timeout"] + if kwargs["timeout"] is not None: + kwargs["timeout"] /= 1000 + return kwargs def to_query_and_params(data): cypher, param = to_cypher_and_params(data) - metadata, timeout = to_meta_and_timeout(data) - query = Query(cypher, metadata=metadata, timeout=timeout) + tx_kwargs = to_tx_kwargs(data) + query = Query(cypher, **tx_kwargs) return query, param From bc6d413e1dcd818dd2116048ec967a26954202d6 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Thu, 13 Jan 2022 14:48:09 +0100 Subject: [PATCH 2/2] Update tx timeout docs --- neo4j/work/query.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/neo4j/work/query.py b/neo4j/work/query.py index acefa6e73..56d00e9ea 100644 --- a/neo4j/work/query.py +++ b/neo4j/work/query.py @@ -24,7 +24,7 @@ class Query: :param metadata: metadata attached to the query. :type metadata: dict :param timeout: seconds. - :type timeout: int + :type timeout: float or None """ def __init__(self, text, metadata=None, timeout=None): self.text = text @@ -59,8 +59,10 @@ def count_people_tx(tx): Transactions that execute longer than the configured timeout will be terminated by the database. This functionality allows to limit query/transaction execution time. Specified timeout overrides the default timeout configured in the database using ``dbms.transaction.timeout`` setting. - Value should not represent a duration of zero or negative duration. - :type timeout: int + Value should not represent a negative duration. + A zero duration will make the transaction execute indefinitely. + None will use the default timeout configured in the database. + :type timeout: float or None """ def wrapper(f):