diff --git a/docs/source/api.rst b/docs/source/api.rst index 6976951ad..e5c0ec5d4 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -396,6 +396,7 @@ Additional configuration can be provided via the :class:`neo4j.Driver` construct + :ref:`encrypted-ref` + :ref:`keep-alive-ref` + :ref:`max-connection-lifetime-ref` ++ :ref:`liveness-check-timeout-ref` + :ref:`max-connection-pool-size-ref` + :ref:`max-transaction-retry-time-ref` + :ref:`resolver-ref` @@ -471,6 +472,33 @@ The maximum duration in seconds that the driver will keep a connection for befor :Default: ``3600`` +.. _liveness-check-timeout-ref: + +``liveness_check_timeout`` +-------------------------- +Pooled connections that have been idle in the pool for longer than this timeout (specified in seconds) will be tested +before they are used again, to ensure they are still live. +If this option is set too low, additional network round trips will be incurred when acquiring a connection, which causes +a performance hit. + +If this is set high, you may receive sessions that are backed by no longer live connections, which will lead to +exceptions in your application. +Assuming the database is running, these exceptions will go away if you retry or use a driver API with built-in retries. + +Hence, this parameter tunes a balance between the likelihood of your application seeing connection problems, and +performance. + +You normally should not need to tune this parameter. +No connection liveliness check is done by default (:data:`None`). +A value of ``0`` means connections will always be tested for validity. +Negative values are not allowed. + +:Type: :class:`float` or :data:`None` +:Default: :data:`None` + +.. versionadded:: 5.15 + + .. _max-connection-pool-size-ref: ``max_connection_pool_size`` @@ -533,8 +561,8 @@ For example: resolver=custom_resolver) -:Type: ``Callable | None`` -:Default: ``None`` +:Type: ``Callable`` or :data:`None` +:Default: :data:`None` .. _trust-ref: @@ -619,7 +647,7 @@ custom ``ssl_context`` is configured. -------------- Specify the client agent name. -:Type: ``str`` +:Type: :class:`str` :Default: *The Python Driver will generate a user agent name.* diff --git a/src/neo4j/_async/driver.py b/src/neo4j/_async/driver.py index 8bbbb9d2f..1bba68bae 100644 --- a/src/neo4j/_async/driver.py +++ b/src/neo4j/_async/driver.py @@ -36,6 +36,7 @@ from .._async_compat.util import AsyncUtil from .._conf import ( Config, + ConfigurationError, PoolConfig, SessionConfig, TrustAll, @@ -125,11 +126,9 @@ def driver( cls, uri: str, *, - auth: t.Union[ - _TAuth, - AsyncAuthManager, - ] = ..., + auth: t.Union[_TAuth, AsyncAuthManager] = ..., max_connection_lifetime: float = ..., + liveness_check_timeout: t.Optional[float] = ..., max_connection_pool_size: int = ..., connection_timeout: float = ..., trust: t.Union[ @@ -151,9 +150,10 @@ def driver( notifications_disabled_categories: t.Optional[ t.Iterable[T_NotificationDisabledCategory] ] = ..., + telemetry_disabled: bool = ..., # undocumented/unsupported options - # they may be change or removed any time without prior notice + # they may be changed or removed any time without prior notice connection_acquisition_timeout: float = ..., max_transaction_retry_time: float = ..., initial_retry_delay: float = ..., @@ -164,7 +164,6 @@ def driver( impersonated_user: t.Optional[str] = ..., bookmark_manager: t.Union[AsyncBookmarkManager, BookmarkManager, None] = ..., - telemetry_disabled: bool = ..., ) -> AsyncDriver: ... @@ -172,11 +171,10 @@ def driver( @classmethod def driver( - cls, uri: str, *, - auth: t.Union[ - _TAuth, - AsyncAuthManager, - ] = None, + cls, + uri: str, + *, + auth: t.Union[_TAuth, AsyncAuthManager] = None, **config ) -> AsyncDriver: """Create a driver. @@ -202,7 +200,6 @@ def driver( TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES ): - from ..exceptions import ConfigurationError raise ConfigurationError( "The config setting `trust` values are {!r}" .format( @@ -216,8 +213,8 @@ def driver( if ("trusted_certificates" in config.keys() and not isinstance(config["trusted_certificates"], TrustStore)): - raise ConnectionError( - "The config setting `trusted_certificates` must be of " + raise ConfigurationError( + 'The config setting "trusted_certificates" must be of ' "type neo4j.TrustAll, neo4j.TrustCustomCAs, or" "neo4j.TrustSystemCAs but was {}".format( type(config["trusted_certificates"]) @@ -229,7 +226,6 @@ def driver( or "trust" in config.keys() or "trusted_certificates" in config.keys() or "ssl_context" in config.keys())): - from ..exceptions import ConfigurationError # TODO: 6.0 - remove "trust" from error message raise ConfigurationError( @@ -257,12 +253,22 @@ def driver( config["encrypted"] = True config["trusted_certificates"] = TrustAll() _normalize_notifications_config(config) + liveness_check_timeout = config.get("liveness_check_timeout") + if ( + liveness_check_timeout is not None + and liveness_check_timeout < 0 + ): + raise ConfigurationError( + 'The config setting "liveness_check_timeout" must be ' + "greater than or equal to 0 but was " + f"{liveness_check_timeout}." + ) assert driver_type in (DRIVER_BOLT, DRIVER_NEO4J) if driver_type == DRIVER_BOLT: if parse_routing_context(parsed.query): deprecation_warn( - "Creating a direct driver (`bolt://` scheme) with " + 'Creating a direct driver ("bolt://" scheme) with ' "routing context (URI parameters) is deprecated. They " "will be ignored. This will raise an error in a " 'future release. Given URI "{}"'.format(uri), diff --git a/src/neo4j/_async/io/_bolt.py b/src/neo4j/_async/io/_bolt.py index 04e922fda..903c205c4 100644 --- a/src/neo4j/_async/io/_bolt.py +++ b/src/neo4j/_async/io/_bolt.py @@ -21,7 +21,7 @@ import typing as t from collections import deque from logging import getLogger -from time import perf_counter +from time import monotonic from ..._api import TelemetryAPI from ..._async_compat.network import AsyncBoltSocket @@ -159,9 +159,9 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, self.hydration_handler = self.HYDRATION_HANDLER_CLS() self.responses = deque() self._max_connection_lifetime = max_connection_lifetime - self._creation_timestamp = perf_counter() + self._creation_timestamp = monotonic() self.routing_context = routing_context - self.idle_since = perf_counter() + self.idle_since = monotonic() # Determine the user agent if user_agent: @@ -797,7 +797,7 @@ def _append(self, signature, fields=(), response=None, async def _send_all(self): if await self.outbox.flush(): - self.idle_since = perf_counter() + self.idle_since = monotonic() async def send_all(self): """ Send all queued messages to the server. @@ -847,7 +847,7 @@ async def fetch_message(self): hydration_hooks=self.responses[0].hydration_hooks ) res = await self._process_message(tag, fields) - self.idle_since = perf_counter() + self.idle_since = monotonic() return res async def fetch_all(self): @@ -928,7 +928,7 @@ async def _set_defunct(self, message, error=None, silent=False): def stale(self): return (self._stale or (0 <= self._max_connection_lifetime - <= perf_counter() - self._creation_timestamp)) + <= monotonic() - self._creation_timestamp)) _stale = False @@ -983,7 +983,7 @@ def is_idle_for(self, timeout): :rtype: bool """ - return perf_counter() - self.idle_since > timeout + return monotonic() - self.idle_since > timeout AsyncBoltSocket.Bolt = AsyncBolt # type: ignore diff --git a/src/neo4j/_async/io/_pool.py b/src/neo4j/_async/io/_pool.py index e4c6b7e2e..1e51191b8 100644 --- a/src/neo4j/_async/io/_pool.py +++ b/src/neo4j/_async/io/_pool.py @@ -247,6 +247,8 @@ async def _acquire( auth = AcquireAuth(None) force_auth = auth.force_auth auth = auth.auth + if liveness_check_timeout is None: + liveness_check_timeout = self.pool_config.liveness_check_timeout async def health_check(connection_, deadline_): if (connection_.closed() diff --git a/src/neo4j/_async/work/session.py b/src/neo4j/_async/work/session.py index c93bf7df4..bfe4a9372 100644 --- a/src/neo4j/_async/work/session.py +++ b/src/neo4j/_async/work/session.py @@ -20,7 +20,7 @@ import typing as t from logging import getLogger from random import random -from time import perf_counter +from time import monotonic from ..._api import TelemetryAPI from ..._async_compat import async_sleep @@ -570,8 +570,8 @@ def api_success_cb(meta): return result if t0 == -1: # The timer should be started after the first attempt - t0 = perf_counter() - t1 = perf_counter() + t0 = monotonic() + t1 = monotonic() if t1 - t0 > self._config.max_transaction_retry_time: break delay = next(retry_delay) diff --git a/src/neo4j/_conf.py b/src/neo4j/_conf.py index 3a0fc0c49..91fac0af3 100644 --- a/src/neo4j/_conf.py +++ b/src/neo4j/_conf.py @@ -355,6 +355,10 @@ class PoolConfig(Config): max_connection_lifetime = 3600 # seconds # The maximum duration the driver will keep a connection for before being removed from the pool. + #: Timeout after which idle connections will be checked for liveness + #: before returned from the pool. + liveness_check_timeout = None + #: Max Connection Pool Size max_connection_pool_size = 100 # The maximum total number of connections allowed, per host (i.e. cluster nodes), to be managed by the connection pool. diff --git a/src/neo4j/_deadline.py b/src/neo4j/_deadline.py index e44c8fa14..c8fb205dc 100644 --- a/src/neo4j/_deadline.py +++ b/src/neo4j/_deadline.py @@ -15,7 +15,7 @@ from contextlib import contextmanager -from time import perf_counter +from time import monotonic class Deadline: @@ -23,7 +23,7 @@ def __init__(self, timeout): if timeout is None or timeout == float("inf"): self._deadline = float("inf") else: - self._deadline = perf_counter() + timeout + self._deadline = monotonic() + timeout self._original_timeout = timeout @property @@ -36,7 +36,7 @@ def expired(self): def to_timeout(self): if self._deadline == float("inf"): return None - timeout = self._deadline - perf_counter() + timeout = self._deadline - monotonic() return timeout if timeout > 0 else 0 def __eq__(self, other): diff --git a/src/neo4j/_routing.py b/src/neo4j/_routing.py index 5bc7f63b1..7762d6715 100644 --- a/src/neo4j/_routing.py +++ b/src/neo4j/_routing.py @@ -16,7 +16,7 @@ from collections.abc import MutableSet from logging import getLogger -from time import perf_counter +from time import monotonic from .addressing import Address @@ -106,7 +106,7 @@ def __init__(self, *, database, routers=(), readers=(), writers=(), ttl=0): self.readers = OrderedSet(readers) self.writers = OrderedSet(writers) self.initialized_without_writers = not self.writers - self.last_updated_time = perf_counter() + self.last_updated_time = monotonic() self.ttl = ttl self.database = database @@ -127,7 +127,7 @@ def is_fresh(self, readonly=False): """ Indicator for whether routing information is still usable. """ assert isinstance(readonly, bool) - expired = self.last_updated_time + self.ttl <= perf_counter() + expired = self.last_updated_time + self.ttl <= monotonic() if readonly: has_server_for_mode = bool(self.readers) else: @@ -146,7 +146,7 @@ def should_be_purged_from_memory(self): :rtype: bool """ from ._conf import RoutingConfig - perf_time = perf_counter() + perf_time = monotonic() res = self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time log.debug("[#0000] _: purge check: " "last_updated_time=%r, ttl=%r, perf_time=%r => %r", @@ -161,7 +161,7 @@ def update(self, new_routing_table): self.readers.replace(new_routing_table.readers) self.writers.replace(new_routing_table.writers) self.initialized_without_writers = not self.writers - self.last_updated_time = perf_counter() + self.last_updated_time = monotonic() self.ttl = new_routing_table.ttl log.debug("[#0000] _: updated table=%r", self) diff --git a/src/neo4j/_sync/driver.py b/src/neo4j/_sync/driver.py index 0bed94284..4273c4054 100644 --- a/src/neo4j/_sync/driver.py +++ b/src/neo4j/_sync/driver.py @@ -36,6 +36,7 @@ from .._async_compat.util import Util from .._conf import ( Config, + ConfigurationError, PoolConfig, SessionConfig, TrustAll, @@ -124,11 +125,9 @@ def driver( cls, uri: str, *, - auth: t.Union[ - _TAuth, - AuthManager, - ] = ..., + auth: t.Union[_TAuth, AuthManager] = ..., max_connection_lifetime: float = ..., + liveness_check_timeout: t.Optional[float] = ..., max_connection_pool_size: int = ..., connection_timeout: float = ..., trust: t.Union[ @@ -150,9 +149,10 @@ def driver( notifications_disabled_categories: t.Optional[ t.Iterable[T_NotificationDisabledCategory] ] = ..., + telemetry_disabled: bool = ..., # undocumented/unsupported options - # they may be change or removed any time without prior notice + # they may be changed or removed any time without prior notice connection_acquisition_timeout: float = ..., max_transaction_retry_time: float = ..., initial_retry_delay: float = ..., @@ -163,7 +163,6 @@ def driver( impersonated_user: t.Optional[str] = ..., bookmark_manager: t.Union[BookmarkManager, BookmarkManager, None] = ..., - telemetry_disabled: bool = ..., ) -> Driver: ... @@ -171,11 +170,10 @@ def driver( @classmethod def driver( - cls, uri: str, *, - auth: t.Union[ - _TAuth, - AuthManager, - ] = None, + cls, + uri: str, + *, + auth: t.Union[_TAuth, AuthManager] = None, **config ) -> Driver: """Create a driver. @@ -201,7 +199,6 @@ def driver( TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES ): - from ..exceptions import ConfigurationError raise ConfigurationError( "The config setting `trust` values are {!r}" .format( @@ -215,8 +212,8 @@ def driver( if ("trusted_certificates" in config.keys() and not isinstance(config["trusted_certificates"], TrustStore)): - raise ConnectionError( - "The config setting `trusted_certificates` must be of " + raise ConfigurationError( + 'The config setting "trusted_certificates" must be of ' "type neo4j.TrustAll, neo4j.TrustCustomCAs, or" "neo4j.TrustSystemCAs but was {}".format( type(config["trusted_certificates"]) @@ -228,7 +225,6 @@ def driver( or "trust" in config.keys() or "trusted_certificates" in config.keys() or "ssl_context" in config.keys())): - from ..exceptions import ConfigurationError # TODO: 6.0 - remove "trust" from error message raise ConfigurationError( @@ -256,12 +252,22 @@ def driver( config["encrypted"] = True config["trusted_certificates"] = TrustAll() _normalize_notifications_config(config) + liveness_check_timeout = config.get("liveness_check_timeout") + if ( + liveness_check_timeout is not None + and liveness_check_timeout < 0 + ): + raise ConfigurationError( + 'The config setting "liveness_check_timeout" must be ' + "greater than or equal to 0 but was " + f"{liveness_check_timeout}." + ) assert driver_type in (DRIVER_BOLT, DRIVER_NEO4J) if driver_type == DRIVER_BOLT: if parse_routing_context(parsed.query): deprecation_warn( - "Creating a direct driver (`bolt://` scheme) with " + 'Creating a direct driver ("bolt://" scheme) with ' "routing context (URI parameters) is deprecated. They " "will be ignored. This will raise an error in a " 'future release. Given URI "{}"'.format(uri), diff --git a/src/neo4j/_sync/io/_bolt.py b/src/neo4j/_sync/io/_bolt.py index 389cc092a..967d63036 100644 --- a/src/neo4j/_sync/io/_bolt.py +++ b/src/neo4j/_sync/io/_bolt.py @@ -21,7 +21,7 @@ import typing as t from collections import deque from logging import getLogger -from time import perf_counter +from time import monotonic from ..._api import TelemetryAPI from ..._async_compat.network import BoltSocket @@ -159,9 +159,9 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, self.hydration_handler = self.HYDRATION_HANDLER_CLS() self.responses = deque() self._max_connection_lifetime = max_connection_lifetime - self._creation_timestamp = perf_counter() + self._creation_timestamp = monotonic() self.routing_context = routing_context - self.idle_since = perf_counter() + self.idle_since = monotonic() # Determine the user agent if user_agent: @@ -797,7 +797,7 @@ def _append(self, signature, fields=(), response=None, def _send_all(self): if self.outbox.flush(): - self.idle_since = perf_counter() + self.idle_since = monotonic() def send_all(self): """ Send all queued messages to the server. @@ -847,7 +847,7 @@ def fetch_message(self): hydration_hooks=self.responses[0].hydration_hooks ) res = self._process_message(tag, fields) - self.idle_since = perf_counter() + self.idle_since = monotonic() return res def fetch_all(self): @@ -928,7 +928,7 @@ def _set_defunct(self, message, error=None, silent=False): def stale(self): return (self._stale or (0 <= self._max_connection_lifetime - <= perf_counter() - self._creation_timestamp)) + <= monotonic() - self._creation_timestamp)) _stale = False @@ -983,7 +983,7 @@ def is_idle_for(self, timeout): :rtype: bool """ - return perf_counter() - self.idle_since > timeout + return monotonic() - self.idle_since > timeout BoltSocket.Bolt = Bolt # type: ignore diff --git a/src/neo4j/_sync/io/_pool.py b/src/neo4j/_sync/io/_pool.py index 5655a5dad..baabfd41b 100644 --- a/src/neo4j/_sync/io/_pool.py +++ b/src/neo4j/_sync/io/_pool.py @@ -244,6 +244,8 @@ def _acquire( auth = AcquireAuth(None) force_auth = auth.force_auth auth = auth.auth + if liveness_check_timeout is None: + liveness_check_timeout = self.pool_config.liveness_check_timeout def health_check(connection_, deadline_): if (connection_.closed() diff --git a/src/neo4j/_sync/work/session.py b/src/neo4j/_sync/work/session.py index 4153885f5..13b283fe9 100644 --- a/src/neo4j/_sync/work/session.py +++ b/src/neo4j/_sync/work/session.py @@ -20,7 +20,7 @@ import typing as t from logging import getLogger from random import random -from time import perf_counter +from time import monotonic from ..._api import TelemetryAPI from ..._async_compat import sleep @@ -570,8 +570,8 @@ def api_success_cb(meta): return result if t0 == -1: # The timer should be started after the first attempt - t0 = perf_counter() - t1 = perf_counter() + t0 = monotonic() + t1 = monotonic() if t1 - t0 > self._config.max_transaction_retry_time: break delay = next(retry_delay) diff --git a/testkitbackend/_async/requests.py b/testkitbackend/_async/requests.py index be95a9ed7..405df1d0a 100644 --- a/testkitbackend/_async/requests.py +++ b/testkitbackend/_async/requests.py @@ -38,10 +38,6 @@ test_subtest_skips, totestkit, ) -from .._warning_check import ( - warning_check, - warnings_check, -) from ..exceptions import MarkdAsDriverException @@ -119,6 +115,7 @@ async def NewDriver(backend, data): ("connectionTimeoutMs", "connection_timeout"), ("maxTxRetryTimeMs", "max_transaction_retry_time"), ("connectionAcquisitionTimeoutMs", "connection_acquisition_timeout"), + ("livenessCheckTimeoutMs", "liveness_check_timeout"), ): if data.get(timeout_testkit) is not None: kwargs[timeout_driver] = data[timeout_testkit] / 1000 @@ -147,7 +144,6 @@ async def NewDriver(backend, data): for cert in data["trustedCertificates"]) kwargs["trusted_certificates"] = neo4j.TrustCustomCAs(*cert_paths) fromtestkit.set_notifications_config(kwargs, data) - data.mark_item_as_read_if_equals("livenessCheckTimeoutMs", None) driver = neo4j.AsyncGraphDatabase.driver( data["uri"], auth=auth, user_agent=data["userAgent"], **kwargs, @@ -375,6 +371,9 @@ async def ExecuteQuery(backend, data): else: bookmark_manager = backend.bookmark_managers[bookmark_manager_id] kwargs["bookmark_manager_"] = bookmark_manager + if "authorizationToken" in config: + kwargs["auth_"] = fromtestkit.to_auth_token(config, + "authorizationToken") eager_result = await driver.execute_query(query, params, **kwargs) await backend.send_response("EagerResult", { @@ -796,6 +795,23 @@ async def GetRoutingTable(backend, data): response_data[role] = list(map(str, addresses)) await backend.send_response("RoutingTable", response_data) + +async def GetConnectionPoolMetrics(backend, data): + driver_id = data["driverId"] + address = neo4j.Address.parse(data["address"]) + driver = backend.drivers[driver_id] + connections = driver._pool.connections.get(address, ()) + in_use = ( + sum(c.in_use for c in connections) + + driver._pool.connections_reservations[address] + ) + idle = len(connections) - in_use + await backend.send_response("ConnectionPoolMetrics", { + "inUse": in_use, + "idle": idle, + }) + + async def FakeTimeInstall(backend, _data): assert backend.fake_time is None assert backend.fake_time_ticker is None @@ -804,6 +820,7 @@ async def FakeTimeInstall(backend, _data): backend.fake_time_ticker = backend.fake_time.start() await backend.send_response("FakeTimeAck", {}) + async def FakeTimeTick(backend, data): assert backend.fake_time is not None assert backend.fake_time_ticker is not None diff --git a/testkitbackend/_sync/requests.py b/testkitbackend/_sync/requests.py index b85e1c8a0..3e64f77d8 100644 --- a/testkitbackend/_sync/requests.py +++ b/testkitbackend/_sync/requests.py @@ -38,10 +38,6 @@ test_subtest_skips, totestkit, ) -from .._warning_check import ( - warning_check, - warnings_check, -) from ..exceptions import MarkdAsDriverException @@ -119,6 +115,7 @@ def NewDriver(backend, data): ("connectionTimeoutMs", "connection_timeout"), ("maxTxRetryTimeMs", "max_transaction_retry_time"), ("connectionAcquisitionTimeoutMs", "connection_acquisition_timeout"), + ("livenessCheckTimeoutMs", "liveness_check_timeout"), ): if data.get(timeout_testkit) is not None: kwargs[timeout_driver] = data[timeout_testkit] / 1000 @@ -147,7 +144,6 @@ def NewDriver(backend, data): for cert in data["trustedCertificates"]) kwargs["trusted_certificates"] = neo4j.TrustCustomCAs(*cert_paths) fromtestkit.set_notifications_config(kwargs, data) - data.mark_item_as_read_if_equals("livenessCheckTimeoutMs", None) driver = neo4j.GraphDatabase.driver( data["uri"], auth=auth, user_agent=data["userAgent"], **kwargs, @@ -375,6 +371,9 @@ def ExecuteQuery(backend, data): else: bookmark_manager = backend.bookmark_managers[bookmark_manager_id] kwargs["bookmark_manager_"] = bookmark_manager + if "authorizationToken" in config: + kwargs["auth_"] = fromtestkit.to_auth_token(config, + "authorizationToken") eager_result = driver.execute_query(query, params, **kwargs) backend.send_response("EagerResult", { @@ -796,6 +795,23 @@ def GetRoutingTable(backend, data): response_data[role] = list(map(str, addresses)) backend.send_response("RoutingTable", response_data) + +def GetConnectionPoolMetrics(backend, data): + driver_id = data["driverId"] + address = neo4j.Address.parse(data["address"]) + driver = backend.drivers[driver_id] + connections = driver._pool.connections.get(address, ()) + in_use = ( + sum(c.in_use for c in connections) + + driver._pool.connections_reservations[address] + ) + idle = len(connections) - in_use + backend.send_response("ConnectionPoolMetrics", { + "inUse": in_use, + "idle": idle, + }) + + def FakeTimeInstall(backend, _data): assert backend.fake_time is None assert backend.fake_time_ticker is None @@ -804,6 +820,7 @@ def FakeTimeInstall(backend, _data): backend.fake_time_ticker = backend.fake_time.start() backend.send_response("FakeTimeAck", {}) + def FakeTimeTick(backend, data): assert backend.fake_time is not None assert backend.fake_time_ticker is not None diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index 92af6d4ab..2f11e9d46 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -13,7 +13,9 @@ "'neo4j.datatypes.test_temporal_types.TestDataTypes.test_should_echo_all_timezone_ids'": "test_subtest_skips.dt_conversion", "'neo4j.datatypes.test_temporal_types.TestDataTypes.test_date_time_cypher_created_tz_id'": - "test_subtest_skips.tz_id" + "test_subtest_skips.tz_id", + "stub\\.routing\\.test_routing_v[0-9x]+\\.RoutingV[0-9x]+\\.test_should_drop_connections_failing_liveness_check": + "Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83" }, "features": { "Feature:API:BookmarkManager": true, @@ -25,7 +27,7 @@ "Feature:API:Driver.VerifyConnectivity": true, "Feature:API:Driver.SupportsSessionAuth": true, "Feature:API:Driver:NotificationsConfig": true, - "Feature:API:Liveness.Check": false, + "Feature:API:Liveness.Check": true, "Feature:API:Result.List": true, "Feature:API:Result.Peek": true, "Feature:API:Result.Single": true, diff --git a/tests/unit/async_/io/test_direct.py b/tests/unit/async_/io/test_direct.py index 3123d9a69..24a3d26ec 100644 --- a/tests/unit/async_/io/test_direct.py +++ b/tests/unit/async_/io/test_direct.py @@ -34,67 +34,23 @@ from ...._async_compat import mark_async_test -class AsyncFakeSocket: - def __init__(self, address): - self.address = address - - def getpeername(self): - return self.address - - async def sendall(self, data): - return - - def close(self): - return - - -class AsyncQuickConnection: - def __init__(self, socket): - self.socket = socket - self.address = socket.getpeername() - self.local_port = self.address[1] - self.connection_id = "bolt-1234" - - @property - def is_reset(self): - return True - - def stale(self): - return False - - async def reset(self): - pass - - def re_auth(self, auth, auth_manager, force=False): - return False - - def close(self): - self.socket.close() - - def closed(self): - return False - - def defunct(self): - return False - - def timedout(self): - return False - - def assert_re_auth_support(self): - pass - - class AsyncFakeBoltPool(AsyncIOPool): is_direct_pool = False - def __init__(self, address, *, auth=None, **config): + def __init__(self, connection_gen, address, *, auth=None, **config): + self.buffered_connection_mocks = [] config["auth"] = static_auth(None) self.pool_config, self.workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig) if config: raise ValueError("Unexpected config keys: %s" % ", ".join(config.keys())) async def opener(addr, auth, timeout): - return AsyncQuickConnection(AsyncFakeSocket(addr)) + if self.buffered_connection_mocks: + mock = self.buffered_connection_mocks.pop() + else: + mock = connection_gen() + mock.address = addr + return mock super().__init__(opener, self.pool_config, self.workspace_config) self.address = address @@ -147,12 +103,14 @@ async def test_bolt_connection_ping_timeout(): @pytest.fixture -async def pool(): - async with AsyncFakeBoltPool(("127.0.0.1", 7687)) as pool: +async def pool(async_fake_connection_generator): + async with AsyncFakeBoltPool( + async_fake_connection_generator, ("127.0.0.1", 7687) + ) as pool: yield pool -def assert_pool_size( address, expected_active, expected_inactive, pool): +def assert_pool_size(address, expected_active, expected_inactive, pool): try: connections = pool.connections[address] except KeyError: @@ -225,8 +183,10 @@ async def test_pool_in_use_count(pool): @mark_async_test -async def test_pool_max_conn_pool_size(pool): - async with AsyncFakeBoltPool((), max_connection_pool_size=1) as pool: +async def test_pool_max_conn_pool_size(async_fake_connection_generator): + async with AsyncFakeBoltPool( + async_fake_connection_generator, (), max_connection_pool_size=1 + ) as pool: address = neo4j.Address(("127.0.0.1", 7687)) await pool._acquire(address, None, Deadline(0), None) assert pool.in_use_connection_count(address) == 1 @@ -237,22 +197,67 @@ async def test_pool_max_conn_pool_size(pool): @pytest.mark.parametrize("is_reset", (True, False)) @mark_async_test -async def test_pool_reset_when_released(is_reset, pool, mocker): +async def test_pool_reset_when_released( + is_reset, pool, async_fake_connection_generator +): + connection_mock = async_fake_connection_generator() + pool.buffered_connection_mocks.append(connection_mock) address = neo4j.Address(("127.0.0.1", 7687)) - quick_connection_name = AsyncQuickConnection.__name__ - is_reset_mock = mocker.patch( - f"{__name__}.{quick_connection_name}.is_reset", - new_callable=mocker.PropertyMock - ) - reset_mock = mocker.patch( - f"{__name__}.{quick_connection_name}.reset", - new_callable=mocker.AsyncMock - ) + is_reset_mock = connection_mock.is_reset_mock + reset_mock = connection_mock.reset is_reset_mock.return_value = is_reset connection = await pool._acquire(address, None, Deadline(3), None) - assert isinstance(connection, AsyncQuickConnection) assert is_reset_mock.call_count == 0 assert reset_mock.call_count == 0 await pool.release(connection) assert is_reset_mock.call_count == 1 assert reset_mock.call_count == int(not is_reset) + + +@pytest.mark.parametrize("config_timeout", (None, 0, 0.2, 1234)) +@pytest.mark.parametrize("acquire_timeout", (None, 0, 0.2, 1234)) +@mark_async_test +async def test_liveness_check( + config_timeout, acquire_timeout, async_fake_connection_generator +): + effective_timeout = config_timeout + if acquire_timeout is not None: + effective_timeout = acquire_timeout + async with AsyncFakeBoltPool( + async_fake_connection_generator, ("127.0.0.1", 7687), + liveness_check_timeout=config_timeout, + ) as pool: + address = neo4j.Address(("127.0.0.1", 7687)) + # pre-populate pool + cx1 = await pool._acquire(address, None, Deadline(3), None) + await pool.release(cx1) + cx1.reset.assert_not_called() + cx1.is_idle_for.assert_not_called() + + # simulate just before timeout + cx1.is_idle_for.return_value = False + + cx2 = await pool._acquire(address, None, Deadline(3), acquire_timeout) + assert cx2 is cx1 + if effective_timeout is not None: + cx1.is_idle_for.assert_called_once_with(effective_timeout) + else: + cx1.is_idle_for.assert_not_called() + await pool.release(cx1) + cx1.reset.assert_not_called() + + # simulate after timeout + cx1.is_idle_for.return_value = True + cx1.is_idle_for.reset_mock() + + cx2 = await pool._acquire(address, None, Deadline(3), acquire_timeout) + assert cx2 is cx1 + if effective_timeout is not None: + cx1.is_idle_for.assert_called_once_with(effective_timeout) + cx1.reset.assert_awaited_once() + else: + cx1.is_idle_for.assert_not_called() + cx1.reset.assert_not_called() + cx1.reset.reset_mock() + await pool.release(cx1) + cx1.reset.assert_not_called() diff --git a/tests/unit/async_/test_driver.py b/tests/unit/async_/test_driver.py index e5418d46b..db0fdadaf 100644 --- a/tests/unit/async_/test_driver.py +++ b/tests/unit/async_/test_driver.py @@ -124,48 +124,48 @@ async def test_routing_driver_constructor(protocol, host, port, params, auth_tok @pytest.mark.parametrize( ("test_config", "expected_failure", "expected_failure_message"), ( - ({"encrypted": False}, ConfigurationError, "The config settings"), - ({"encrypted": True}, ConfigurationError, "The config settings"), + ({"encrypted": False}, ConfigurationError, '"encrypted"'), + ({"encrypted": True}, ConfigurationError, '"encrypted"'), ( {"encrypted": True, "trust": TRUST_ALL_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"encrypted"' ), ( {"trust": TRUST_ALL_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"trust"' ), ( {"trust": TRUST_SYSTEM_CA_SIGNED_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"trust"' ), ( {"encrypted": True, "trusted_certificates": TrustAll()}, - ConfigurationError, "The config settings" + ConfigurationError, '"encrypted"' ), ( {"trusted_certificates": TrustAll()}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"trusted_certificates": TrustSystemCAs()}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"trusted_certificates": TrustCustomCAs("foo", "bar")}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"ssl_context": None}, - ConfigurationError, "The config settings" + ConfigurationError, '"ssl_context"' ), ( {"ssl_context": ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)}, - ConfigurationError, "The config settings" + ConfigurationError, '"ssl_context"' ), ) ) @mark_async_test -async def test_driver_config_error( +async def test_driver_config_error_uri_conflict( test_uri, test_config, expected_failure, expected_failure_message ): def driver_builder(): @@ -176,7 +176,7 @@ def driver_builder(): return AsyncGraphDatabase.driver(test_uri, **test_config) if "+" in test_uri: - # `+s` and `+ssc` are short hand syntax for not having to configure the + # `+s` and `+ssc` are shorthand syntax for not having to configure the # encryption behavior of the driver. Specifying both is invalid. with pytest.raises(expected_failure, match=expected_failure_message): driver_builder() @@ -210,6 +210,22 @@ def test_driver_trust_config_error( AsyncGraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) +@pytest.mark.parametrize( + ("test_config", "expected_failure", "expected_failure_message"), + ( + ( + {"liveness_check_timeout": -1}, + ConfigurationError, '"liveness_check_timeout"' + ), + ) +) +def test_driver_liveness_timeout_config_error( + test_config, expected_failure, expected_failure_message +): + with pytest.raises(expected_failure, match=expected_failure_message): + AsyncGraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) + + @pytest.mark.parametrize("uri", ( "bolt://127.0.0.1:9000", "neo4j://127.0.0.1:9000", diff --git a/tests/unit/common/test_conf.py b/tests/unit/common/test_conf.py index 4a46b6879..4e0ce912c 100644 --- a/tests/unit/common/test_conf.py +++ b/tests/unit/common/test_conf.py @@ -47,6 +47,7 @@ "connection_timeout": 30.0, "keep_alive": True, "max_connection_lifetime": 3600, + "liveness_check_timeout": None, "max_connection_pool_size": 100, "resolver": None, "encrypted": False, diff --git a/tests/unit/mixed/io/test_direct.py b/tests/unit/mixed/io/test_direct.py index 1f8a8ac77..6f9fb8f89 100644 --- a/tests/unit/mixed/io/test_direct.py +++ b/tests/unit/mixed/io/test_direct.py @@ -29,15 +29,13 @@ from neo4j._async.io._pool import AcquireAuth as AsyncAcquireAuth from neo4j._deadline import Deadline from neo4j._sync.io._pool import AcquireAuth -from neo4j.auth_management import ( - AsyncAuthManagers, - AuthManagers, -) +from ...async_.conftest import async_fake_connection_generator from ...async_.io.test_direct import AsyncFakeBoltPool from ...async_.test_auth_manager import ( static_auth_manager as static_async_auth_manager, ) +from ...sync.conftest import fake_connection_generator from ...sync.io.test_direct import FakeBoltPool from ...sync.test_auth_manager import static_auth_manager from ._common import ( @@ -61,7 +59,7 @@ def assert_pool_size(self, address, expected_active, expected_inactive, == len([cx for cx in connections if not cx.in_use])) @pytest.mark.parametrize("pre_populated", (0, 3, 5)) - def test_multithread(self, pre_populated): + def test_multithread(self, pre_populated, fake_connection_generator): connections_lock = Lock() connections = [] pre_populated_connections = [] @@ -77,7 +75,9 @@ def acquire_release_conn(pool_, address_, acquired_counter_, release_event_.wait() pool_.release(conn_) - with FakeBoltPool((), max_connection_pool_size=5) as pool: + with FakeBoltPool( + fake_connection_generator, (), max_connection_pool_size=5 + ) as pool: address = ("127.0.0.1", 7687) acquired_counter = MultiEvent() release_event = Event() @@ -121,7 +121,7 @@ def acquire_release_conn(pool_, address_, acquired_counter_, # The pool size is still 5, but all are free self.assert_pool_size(address, 0, 5, pool) - def test_full_pool_re_auth(self, mocker): + def test_full_pool_re_auth(self, fake_connection_generator, mocker): address = ("127.0.0.1", 7687) acquire_auth1 = AcquireAuth(auth=static_auth_manager( ("user1", "pass1")) @@ -145,7 +145,6 @@ def acquire1(pool_): if waiters: break time.sleep(0.001) - cx.re_auth = mocker.Mock(spec=cx.re_auth) pool_.release(cx) def acquire2(pool_): @@ -156,7 +155,9 @@ def acquire2(pool_): assert auth2 in cx.re_auth.call_args.args pool_.release(cx) - with FakeBoltPool((), max_connection_pool_size=1) as pool: + with FakeBoltPool( + fake_connection_generator, (), max_connection_pool_size=1 + ) as pool: t1 = threading.Thread(target=acquire1, args=(pool,), daemon=True) t2 = threading.Thread(target=acquire2, args=(pool,), daemon=True) t1.start() @@ -166,7 +167,9 @@ def acquire2(pool_): @pytest.mark.parametrize("pre_populated", (0, 3, 5)) @pytest.mark.asyncio - async def test_multi_coroutine(self, pre_populated): + async def test_multi_coroutine( + self, pre_populated, async_fake_connection_generator + ): connections = [] pre_populated_connections = [] @@ -200,7 +203,9 @@ async def waiter(pool_, acquired_counter_, release_event_): # The pool size is still 5, but all are free self.assert_pool_size(address, 0, 5, pool_) - async with AsyncFakeBoltPool((), max_connection_pool_size=5) as pool: + async with AsyncFakeBoltPool( + async_fake_connection_generator, (), max_connection_pool_size=5 + ) as pool: address = ("127.0.0.1", 7687) acquired_counter = AsyncMultiEvent() release_event = AsyncEvent() @@ -227,7 +232,9 @@ async def waiter(pool_, acquired_counter_, release_event_): ) @pytest.mark.asyncio - async def test_full_pool_re_auth_async(self, mocker): + async def test_full_pool_re_auth_async( + self, async_fake_connection_generator, mocker + ): address = ("127.0.0.1", 7687) acquire_auth1 = AsyncAcquireAuth(auth=static_async_auth_manager( ("user1", "pass1")) @@ -242,7 +249,6 @@ async def acquire1(pool_): cx1 = cx while len(pool_.cond._waiters) == 0: await asyncio.sleep(0) - cx.re_auth = mocker.Mock(spec=cx.re_auth) await pool_.release(cx) async def acquire2(pool_): @@ -255,5 +261,7 @@ async def acquire2(pool_): assert auth2 in cx.re_auth.call_args.args await pool_.release(cx) - async with AsyncFakeBoltPool((), max_connection_pool_size=1) as pool: + async with AsyncFakeBoltPool( + async_fake_connection_generator, (), max_connection_pool_size=1 + ) as pool: await asyncio.gather(acquire1(pool), acquire2(pool)) diff --git a/tests/unit/sync/io/test_direct.py b/tests/unit/sync/io/test_direct.py index 9f01fa4fe..9723d7f0c 100644 --- a/tests/unit/sync/io/test_direct.py +++ b/tests/unit/sync/io/test_direct.py @@ -34,67 +34,23 @@ from ...._async_compat import mark_sync_test -class FakeSocket: - def __init__(self, address): - self.address = address - - def getpeername(self): - return self.address - - def sendall(self, data): - return - - def close(self): - return - - -class QuickConnection: - def __init__(self, socket): - self.socket = socket - self.address = socket.getpeername() - self.local_port = self.address[1] - self.connection_id = "bolt-1234" - - @property - def is_reset(self): - return True - - def stale(self): - return False - - def reset(self): - pass - - def re_auth(self, auth, auth_manager, force=False): - return False - - def close(self): - self.socket.close() - - def closed(self): - return False - - def defunct(self): - return False - - def timedout(self): - return False - - def assert_re_auth_support(self): - pass - - class FakeBoltPool(IOPool): is_direct_pool = False - def __init__(self, address, *, auth=None, **config): + def __init__(self, connection_gen, address, *, auth=None, **config): + self.buffered_connection_mocks = [] config["auth"] = static_auth(None) self.pool_config, self.workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig) if config: raise ValueError("Unexpected config keys: %s" % ", ".join(config.keys())) def opener(addr, auth, timeout): - return QuickConnection(FakeSocket(addr)) + if self.buffered_connection_mocks: + mock = self.buffered_connection_mocks.pop() + else: + mock = connection_gen() + mock.address = addr + return mock super().__init__(opener, self.pool_config, self.workspace_config) self.address = address @@ -147,12 +103,14 @@ def test_bolt_connection_ping_timeout(): @pytest.fixture -def pool(): - with FakeBoltPool(("127.0.0.1", 7687)) as pool: +def pool(fake_connection_generator): + with FakeBoltPool( + fake_connection_generator, ("127.0.0.1", 7687) + ) as pool: yield pool -def assert_pool_size( address, expected_active, expected_inactive, pool): +def assert_pool_size(address, expected_active, expected_inactive, pool): try: connections = pool.connections[address] except KeyError: @@ -225,8 +183,10 @@ def test_pool_in_use_count(pool): @mark_sync_test -def test_pool_max_conn_pool_size(pool): - with FakeBoltPool((), max_connection_pool_size=1) as pool: +def test_pool_max_conn_pool_size(fake_connection_generator): + with FakeBoltPool( + fake_connection_generator, (), max_connection_pool_size=1 + ) as pool: address = neo4j.Address(("127.0.0.1", 7687)) pool._acquire(address, None, Deadline(0), None) assert pool.in_use_connection_count(address) == 1 @@ -237,22 +197,67 @@ def test_pool_max_conn_pool_size(pool): @pytest.mark.parametrize("is_reset", (True, False)) @mark_sync_test -def test_pool_reset_when_released(is_reset, pool, mocker): +def test_pool_reset_when_released( + is_reset, pool, fake_connection_generator +): + connection_mock = fake_connection_generator() + pool.buffered_connection_mocks.append(connection_mock) address = neo4j.Address(("127.0.0.1", 7687)) - quick_connection_name = QuickConnection.__name__ - is_reset_mock = mocker.patch( - f"{__name__}.{quick_connection_name}.is_reset", - new_callable=mocker.PropertyMock - ) - reset_mock = mocker.patch( - f"{__name__}.{quick_connection_name}.reset", - new_callable=mocker.MagicMock - ) + is_reset_mock = connection_mock.is_reset_mock + reset_mock = connection_mock.reset is_reset_mock.return_value = is_reset connection = pool._acquire(address, None, Deadline(3), None) - assert isinstance(connection, QuickConnection) assert is_reset_mock.call_count == 0 assert reset_mock.call_count == 0 pool.release(connection) assert is_reset_mock.call_count == 1 assert reset_mock.call_count == int(not is_reset) + + +@pytest.mark.parametrize("config_timeout", (None, 0, 0.2, 1234)) +@pytest.mark.parametrize("acquire_timeout", (None, 0, 0.2, 1234)) +@mark_sync_test +def test_liveness_check( + config_timeout, acquire_timeout, fake_connection_generator +): + effective_timeout = config_timeout + if acquire_timeout is not None: + effective_timeout = acquire_timeout + with FakeBoltPool( + fake_connection_generator, ("127.0.0.1", 7687), + liveness_check_timeout=config_timeout, + ) as pool: + address = neo4j.Address(("127.0.0.1", 7687)) + # pre-populate pool + cx1 = pool._acquire(address, None, Deadline(3), None) + pool.release(cx1) + cx1.reset.assert_not_called() + cx1.is_idle_for.assert_not_called() + + # simulate just before timeout + cx1.is_idle_for.return_value = False + + cx2 = pool._acquire(address, None, Deadline(3), acquire_timeout) + assert cx2 is cx1 + if effective_timeout is not None: + cx1.is_idle_for.assert_called_once_with(effective_timeout) + else: + cx1.is_idle_for.assert_not_called() + pool.release(cx1) + cx1.reset.assert_not_called() + + # simulate after timeout + cx1.is_idle_for.return_value = True + cx1.is_idle_for.reset_mock() + + cx2 = pool._acquire(address, None, Deadline(3), acquire_timeout) + assert cx2 is cx1 + if effective_timeout is not None: + cx1.is_idle_for.assert_called_once_with(effective_timeout) + cx1.reset.assert_called_once() + else: + cx1.is_idle_for.assert_not_called() + cx1.reset.assert_not_called() + cx1.reset.reset_mock() + pool.release(cx1) + cx1.reset.assert_not_called() diff --git a/tests/unit/sync/test_driver.py b/tests/unit/sync/test_driver.py index 2d9767d58..454fe3199 100644 --- a/tests/unit/sync/test_driver.py +++ b/tests/unit/sync/test_driver.py @@ -123,48 +123,48 @@ def test_routing_driver_constructor(protocol, host, port, params, auth_token): @pytest.mark.parametrize( ("test_config", "expected_failure", "expected_failure_message"), ( - ({"encrypted": False}, ConfigurationError, "The config settings"), - ({"encrypted": True}, ConfigurationError, "The config settings"), + ({"encrypted": False}, ConfigurationError, '"encrypted"'), + ({"encrypted": True}, ConfigurationError, '"encrypted"'), ( {"encrypted": True, "trust": TRUST_ALL_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"encrypted"' ), ( {"trust": TRUST_ALL_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"trust"' ), ( {"trust": TRUST_SYSTEM_CA_SIGNED_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"trust"' ), ( {"encrypted": True, "trusted_certificates": TrustAll()}, - ConfigurationError, "The config settings" + ConfigurationError, '"encrypted"' ), ( {"trusted_certificates": TrustAll()}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"trusted_certificates": TrustSystemCAs()}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"trusted_certificates": TrustCustomCAs("foo", "bar")}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"ssl_context": None}, - ConfigurationError, "The config settings" + ConfigurationError, '"ssl_context"' ), ( {"ssl_context": ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)}, - ConfigurationError, "The config settings" + ConfigurationError, '"ssl_context"' ), ) ) @mark_sync_test -def test_driver_config_error( +def test_driver_config_error_uri_conflict( test_uri, test_config, expected_failure, expected_failure_message ): def driver_builder(): @@ -175,7 +175,7 @@ def driver_builder(): return GraphDatabase.driver(test_uri, **test_config) if "+" in test_uri: - # `+s` and `+ssc` are short hand syntax for not having to configure the + # `+s` and `+ssc` are shorthand syntax for not having to configure the # encryption behavior of the driver. Specifying both is invalid. with pytest.raises(expected_failure, match=expected_failure_message): driver_builder() @@ -209,6 +209,22 @@ def test_driver_trust_config_error( GraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) +@pytest.mark.parametrize( + ("test_config", "expected_failure", "expected_failure_message"), + ( + ( + {"liveness_check_timeout": -1}, + ConfigurationError, '"liveness_check_timeout"' + ), + ) +) +def test_driver_liveness_timeout_config_error( + test_config, expected_failure, expected_failure_message +): + with pytest.raises(expected_failure, match=expected_failure_message): + GraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) + + @pytest.mark.parametrize("uri", ( "bolt://127.0.0.1:9000", "neo4j://127.0.0.1:9000",