diff --git a/CHANGELOG.md b/CHANGELOG.md index a695e4207..e61d40cb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -123,6 +123,11 @@ `protocol_version` and `init_size`. - Introduced `neo4j.exceptions.SessionError` that is raised when trying to execute work on a closed or otherwise terminated session. +- Removed deprecated config options `update_routing_table_timeout` and + `session_connection_timeout`. + Server-side keep-alives communicated through configuration hints together with + the config option `connection_acquisition_timeout` are sufficient to avoid the + driver getting stuck. ## Version 4.4 diff --git a/neo4j/_async/driver.py b/neo4j/_async/driver.py index 6107f17d6..98647a513 100644 --- a/neo4j/_async/driver.py +++ b/neo4j/_async/driver.py @@ -80,7 +80,6 @@ def driver( max_connection_lifetime: float = ..., max_connection_pool_size: int = ..., connection_timeout: float = ..., - update_routing_table_timeout: float = ..., trust: t.Union[ te.Literal["TRUST_ALL_CERTIFICATES"], te.Literal["TRUST_SYSTEM_CA_SIGNED_CERTIFICATES"] @@ -97,7 +96,6 @@ def driver( # undocumented/unsupported options # they may be change or removed any time without prior notice - session_connection_timeout: float = ..., connection_acquisition_timeout: float = ..., max_transaction_retry_time: float = ..., initial_retry_delay: float = ..., @@ -339,7 +337,6 @@ def encrypted(self) -> bool: def session( self, - session_connection_timeout: float = ..., connection_acquisition_timeout: float = ..., max_transaction_retry_time: float = ..., database: t.Optional[str] = ..., diff --git a/neo4j/_async/io/_pool.py b/neo4j/_async/io/_pool.py index 9b9118ff6..82ff5ed60 100644 --- a/neo4j/_async/io/_pool.py +++ b/neo4j/_async/io/_pool.py @@ -33,7 +33,6 @@ AsyncRLock, ) from ..._async_compat.network import AsyncNetworkUtil -from ..._async_compat.util import AsyncUtil from ..._conf import ( PoolConfig, WorkspaceConfig, @@ -41,8 +40,6 @@ from ..._deadline import ( connection_deadline, Deadline, - merge_deadlines, - merge_deadlines_and_timeouts, ) from ..._exceptions import BoltError from ..._routing import RoutingTable @@ -222,18 +219,18 @@ async def health_check(connection_, deadline_): @abc.abstractmethod async def acquire( - self, access_mode, timeout, acquisition_timeout, - database, bookmarks, liveness_check_timeout + self, access_mode, timeout, database, bookmarks, liveness_check_timeout ): """ Acquire a connection to a server that can satisfy a set of parameters. :param access_mode: - :param timeout: total timeout (including potential preparation) - :param acquisition_timeout: timeout for actually acquiring a connection + :param timeout: timeout for the core acquisition + (excluding potential preparation like fetching routing tables). :param database: :param bookmarks: :param liveness_check_timeout: """ + ... def kill_and_release(self, *connections): """ Release connections back into the pool after closing them. @@ -397,12 +394,11 @@ def __repr__(self): self.address) async def acquire( - self, access_mode, timeout, acquisition_timeout, - database, bookmarks, liveness_check_timeout + self, access_mode, timeout, database, bookmarks, liveness_check_timeout ): # The access_mode and database is not needed for a direct connection, # it's just there for consistency. - deadline = merge_deadlines_and_timeouts(timeout, acquisition_timeout) + deadline = Deadline.from_timeout_or_deadline(timeout) return await self._acquire( self.address, deadline, liveness_check_timeout ) @@ -464,22 +460,6 @@ def __repr__(self): """ return "<{} addresses={!r}>".format(self.__class__.__name__, self.get_default_database_initial_router_addresses()) - @asynccontextmanager - async def _refresh_lock_deadline(self, deadline): - timeout = deadline.to_timeout() - if timeout == float("inf"): - timeout = -1 - if not await self.refresh_lock.acquire(timeout=timeout): - raise ClientError( - "pool failed to update routing table within {!r}s (timeout)" - .format(deadline.original_timeout) - ) - - try: - yield - finally: - self.refresh_lock.release() - @property def first_initial_routing_address(self): return self.get_default_database_initial_router_addresses()[0] @@ -513,7 +493,7 @@ async def get_or_create_routing_table(self, database): return self.routing_tables[database] async def fetch_routing_info( - self, address, database, imp_user, bookmarks, deadline + self, address, database, imp_user, bookmarks, acquisition_timeout ): """ Fetch raw routing info from a given router address. @@ -524,32 +504,32 @@ async def fetch_routing_info( :type imp_user: str or None :param bookmarks: iterable of bookmark values after which the routing info should be fetched - :param deadline: connection acquisition deadline + :param acquisition_timeout: connection acquisition timeout :return: list of routing records, or None if no connection could be established or if no readers or writers are present :raise ServiceUnavailable: if the server does not support routing, or if routing support is broken or outdated """ + deadline = Deadline.from_timeout_or_deadline(acquisition_timeout) cx = await self._acquire(address, deadline, None) try: - with connection_deadline(cx, deadline): - routing_table = await cx.route( - database or self.workspace_config.database, - imp_user or self.workspace_config.impersonated_user, - bookmarks - ) + routing_table = await cx.route( + database or self.workspace_config.database, + imp_user or self.workspace_config.impersonated_user, + bookmarks + ) finally: await self.release(cx) return routing_table async def fetch_routing_table( - self, *, address, deadline, database, imp_user, bookmarks + self, *, address, acquisition_timeout, database, imp_user, bookmarks ): """ Fetch a routing table from a given router address. :param address: router address - :param deadline: deadline + :param acquisition_timeout: connection acquisition timeout :param database: the database name :type: str :param imp_user: the user to impersonate while fetching the routing @@ -563,7 +543,7 @@ async def fetch_routing_table( new_routing_info = None try: new_routing_info = await self.fetch_routing_info( - address, database, imp_user, bookmarks, deadline + address, database, imp_user, bookmarks, acquisition_timeout ) except Neo4jError as e: # checks if the code is an error that is caused by the client. In @@ -606,7 +586,7 @@ async def fetch_routing_table( return new_routing_table async def _update_routing_table_from( - self, *routers, database, imp_user, bookmarks, deadline, + self, *routers, database, imp_user, bookmarks, acquisition_timeout, database_callback ): """ Try to update routing tables with the given routers. @@ -621,11 +601,8 @@ async def _update_routing_table_from( async for address in AsyncNetworkUtil.resolve_address( router, resolver=self.pool_config.resolver ): - if deadline.expired(): - return False new_routing_table = await self.fetch_routing_table( - address=address, - deadline=deadline, + address=address, acquisition_timeout=acquisition_timeout, database=database, imp_user=imp_user, bookmarks=bookmarks ) if new_routing_table is not None: @@ -645,7 +622,7 @@ async def _update_routing_table_from( return False async def update_routing_table( - self, *, database, imp_user, bookmarks, timeout=None, + self, *, database, imp_user, bookmarks, acquisition_timeout=None, database_callback=None ): """ Update the routing table from the first router able to provide @@ -656,7 +633,7 @@ async def update_routing_table( table :type imp_user: str or None :param bookmarks: bookmarks used when fetching routing table - :param timeout: timeout in seconds for how long to try updating + :param acquisition_timeout: connection acquisition timeout :param database_callback: A callback function that will be called with the database name as only argument when a new routing table has been acquired. This database name might different from `database` if that @@ -665,10 +642,7 @@ async def update_routing_table( :raise neo4j.exceptions.ServiceUnavailable: """ - deadline = merge_deadlines_and_timeouts( - timeout, self.pool_config.update_routing_table_timeout - ) - async with self._refresh_lock_deadline(deadline): + async with self.refresh_lock: routing_table = await self.get_or_create_routing_table(database) # copied because it can be modified existing_routers = set(routing_table.routers) @@ -681,14 +655,16 @@ async def update_routing_table( if await self._update_routing_table_from( self.first_initial_routing_address, database=database, imp_user=imp_user, bookmarks=bookmarks, - deadline=deadline, database_callback=database_callback + acquisition_timeout=acquisition_timeout, + database_callback=database_callback ): # Why is only the first initial routing address used? return if await self._update_routing_table_from( *(existing_routers - {self.first_initial_routing_address}), database=database, imp_user=imp_user, bookmarks=bookmarks, - deadline=deadline, database_callback=database_callback + acquisition_timeout=acquisition_timeout, + database_callback=database_callback ): return @@ -696,7 +672,7 @@ async def update_routing_table( if await self._update_routing_table_from( self.first_initial_routing_address, database=database, imp_user=imp_user, bookmarks=bookmarks, - deadline=deadline, + acquisition_timeout=acquisition_timeout, database_callback=database_callback ): # Why is only the first initial routing address used? @@ -714,8 +690,8 @@ async def update_connection_pool(self, *, database): await super(AsyncNeo4jPool, self).deactivate(address) async def ensure_routing_table_is_fresh( - self, *, access_mode, database, imp_user, bookmarks, deadline=None, - database_callback=None + self, *, access_mode, database, imp_user, bookmarks, + acquisition_timeout=None, database_callback=None ): """ Update the routing table if stale. @@ -730,7 +706,7 @@ async def ensure_routing_table_is_fresh( :return: `True` if an update was required, `False` otherwise. """ from neo4j.api import READ_ACCESS - async with self._refresh_lock_deadline(deadline): + async with self.refresh_lock: routing_table = await self.get_or_create_routing_table(database) if routing_table.is_fresh(readonly=(access_mode == READ_ACCESS)): # Readers are fresh. @@ -738,7 +714,8 @@ async def ensure_routing_table_is_fresh( await self.update_routing_table( database=database, imp_user=imp_user, bookmarks=bookmarks, - timeout=deadline, database_callback=database_callback + acquisition_timeout=acquisition_timeout, + database_callback=database_callback ) await self.update_connection_pool(database=database) @@ -778,34 +755,24 @@ async def _select_address(self, *, access_mode, database): return choice(addresses_by_usage[min(addresses_by_usage)]) async def acquire( - self, access_mode, timeout, acquisition_timeout, - database, bookmarks, liveness_check_timeout + self, access_mode, timeout, database, bookmarks, liveness_check_timeout ): if access_mode not in (WRITE_ACCESS, READ_ACCESS): raise ClientError("Non valid 'access_mode'; {}".format(access_mode)) if not timeout: raise ClientError("'timeout' must be a float larger than 0; {}" .format(timeout)) - if not acquisition_timeout: - raise ClientError("'acquisition_timeout' must be a float larger " - "than 0; {}".format(acquisition_timeout)) - deadline = Deadline.from_timeout_or_deadline(timeout) from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) - async with self._refresh_lock_deadline(deadline): + async with self.refresh_lock: log.debug("[#0000] C: %r", self.routing_tables) await self.ensure_routing_table_is_fresh( access_mode=access_mode, database=database, imp_user=None, - bookmarks=bookmarks, deadline=deadline + bookmarks=bookmarks, acquisition_timeout=timeout ) - # Making sure the routing table is fresh is not considered part of the - # connection acquisition. Hence, the acquisition_timeout starts now! - deadline = merge_deadlines( - deadline, Deadline.from_timeout_or_deadline(acquisition_timeout) - ) while True: try: # Get an address for a connection that have the fewest in-use @@ -817,6 +784,7 @@ async def acquire( raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err try: log.debug("[#0000] C: database=%r address=%r", database, address) + deadline = Deadline.from_timeout_or_deadline(timeout) # should always be a resolved address connection = await self._acquire( address, deadline, liveness_check_timeout diff --git a/neo4j/_async/work/workspace.py b/neo4j/_async/work/workspace.py index 05b7088d4..cd6e17b46 100644 --- a/neo4j/_async/work/workspace.py +++ b/neo4j/_async/work/workspace.py @@ -78,7 +78,7 @@ def _set_cached_database(self, database): self._config.database = database async def _connect(self, access_mode, **acquire_kwargs): - timeout = Deadline(self._config.session_connection_timeout) + acquisition_timeout = self._config.connection_acquisition_timeout if self._connection: # TODO: Investigate this # log.warning("FIXME: should always disconnect before connect") @@ -100,13 +100,12 @@ async def _connect(self, access_mode, **acquire_kwargs): database=self._config.database, imp_user=self._config.impersonated_user, bookmarks=self._bookmarks, - timeout=timeout, + acquisition_timeout=acquisition_timeout, database_callback=self._set_cached_database ) acquire_kwargs_ = { "access_mode": access_mode, - "timeout": timeout, - "acquisition_timeout": self._config.connection_acquisition_timeout, + "timeout": acquisition_timeout, "database": self._config.database, "bookmarks": self._bookmarks, "liveness_check_timeout": None, diff --git a/neo4j/_conf.py b/neo4j/_conf.py index 0912efe89..79cd95d8d 100644 --- a/neo4j/_conf.py +++ b/neo4j/_conf.py @@ -295,13 +295,6 @@ class PoolConfig(Config): connection_timeout = 30.0 # seconds # The maximum amount of time to wait for a TCP connection to be established. - #: Update Routing Table Timout - update_routing_table_timeout = 90.0 # seconds - # The maximum amount of time to wait for updating the routing table. - # This includes everything necessary for this to happen. - # Including opening sockets, requesting and receiving the routing table, - # etc. - #: Trust trust = DeprecatedAlternative( "trusted_certificates", _trust_to_trusted_certificates @@ -392,12 +385,6 @@ class WorkspaceConfig(Config): """ WorkSpace configuration. """ - #: Session Connection Timeout - session_connection_timeout = 120.0 # seconds - # The maximum amount of time to wait for a session to obtain a usable - # read/write connection. This includes everything necessary for this to - # happen. Including fetching routing tables, opening sockets, etc. - #: Connection Acquisition Timeout connection_acquisition_timeout = 60.0 # seconds # The maximum amount of time a session will wait when requesting a connection from the connection pool. diff --git a/neo4j/_sync/driver.py b/neo4j/_sync/driver.py index 5834b46ba..50fbda656 100644 --- a/neo4j/_sync/driver.py +++ b/neo4j/_sync/driver.py @@ -80,7 +80,6 @@ def driver( max_connection_lifetime: float = ..., max_connection_pool_size: int = ..., connection_timeout: float = ..., - update_routing_table_timeout: float = ..., trust: t.Union[ te.Literal["TRUST_ALL_CERTIFICATES"], te.Literal["TRUST_SYSTEM_CA_SIGNED_CERTIFICATES"] @@ -97,7 +96,6 @@ def driver( # undocumented/unsupported options # they may be change or removed any time without prior notice - session_connection_timeout: float = ..., connection_acquisition_timeout: float = ..., max_transaction_retry_time: float = ..., initial_retry_delay: float = ..., @@ -339,7 +337,6 @@ def encrypted(self) -> bool: def session( self, - session_connection_timeout: float = ..., connection_acquisition_timeout: float = ..., max_transaction_retry_time: float = ..., database: t.Optional[str] = ..., diff --git a/neo4j/_sync/io/_pool.py b/neo4j/_sync/io/_pool.py index 61ad373f7..d3ea665f4 100644 --- a/neo4j/_sync/io/_pool.py +++ b/neo4j/_sync/io/_pool.py @@ -33,7 +33,6 @@ RLock, ) from ..._async_compat.network import NetworkUtil -from ..._async_compat.util import Util from ..._conf import ( PoolConfig, WorkspaceConfig, @@ -41,8 +40,6 @@ from ..._deadline import ( connection_deadline, Deadline, - merge_deadlines, - merge_deadlines_and_timeouts, ) from ..._exceptions import BoltError from ..._routing import RoutingTable @@ -222,18 +219,18 @@ def health_check(connection_, deadline_): @abc.abstractmethod def acquire( - self, access_mode, timeout, acquisition_timeout, - database, bookmarks, liveness_check_timeout + self, access_mode, timeout, database, bookmarks, liveness_check_timeout ): """ Acquire a connection to a server that can satisfy a set of parameters. :param access_mode: - :param timeout: total timeout (including potential preparation) - :param acquisition_timeout: timeout for actually acquiring a connection + :param timeout: timeout for the core acquisition + (excluding potential preparation like fetching routing tables). :param database: :param bookmarks: :param liveness_check_timeout: """ + ... def kill_and_release(self, *connections): """ Release connections back into the pool after closing them. @@ -397,12 +394,11 @@ def __repr__(self): self.address) def acquire( - self, access_mode, timeout, acquisition_timeout, - database, bookmarks, liveness_check_timeout + self, access_mode, timeout, database, bookmarks, liveness_check_timeout ): # The access_mode and database is not needed for a direct connection, # it's just there for consistency. - deadline = merge_deadlines_and_timeouts(timeout, acquisition_timeout) + deadline = Deadline.from_timeout_or_deadline(timeout) return self._acquire( self.address, deadline, liveness_check_timeout ) @@ -464,22 +460,6 @@ def __repr__(self): """ return "<{} addresses={!r}>".format(self.__class__.__name__, self.get_default_database_initial_router_addresses()) - @contextmanager - def _refresh_lock_deadline(self, deadline): - timeout = deadline.to_timeout() - if timeout == float("inf"): - timeout = -1 - if not self.refresh_lock.acquire(timeout=timeout): - raise ClientError( - "pool failed to update routing table within {!r}s (timeout)" - .format(deadline.original_timeout) - ) - - try: - yield - finally: - self.refresh_lock.release() - @property def first_initial_routing_address(self): return self.get_default_database_initial_router_addresses()[0] @@ -513,7 +493,7 @@ def get_or_create_routing_table(self, database): return self.routing_tables[database] def fetch_routing_info( - self, address, database, imp_user, bookmarks, deadline + self, address, database, imp_user, bookmarks, acquisition_timeout ): """ Fetch raw routing info from a given router address. @@ -524,32 +504,32 @@ def fetch_routing_info( :type imp_user: str or None :param bookmarks: iterable of bookmark values after which the routing info should be fetched - :param deadline: connection acquisition deadline + :param acquisition_timeout: connection acquisition timeout :return: list of routing records, or None if no connection could be established or if no readers or writers are present :raise ServiceUnavailable: if the server does not support routing, or if routing support is broken or outdated """ + deadline = Deadline.from_timeout_or_deadline(acquisition_timeout) cx = self._acquire(address, deadline, None) try: - with connection_deadline(cx, deadline): - routing_table = cx.route( - database or self.workspace_config.database, - imp_user or self.workspace_config.impersonated_user, - bookmarks - ) + routing_table = cx.route( + database or self.workspace_config.database, + imp_user or self.workspace_config.impersonated_user, + bookmarks + ) finally: self.release(cx) return routing_table def fetch_routing_table( - self, *, address, deadline, database, imp_user, bookmarks + self, *, address, acquisition_timeout, database, imp_user, bookmarks ): """ Fetch a routing table from a given router address. :param address: router address - :param deadline: deadline + :param acquisition_timeout: connection acquisition timeout :param database: the database name :type: str :param imp_user: the user to impersonate while fetching the routing @@ -563,7 +543,7 @@ def fetch_routing_table( new_routing_info = None try: new_routing_info = self.fetch_routing_info( - address, database, imp_user, bookmarks, deadline + address, database, imp_user, bookmarks, acquisition_timeout ) except Neo4jError as e: # checks if the code is an error that is caused by the client. In @@ -606,7 +586,7 @@ def fetch_routing_table( return new_routing_table def _update_routing_table_from( - self, *routers, database, imp_user, bookmarks, deadline, + self, *routers, database, imp_user, bookmarks, acquisition_timeout, database_callback ): """ Try to update routing tables with the given routers. @@ -621,11 +601,8 @@ def _update_routing_table_from( for address in NetworkUtil.resolve_address( router, resolver=self.pool_config.resolver ): - if deadline.expired(): - return False new_routing_table = self.fetch_routing_table( - address=address, - deadline=deadline, + address=address, acquisition_timeout=acquisition_timeout, database=database, imp_user=imp_user, bookmarks=bookmarks ) if new_routing_table is not None: @@ -645,7 +622,7 @@ def _update_routing_table_from( return False def update_routing_table( - self, *, database, imp_user, bookmarks, timeout=None, + self, *, database, imp_user, bookmarks, acquisition_timeout=None, database_callback=None ): """ Update the routing table from the first router able to provide @@ -656,7 +633,7 @@ def update_routing_table( table :type imp_user: str or None :param bookmarks: bookmarks used when fetching routing table - :param timeout: timeout in seconds for how long to try updating + :param acquisition_timeout: connection acquisition timeout :param database_callback: A callback function that will be called with the database name as only argument when a new routing table has been acquired. This database name might different from `database` if that @@ -665,10 +642,7 @@ def update_routing_table( :raise neo4j.exceptions.ServiceUnavailable: """ - deadline = merge_deadlines_and_timeouts( - timeout, self.pool_config.update_routing_table_timeout - ) - with self._refresh_lock_deadline(deadline): + with self.refresh_lock: routing_table = self.get_or_create_routing_table(database) # copied because it can be modified existing_routers = set(routing_table.routers) @@ -681,14 +655,16 @@ def update_routing_table( if self._update_routing_table_from( self.first_initial_routing_address, database=database, imp_user=imp_user, bookmarks=bookmarks, - deadline=deadline, database_callback=database_callback + acquisition_timeout=acquisition_timeout, + database_callback=database_callback ): # Why is only the first initial routing address used? return if self._update_routing_table_from( *(existing_routers - {self.first_initial_routing_address}), database=database, imp_user=imp_user, bookmarks=bookmarks, - deadline=deadline, database_callback=database_callback + acquisition_timeout=acquisition_timeout, + database_callback=database_callback ): return @@ -696,7 +672,7 @@ def update_routing_table( if self._update_routing_table_from( self.first_initial_routing_address, database=database, imp_user=imp_user, bookmarks=bookmarks, - deadline=deadline, + acquisition_timeout=acquisition_timeout, database_callback=database_callback ): # Why is only the first initial routing address used? @@ -714,8 +690,8 @@ def update_connection_pool(self, *, database): super(Neo4jPool, self).deactivate(address) def ensure_routing_table_is_fresh( - self, *, access_mode, database, imp_user, bookmarks, deadline=None, - database_callback=None + self, *, access_mode, database, imp_user, bookmarks, + acquisition_timeout=None, database_callback=None ): """ Update the routing table if stale. @@ -730,7 +706,7 @@ def ensure_routing_table_is_fresh( :return: `True` if an update was required, `False` otherwise. """ from neo4j.api import READ_ACCESS - with self._refresh_lock_deadline(deadline): + with self.refresh_lock: routing_table = self.get_or_create_routing_table(database) if routing_table.is_fresh(readonly=(access_mode == READ_ACCESS)): # Readers are fresh. @@ -738,7 +714,8 @@ def ensure_routing_table_is_fresh( self.update_routing_table( database=database, imp_user=imp_user, bookmarks=bookmarks, - timeout=deadline, database_callback=database_callback + acquisition_timeout=acquisition_timeout, + database_callback=database_callback ) self.update_connection_pool(database=database) @@ -778,34 +755,24 @@ def _select_address(self, *, access_mode, database): return choice(addresses_by_usage[min(addresses_by_usage)]) def acquire( - self, access_mode, timeout, acquisition_timeout, - database, bookmarks, liveness_check_timeout + self, access_mode, timeout, database, bookmarks, liveness_check_timeout ): if access_mode not in (WRITE_ACCESS, READ_ACCESS): raise ClientError("Non valid 'access_mode'; {}".format(access_mode)) if not timeout: raise ClientError("'timeout' must be a float larger than 0; {}" .format(timeout)) - if not acquisition_timeout: - raise ClientError("'acquisition_timeout' must be a float larger " - "than 0; {}".format(acquisition_timeout)) - deadline = Deadline.from_timeout_or_deadline(timeout) from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) - with self._refresh_lock_deadline(deadline): + with self.refresh_lock: log.debug("[#0000] C: %r", self.routing_tables) self.ensure_routing_table_is_fresh( access_mode=access_mode, database=database, imp_user=None, - bookmarks=bookmarks, deadline=deadline + bookmarks=bookmarks, acquisition_timeout=timeout ) - # Making sure the routing table is fresh is not considered part of the - # connection acquisition. Hence, the acquisition_timeout starts now! - deadline = merge_deadlines( - deadline, Deadline.from_timeout_or_deadline(acquisition_timeout) - ) while True: try: # Get an address for a connection that have the fewest in-use @@ -817,6 +784,7 @@ def acquire( raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err try: log.debug("[#0000] C: database=%r address=%r", database, address) + deadline = Deadline.from_timeout_or_deadline(timeout) # should always be a resolved address connection = self._acquire( address, deadline, liveness_check_timeout diff --git a/neo4j/_sync/work/workspace.py b/neo4j/_sync/work/workspace.py index 7c3f5865b..c780241a5 100644 --- a/neo4j/_sync/work/workspace.py +++ b/neo4j/_sync/work/workspace.py @@ -78,7 +78,7 @@ def _set_cached_database(self, database): self._config.database = database def _connect(self, access_mode, **acquire_kwargs): - timeout = Deadline(self._config.session_connection_timeout) + acquisition_timeout = self._config.connection_acquisition_timeout if self._connection: # TODO: Investigate this # log.warning("FIXME: should always disconnect before connect") @@ -100,13 +100,12 @@ def _connect(self, access_mode, **acquire_kwargs): database=self._config.database, imp_user=self._config.impersonated_user, bookmarks=self._bookmarks, - timeout=timeout, + acquisition_timeout=acquisition_timeout, database_callback=self._set_cached_database ) acquire_kwargs_ = { "access_mode": access_mode, - "timeout": timeout, - "acquisition_timeout": self._config.connection_acquisition_timeout, + "timeout": acquisition_timeout, "database": self._config.database, "bookmarks": self._bookmarks, "liveness_check_timeout": None, diff --git a/testkitbackend/_async/requests.py b/testkitbackend/_async/requests.py index 0ea955cb6..66da9fe17 100644 --- a/testkitbackend/_async/requests.py +++ b/testkitbackend/_async/requests.py @@ -123,11 +123,12 @@ async def NewDriver(backend, data): ("connectionTimeoutMs", "connection_timeout"), ("maxTxRetryTimeMs", "max_transaction_retry_time"), ("connectionAcquisitionTimeoutMs", "connection_acquisition_timeout"), - ("sessionConnectionTimeoutMs", "session_connection_timeout"), - ("updateRoutingTableTimeoutMs", "update_routing_table_timeout"), ): if data.get(timeout_testkit) is not None: kwargs[timeout_driver] = data[timeout_testkit] / 1000 + for k in ("sessionConnectionTimeoutMs", "updateRoutingTableTimeoutMs"): + if k in data: + data.mark_item_as_read_if_equals(k, None) if data.get("maxConnectionPoolSize"): kwargs["max_connection_pool_size"] = data["maxConnectionPoolSize"] if data.get("fetchSize"): diff --git a/testkitbackend/_sync/requests.py b/testkitbackend/_sync/requests.py index 776cfa95a..c033acff5 100644 --- a/testkitbackend/_sync/requests.py +++ b/testkitbackend/_sync/requests.py @@ -123,11 +123,12 @@ def NewDriver(backend, data): ("connectionTimeoutMs", "connection_timeout"), ("maxTxRetryTimeMs", "max_transaction_retry_time"), ("connectionAcquisitionTimeoutMs", "connection_acquisition_timeout"), - ("sessionConnectionTimeoutMs", "session_connection_timeout"), - ("updateRoutingTableTimeoutMs", "update_routing_table_timeout"), ): if data.get(timeout_testkit) is not None: kwargs[timeout_driver] = data[timeout_testkit] / 1000 + for k in ("sessionConnectionTimeoutMs", "updateRoutingTableTimeoutMs"): + if k in data: + data.mark_item_as_read_if_equals(k, None) if data.get("maxConnectionPoolSize"): kwargs["max_connection_pool_size"] = data["maxConnectionPoolSize"] if data.get("fetchSize"): diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index da189e1cf..8f72b4879 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -25,12 +25,10 @@ "Feature:API:Result.Peek": true, "Feature:API:Result.Single": true, "Feature:API:Result.SingleOptional": true, - "Feature:API:SessionConnectionTimeout": true, "Feature:API:SSLConfig": true, "Feature:API:SSLSchemes": true, "Feature:API:Type.Spatial": true, "Feature:API:Type.Temporal": true, - "Feature:API:UpdateRoutingTableTimeout": true, "Feature:Auth:Bearer": true, "Feature:Auth:Custom": true, "Feature:Auth:Kerberos": true, diff --git a/tests/integration/mixed/test_async_cancellation.py b/tests/integration/mixed/test_async_cancellation.py index 496d7ab1e..abb8ff3f1 100644 --- a/tests/integration/mixed/test_async_cancellation.py +++ b/tests/integration/mixed/test_async_cancellation.py @@ -119,7 +119,7 @@ async def test_async_cancellation( uri, auth, mocker, read_func, waits, cancel_count, i ): async with get_async_driver_no_warning( - uri, auth=auth, session_connection_timeout=10 + uri, auth=auth, connection_acquisition_timeout=10 ) as driver: async with driver.session() as session: session._handle_cancellation = mocker.Mock( @@ -183,7 +183,7 @@ async def test_async_cancellation( async def test_async_cancellation_does_not_leak(uri, auth): async with get_async_driver_no_warning( uri, auth=auth, - session_connection_timeout=10, + connection_acquisition_timeout=10, # driver needs to cope with a single connection in the pool! max_connection_pool_size=1, ) as driver: diff --git a/tests/unit/async_/io/test_direct.py b/tests/unit/async_/io/test_direct.py index ec3c40b7a..d8cfbf625 100644 --- a/tests/unit/async_/io/test_direct.py +++ b/tests/unit/async_/io/test_direct.py @@ -91,8 +91,7 @@ async def opener(addr, timeout): self.address = address async def acquire( - self, access_mode, timeout, acquisition_timeout, - database, bookmarks, liveness_check_timeout + self, access_mode, timeout, database, bookmarks, liveness_check_timeout ): return await self._acquire( self.address, timeout, liveness_check_timeout diff --git a/tests/unit/async_/io/test_neo4j_pool.py b/tests/unit/async_/io/test_neo4j_pool.py index 44b1931f4..07b136551 100644 --- a/tests/unit/async_/io/test_neo4j_pool.py +++ b/tests/unit/async_/io/test_neo4j_pool.py @@ -76,13 +76,13 @@ async def test_acquires_new_routing_table_if_deleted(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx) assert pool.routing_tables.get("test_db") del pool.routing_tables["test_db"] - cx = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx) assert pool.routing_tables.get("test_db") @@ -92,14 +92,14 @@ async def test_acquires_new_routing_table_if_stale(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx) assert pool.routing_tables.get("test_db") old_value = pool.routing_tables["test_db"].last_updated_time pool.routing_tables["test_db"].ttl = 0 - cx = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx) assert pool.routing_tables["test_db"].last_updated_time > old_value @@ -109,10 +109,10 @@ async def test_removes_old_routing_table(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx = await pool.acquire(READ_ACCESS, 30, 60, "test_db1", None, None) + cx = await pool.acquire(READ_ACCESS, 30, "test_db1", None, None) await pool.release(cx) assert pool.routing_tables.get("test_db1") - cx = await pool.acquire(READ_ACCESS, 30, 60, "test_db2", None, None) + cx = await pool.acquire(READ_ACCESS, 30, "test_db2", None, None) await pool.release(cx) assert pool.routing_tables.get("test_db2") @@ -121,7 +121,7 @@ async def test_removes_old_routing_table(opener): pool.routing_tables["test_db2"].ttl = \ -RoutingConfig.routing_table_purge_delay - cx = await pool.acquire(READ_ACCESS, 30, 60, "test_db1", None, None) + cx = await pool.acquire(READ_ACCESS, 30, "test_db1", None, None) await pool.release(cx) assert pool.routing_tables["test_db1"].last_updated_time > old_value assert "test_db2" not in pool.routing_tables @@ -135,7 +135,7 @@ async def test_chooses_right_connection_type(opener, type_): ) cx1 = await pool.acquire( READ_ACCESS if type_ == "r" else WRITE_ACCESS, - 30, 60, "test_db", None, None + 30, "test_db", None, None ) await pool.release(cx1) if type_ == "r": @@ -149,9 +149,9 @@ async def test_reuses_connection(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx1) - cx2 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert cx1 is cx2 @@ -169,7 +169,7 @@ async def break_connection(): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx1) assert cx1 in pool.connections[cx1.addr] # simulate connection going stale (e.g. exceeding) and then breaking when @@ -179,7 +179,7 @@ async def break_connection(): if break_on_close: cx_close_mock_side_effect = cx_close_mock.side_effect cx_close_mock.side_effect = break_connection - cx2 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx2) if break_on_close: cx1.close.assert_called() @@ -196,11 +196,11 @@ async def test_does_not_close_stale_connections_in_use(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert cx1 in pool.connections[cx1.addr] # simulate connection going stale (e.g. exceeding) while being in use cx1.stale.return_value = True - cx2 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx2) cx1.close.assert_not_called() assert cx2 is not cx1 @@ -213,7 +213,7 @@ async def test_does_not_close_stale_connections_in_use(opener): # it should be closed when trying to acquire the next connection cx1.close.assert_not_called() - cx3 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx3 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx3) cx1.close.assert_called_once() assert cx2 is cx3 @@ -227,7 +227,7 @@ async def test_release_resets_connections(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) cx1.is_reset_mock.return_value = False cx1.is_reset_mock.reset_mock() await pool.release(cx1) @@ -240,7 +240,7 @@ async def test_release_does_not_resets_closed_connections(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) cx1.closed.return_value = True cx1.closed.reset_mock() cx1.is_reset_mock.reset_mock() @@ -255,7 +255,7 @@ async def test_release_does_not_resets_defunct_connections(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) cx1.defunct.return_value = True cx1.defunct.reset_mock() cx1.is_reset_mock.reset_mock() @@ -408,8 +408,8 @@ async def close_side_effect(): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) - cx2 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) + cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) await pool.release(cx1) await pool.release(cx2) @@ -421,7 +421,7 @@ async def close_side_effect(): # unreachable cx1.stale.return_value = True - cx3 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx3 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert cx3 is not cx1 assert cx3 is not cx2 @@ -432,9 +432,9 @@ async def test_failing_opener_leaves_connections_in_use_alone(opener): pool = AsyncNeo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) opener.side_effect = ServiceUnavailable("Server overloaded") with pytest.raises((ServiceUnavailable, SessionExpired)): - await pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + await pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert not cx1.closed() diff --git a/tests/unit/async_/test_driver.py b/tests/unit/async_/test_driver.py index d263ca23e..a353857a8 100644 --- a/tests/unit/async_/test_driver.py +++ b/tests/unit/async_/test_driver.py @@ -208,7 +208,6 @@ async def test_driver_opens_write_session_by_default(uri, mocker): acquire_mock.assert_called_once_with( access_mode=WRITE_ACCESS, timeout=mocker.ANY, - acquisition_timeout=mocker.ANY, database=mocker.ANY, bookmarks=mocker.ANY, liveness_check_timeout=mocker.ANY diff --git a/tests/unit/common/test_conf.py b/tests/unit/common/test_conf.py index 684dbaa35..72ee07f2d 100644 --- a/tests/unit/common/test_conf.py +++ b/tests/unit/common/test_conf.py @@ -45,7 +45,6 @@ test_pool_config = { "connection_timeout": 30.0, - "update_routing_table_timeout": 90.0, "keep_alive": True, "max_connection_lifetime": 3600, "max_connection_pool_size": 100, @@ -57,7 +56,6 @@ } test_session_config = { - "session_connection_timeout": 120.0, "connection_acquisition_timeout": 60.0, "max_transaction_retry_time": 30.0, "initial_retry_delay": 1.0, diff --git a/tests/unit/sync/io/test_direct.py b/tests/unit/sync/io/test_direct.py index d24f93a34..70c5478d3 100644 --- a/tests/unit/sync/io/test_direct.py +++ b/tests/unit/sync/io/test_direct.py @@ -91,8 +91,7 @@ def opener(addr, timeout): self.address = address def acquire( - self, access_mode, timeout, acquisition_timeout, - database, bookmarks, liveness_check_timeout + self, access_mode, timeout, database, bookmarks, liveness_check_timeout ): return self._acquire( self.address, timeout, liveness_check_timeout diff --git a/tests/unit/sync/io/test_neo4j_pool.py b/tests/unit/sync/io/test_neo4j_pool.py index af10dc7da..6b9f8d7eb 100644 --- a/tests/unit/sync/io/test_neo4j_pool.py +++ b/tests/unit/sync/io/test_neo4j_pool.py @@ -76,13 +76,13 @@ def test_acquires_new_routing_table_if_deleted(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx) assert pool.routing_tables.get("test_db") del pool.routing_tables["test_db"] - cx = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx) assert pool.routing_tables.get("test_db") @@ -92,14 +92,14 @@ def test_acquires_new_routing_table_if_stale(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx) assert pool.routing_tables.get("test_db") old_value = pool.routing_tables["test_db"].last_updated_time pool.routing_tables["test_db"].ttl = 0 - cx = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx) assert pool.routing_tables["test_db"].last_updated_time > old_value @@ -109,10 +109,10 @@ def test_removes_old_routing_table(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx = pool.acquire(READ_ACCESS, 30, 60, "test_db1", None, None) + cx = pool.acquire(READ_ACCESS, 30, "test_db1", None, None) pool.release(cx) assert pool.routing_tables.get("test_db1") - cx = pool.acquire(READ_ACCESS, 30, 60, "test_db2", None, None) + cx = pool.acquire(READ_ACCESS, 30, "test_db2", None, None) pool.release(cx) assert pool.routing_tables.get("test_db2") @@ -121,7 +121,7 @@ def test_removes_old_routing_table(opener): pool.routing_tables["test_db2"].ttl = \ -RoutingConfig.routing_table_purge_delay - cx = pool.acquire(READ_ACCESS, 30, 60, "test_db1", None, None) + cx = pool.acquire(READ_ACCESS, 30, "test_db1", None, None) pool.release(cx) assert pool.routing_tables["test_db1"].last_updated_time > old_value assert "test_db2" not in pool.routing_tables @@ -135,7 +135,7 @@ def test_chooses_right_connection_type(opener, type_): ) cx1 = pool.acquire( READ_ACCESS if type_ == "r" else WRITE_ACCESS, - 30, 60, "test_db", None, None + 30, "test_db", None, None ) pool.release(cx1) if type_ == "r": @@ -149,9 +149,9 @@ def test_reuses_connection(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx1) - cx2 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert cx1 is cx2 @@ -169,7 +169,7 @@ def break_connection(): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx1) assert cx1 in pool.connections[cx1.addr] # simulate connection going stale (e.g. exceeding) and then breaking when @@ -179,7 +179,7 @@ def break_connection(): if break_on_close: cx_close_mock_side_effect = cx_close_mock.side_effect cx_close_mock.side_effect = break_connection - cx2 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx2) if break_on_close: cx1.close.assert_called() @@ -196,11 +196,11 @@ def test_does_not_close_stale_connections_in_use(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert cx1 in pool.connections[cx1.addr] # simulate connection going stale (e.g. exceeding) while being in use cx1.stale.return_value = True - cx2 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx2) cx1.close.assert_not_called() assert cx2 is not cx1 @@ -213,7 +213,7 @@ def test_does_not_close_stale_connections_in_use(opener): # it should be closed when trying to acquire the next connection cx1.close.assert_not_called() - cx3 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx3) cx1.close.assert_called_once() assert cx2 is cx3 @@ -227,7 +227,7 @@ def test_release_resets_connections(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) cx1.is_reset_mock.return_value = False cx1.is_reset_mock.reset_mock() pool.release(cx1) @@ -240,7 +240,7 @@ def test_release_does_not_resets_closed_connections(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) cx1.closed.return_value = True cx1.closed.reset_mock() cx1.is_reset_mock.reset_mock() @@ -255,7 +255,7 @@ def test_release_does_not_resets_defunct_connections(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) cx1.defunct.return_value = True cx1.defunct.reset_mock() cx1.is_reset_mock.reset_mock() @@ -408,8 +408,8 @@ def close_side_effect(): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) - cx2 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) + cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) pool.release(cx1) pool.release(cx2) @@ -421,7 +421,7 @@ def close_side_effect(): # unreachable cx1.stale.return_value = True - cx3 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert cx3 is not cx1 assert cx3 is not cx2 @@ -432,9 +432,9 @@ def test_failing_opener_leaves_connections_in_use_alone(opener): pool = Neo4jPool( opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS ) - cx1 = pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None, None) opener.side_effect = ServiceUnavailable("Server overloaded") with pytest.raises((ServiceUnavailable, SessionExpired)): - pool.acquire(READ_ACCESS, 30, 60, "test_db", None, None) + pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert not cx1.closed() diff --git a/tests/unit/sync/test_driver.py b/tests/unit/sync/test_driver.py index df6e904fc..0784cc271 100644 --- a/tests/unit/sync/test_driver.py +++ b/tests/unit/sync/test_driver.py @@ -208,7 +208,6 @@ def test_driver_opens_write_session_by_default(uri, mocker): acquire_mock.assert_called_once_with( access_mode=WRITE_ACCESS, timeout=mocker.ANY, - acquisition_timeout=mocker.ANY, database=mocker.ANY, bookmarks=mocker.ANY, liveness_check_timeout=mocker.ANY