Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async def _acquire_new(self, address, timeout):
return connection
return None

async def _acquire(self, address, timeout, lifeness_check_timeout):
async def _acquire(self, address, timeout, liveness_check_timeout):
""" Acquire a connection to a given address from the pool.
The address supplied should always be an IP address, not
a host name.
Expand All @@ -152,8 +152,8 @@ async def health_check(connection_):
or connection_.defunct()
or connection_.stale()):
return False
if lifeness_check_timeout is not None:
if connection_.is_idle_for(lifeness_check_timeout):
if liveness_check_timeout is not None:
if connection_.is_idle_for(liveness_check_timeout):
try:
await connection_.reset()
except (OSError, ServiceUnavailable, SessionExpired):
Expand Down Expand Up @@ -186,15 +186,15 @@ async def health_check(connection_):
@abc.abstractmethod
async def acquire(
self, access_mode=None, timeout=None, database=None, bookmarks=None,
lifeness_check_timeout=None
liveness_check_timeout=None
):
""" Acquire a connection to a server that can satisfy a set of parameters.

:param access_mode:
:param timeout:
:param database:
:param bookmarks:
:param lifeness_check_timeout:
:param liveness_check_timeout:
"""

async def release(self, *connections):
Expand Down Expand Up @@ -311,12 +311,12 @@ def __repr__(self):

async def acquire(
self, access_mode=None, timeout=None, database=None, bookmarks=None,
lifeness_check_timeout=None
liveness_check_timeout=None
):
# The access_mode and database is not needed for a direct connection,
# it's just there for consistency.
return await self._acquire(
self.address, timeout, lifeness_check_timeout
self.address, timeout, liveness_check_timeout
)


Expand Down Expand Up @@ -666,7 +666,7 @@ async def _select_address(self, *, access_mode, database):

async def acquire(
self, access_mode=None, timeout=None, database=None, bookmarks=None,
lifeness_check_timeout=None
liveness_check_timeout=None
):
if access_mode not in (WRITE_ACCESS, READ_ACCESS):
raise ClientError("Non valid 'access_mode'; {}".format(access_mode))
Expand Down Expand Up @@ -697,7 +697,7 @@ async def acquire(
log.debug("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r", database, address)
# should always be a resolved address
connection = await self._acquire(
address, timeout, lifeness_check_timeout
address, timeout, liveness_check_timeout
)
except (ServiceUnavailable, SessionExpired):
await self.deactivate(address=address)
Expand Down
2 changes: 1 addition & 1 deletion neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def _result_error(self, _):

async def _get_server_info(self):
assert not self._connection
await self._connect(READ_ACCESS, lifeness_check_timeout=0)
await self._connect(READ_ACCESS, liveness_check_timeout=0)
server_info = self._connection.server_info
await self._disconnect()
return server_info
Expand Down
18 changes: 9 additions & 9 deletions neo4j/_sync/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _acquire_new(self, address, timeout):
return connection
return None

def _acquire(self, address, timeout, lifeness_check_timeout):
def _acquire(self, address, timeout, liveness_check_timeout):
""" Acquire a connection to a given address from the pool.
The address supplied should always be an IP address, not
a host name.
Expand All @@ -152,8 +152,8 @@ def health_check(connection_):
or connection_.defunct()
or connection_.stale()):
return False
if lifeness_check_timeout is not None:
if connection_.is_idle_for(lifeness_check_timeout):
if liveness_check_timeout is not None:
if connection_.is_idle_for(liveness_check_timeout):
try:
connection_.reset()
except (OSError, ServiceUnavailable, SessionExpired):
Expand Down Expand Up @@ -186,15 +186,15 @@ def health_check(connection_):
@abc.abstractmethod
def acquire(
self, access_mode=None, timeout=None, database=None, bookmarks=None,
lifeness_check_timeout=None
liveness_check_timeout=None
):
""" Acquire a connection to a server that can satisfy a set of parameters.

:param access_mode:
:param timeout:
:param database:
:param bookmarks:
:param lifeness_check_timeout:
:param liveness_check_timeout:
"""

def release(self, *connections):
Expand Down Expand Up @@ -311,12 +311,12 @@ def __repr__(self):

def acquire(
self, access_mode=None, timeout=None, database=None, bookmarks=None,
lifeness_check_timeout=None
liveness_check_timeout=None
):
# The access_mode and database is not needed for a direct connection,
# it's just there for consistency.
return self._acquire(
self.address, timeout, lifeness_check_timeout
self.address, timeout, liveness_check_timeout
)


Expand Down Expand Up @@ -666,7 +666,7 @@ def _select_address(self, *, access_mode, database):

def acquire(
self, access_mode=None, timeout=None, database=None, bookmarks=None,
lifeness_check_timeout=None
liveness_check_timeout=None
):
if access_mode not in (WRITE_ACCESS, READ_ACCESS):
raise ClientError("Non valid 'access_mode'; {}".format(access_mode))
Expand Down Expand Up @@ -697,7 +697,7 @@ def acquire(
log.debug("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r", database, address)
# should always be a resolved address
connection = self._acquire(
address, timeout, lifeness_check_timeout
address, timeout, liveness_check_timeout
)
except (ServiceUnavailable, SessionExpired):
self.deactivate(address=address)
Expand Down
2 changes: 1 addition & 1 deletion neo4j/_sync/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def _result_error(self, _):

def _get_server_info(self):
assert not self._connection
self._connect(READ_ACCESS, lifeness_check_timeout=0)
self._connect(READ_ACCESS, liveness_check_timeout=0)
server_info = self._connection.server_info
self._disconnect()
return server_info
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/async_/io/test_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ async def opener(addr, timeout):

async def acquire(
self, access_mode=None, timeout=None, database=None, bookmarks=None,
lifeness_check_timeout=None
liveness_check_timeout=None
):
return await self._acquire(self.address, timeout,
lifeness_check_timeout)
liveness_check_timeout)


@mark_async_test
Expand Down
74 changes: 37 additions & 37 deletions tests/unit/async_/io/test_neo4j_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,29 +258,29 @@ async def test_release_does_not_resets_defunct_connections(opener):
cx1.reset.asset_not_called()


@pytest.mark.parametrize("lifeness_timeout", (0, 1, 2))
@pytest.mark.parametrize("liveness_timeout", (0, 1, 2))
@mark_async_test
async def test_acquire_performs_no_lifeness_check_on_fresh_connection(
opener, lifeness_timeout
async def test_acquire_performs_no_liveness_check_on_fresh_connection(
opener, liveness_timeout
):
pool = AsyncNeo4jPool(
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
)
cx1 = await pool._acquire(READER_ADDRESS, 30, lifeness_timeout)
cx1 = await pool._acquire(READER_ADDRESS, 30, liveness_timeout)
assert cx1.addr == READER_ADDRESS
cx1.reset.asset_not_called()


@pytest.mark.parametrize("lifeness_timeout", (0, 1, 2))
@pytest.mark.parametrize("liveness_timeout", (0, 1, 2))
@mark_async_test
async def test_acquire_performs_lifeness_check_on_existing_connection(
opener, lifeness_timeout
async def test_acquire_performs_liveness_check_on_existing_connection(
opener, liveness_timeout
):
pool = AsyncNeo4jPool(
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
)
# populate the pool with a connection
cx1 = await pool._acquire(READER_ADDRESS, 30, lifeness_timeout)
cx1 = await pool._acquire(READER_ADDRESS, 30, liveness_timeout)

# make sure we assume the right state
assert cx1.addr == READER_ADDRESS
Expand All @@ -293,68 +293,68 @@ async def test_acquire_performs_lifeness_check_on_existing_connection(
await pool.release(cx1)
cx1.reset.assert_not_called()

# then acquire it again and assert the lifeness check was performed
cx2 = await pool._acquire(READER_ADDRESS, 30, lifeness_timeout)
# then acquire it again and assert the liveness check was performed
cx2 = await pool._acquire(READER_ADDRESS, 30, liveness_timeout)
assert cx1 is cx2
cx1.is_idle_for.assert_called_once_with(lifeness_timeout)
cx1.is_idle_for.assert_called_once_with(liveness_timeout)
cx2.reset.assert_awaited_once()


@pytest.mark.parametrize("lifeness_error",
@pytest.mark.parametrize("liveness_error",
(OSError, ServiceUnavailable, SessionExpired))
@mark_async_test
async def test_acquire_creates_connection_on_failed_lifeness_check(
opener, lifeness_error
async def test_acquire_creates_connection_on_failed_liveness_check(
opener, liveness_error
):
def lifeness_side_effect(*args, **kwargs):
raise lifeness_error("lifeness check failed")
def liveness_side_effect(*args, **kwargs):
raise liveness_error("liveness check failed")

lifeness_timeout = 1
liveness_timeout = 1
pool = AsyncNeo4jPool(
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
)
# populate the pool with a connection
cx1 = await pool._acquire(READER_ADDRESS, 30, lifeness_timeout)
cx1 = await pool._acquire(READER_ADDRESS, 30, liveness_timeout)

# make sure we assume the right state
assert cx1.addr == READER_ADDRESS
cx1.is_idle_for.assert_not_called()
cx1.reset.assert_not_called()

cx1.is_idle_for.return_value = True
# simulate cx1 failing lifeness check
cx1.reset.side_effect = lifeness_side_effect
# simulate cx1 failing liveness check
cx1.reset.side_effect = liveness_side_effect

# release the connection
await pool.release(cx1)
cx1.reset.assert_not_called()

# then acquire it again and assert the lifeness check was performed
cx2 = await pool._acquire(READER_ADDRESS, 30, lifeness_timeout)
# then acquire it again and assert the liveness check was performed
cx2 = await pool._acquire(READER_ADDRESS, 30, liveness_timeout)
assert cx1 is not cx2
assert cx1.addr == cx2.addr
cx1.is_idle_for.assert_called_once_with(lifeness_timeout)
cx1.is_idle_for.assert_called_once_with(liveness_timeout)
cx2.reset.assert_not_called()
assert cx1 not in pool.connections[cx1.addr]
assert cx2 in pool.connections[cx1.addr]


@pytest.mark.parametrize("lifeness_error",
@pytest.mark.parametrize("liveness_error",
(OSError, ServiceUnavailable, SessionExpired))
@mark_async_test
async def test_acquire_returns_other_connection_on_failed_lifeness_check(
opener, lifeness_error
async def test_acquire_returns_other_connection_on_failed_liveness_check(
opener, liveness_error
):
def lifeness_side_effect(*args, **kwargs):
raise lifeness_error("lifeness check failed")
def liveness_side_effect(*args, **kwargs):
raise liveness_error("liveness check failed")

lifeness_timeout = 1
liveness_timeout = 1
pool = AsyncNeo4jPool(
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
)
# populate the pool with a connection
cx1 = await pool._acquire(READER_ADDRESS, 30, lifeness_timeout)
cx2 = await pool._acquire(READER_ADDRESS, 30, lifeness_timeout)
cx1 = await pool._acquire(READER_ADDRESS, 30, liveness_timeout)
cx2 = await pool._acquire(READER_ADDRESS, 30, liveness_timeout)

# make sure we assume the right state
assert cx1.addr == READER_ADDRESS
Expand All @@ -366,21 +366,21 @@ def lifeness_side_effect(*args, **kwargs):

cx1.is_idle_for.return_value = True
cx2.is_idle_for.return_value = True
# simulate cx1 failing lifeness check
cx1.reset.side_effect = lifeness_side_effect
# simulate cx1 failing liveness check
cx1.reset.side_effect = liveness_side_effect

# release the connection
await pool.release(cx1)
await pool.release(cx2)
cx1.reset.assert_not_called()
cx2.reset.assert_not_called()

# then acquire it again and assert the lifeness check was performed
cx3 = await pool._acquire(READER_ADDRESS, 30, lifeness_timeout)
# then acquire it again and assert the liveness check was performed
cx3 = await pool._acquire(READER_ADDRESS, 30, liveness_timeout)
assert cx3 is cx2
cx1.is_idle_for.assert_called_once_with(lifeness_timeout)
cx1.is_idle_for.assert_called_once_with(liveness_timeout)
cx1.reset.assert_awaited_once()
cx3.is_idle_for.assert_called_once_with(lifeness_timeout)
cx3.is_idle_for.assert_called_once_with(liveness_timeout)
cx3.reset.assert_awaited_once()
assert cx1 not in pool.connections[cx1.addr]
assert cx3 in pool.connections[cx1.addr]
2 changes: 1 addition & 1 deletion tests/unit/async_/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ async def test_verify_connectivity(uri, mocker):

assert ret is None
pool_mock.acquire.assert_awaited_once()
assert pool_mock.acquire.call_args.kwargs["lifeness_check_timeout"] == 0
assert pool_mock.acquire.call_args.kwargs["liveness_check_timeout"] == 0
pool_mock.release.assert_awaited_once()


Expand Down
4 changes: 2 additions & 2 deletions tests/unit/sync/io/test_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ def opener(addr, timeout):

def acquire(
self, access_mode=None, timeout=None, database=None, bookmarks=None,
lifeness_check_timeout=None
liveness_check_timeout=None
):
return self._acquire(self.address, timeout,
lifeness_check_timeout)
liveness_check_timeout)


@mark_sync_test
Expand Down
Loading