From 08844b81b0fb844a13c37bfd61ec9321b634fe63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 1 Aug 2023 13:38:22 +0000 Subject: [PATCH 1/8] Use correct redis url if not default when creating Connection --- tests/test_asyncio/test_sentinel_managed_connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_asyncio/test_sentinel_managed_connection.py b/tests/test_asyncio/test_sentinel_managed_connection.py index e784690c77..711b3ee733 100644 --- a/tests/test_asyncio/test_sentinel_managed_connection.py +++ b/tests/test_asyncio/test_sentinel_managed_connection.py @@ -10,11 +10,11 @@ pytestmark = pytest.mark.asyncio -async def test_connect_retry_on_timeout_error(): +async def test_connect_retry_on_timeout_error(connect_args): """Test that the _connect function is retried in case of a timeout""" connection_pool = mock.AsyncMock() connection_pool.get_master_address = mock.AsyncMock( - return_value=("localhost", 6379) + return_value=(connect_args["host"], connect_args["port"]) ) conn = SentinelManagedConnection( retry_on_timeout=True, From 3c75ab0361174c1e00ce726cc5074b44f008ac0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 2 Aug 2023 13:17:55 +0000 Subject: [PATCH 2/8] Make resource-warning __del__ code safer during shutdown --- redis/asyncio/client.py | 17 +++++++++++------ redis/asyncio/cluster.py | 23 +++++++++++++---------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index c340d851b1..1018c473e6 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -531,13 +531,18 @@ async def __aexit__(self, exc_type, exc_value, traceback): _DEL_MESSAGE = "Unclosed Redis client" - def __del__(self, _warnings: Any = warnings) -> None: + # passing _warnings and _grl as argument default since they may be gone + # by the time __del__ is called at shutdown + def __del__( + self, _warn: Any = warnings.warn, _grl: Any = asyncio.get_running_loop + ) -> None: if hasattr(self, "connection") and (self.connection is not None): - _warnings.warn( - f"Unclosed client session {self!r}", ResourceWarning, source=self - ) - context = {"client": self, "message": self._DEL_MESSAGE} - asyncio.get_running_loop().call_exception_handler(context) + _warn(f"Unclosed client session {self!r}", ResourceWarning, source=self) + try: + context = {"client": self, "message": self._DEL_MESSAGE} + _grl().call_exception_handler(context) + except RuntimeError: + pass async def aclose(self, close_connection_pool: Optional[bool] = None) -> None: """ diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index f4f031580d..daa6cea554 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -433,14 +433,16 @@ def __await__(self) -> Generator[Any, None, "RedisCluster"]: _DEL_MESSAGE = "Unclosed RedisCluster client" - def __del__(self) -> None: + def __del__( + self, _warn: Any = warnings.warn, _grl: Any = asyncio.get_running_loop + ) -> None: if hasattr(self, "_initialize") and not self._initialize: - warnings.warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self) + _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self) try: context = {"client": self, "message": self._DEL_MESSAGE} - asyncio.get_running_loop().call_exception_handler(context) + _grl().call_exception_handler(context) except RuntimeError: - ... + pass async def on_connect(self, connection: Connection) -> None: await connection.on_connect() @@ -969,17 +971,18 @@ def __eq__(self, obj: Any) -> bool: _DEL_MESSAGE = "Unclosed ClusterNode object" - def __del__(self) -> None: + def __del__( + self, _warn: Any = warnings.warn, _grl: Any = asyncio.get_running_loop + ) -> None: for connection in self._connections: if connection.is_connected: - warnings.warn( - f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self - ) + _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self) + try: context = {"client": self, "message": self._DEL_MESSAGE} - asyncio.get_running_loop().call_exception_handler(context) + _grl().call_exception_handler(context) except RuntimeError: - ... + pass break async def disconnect(self) -> None: From 6275657faa9c2cfadaabc1e1685cdbef688bd151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 2 Aug 2023 12:59:38 +0000 Subject: [PATCH 3/8] Remove __del__ handler, fix pubsub weakref callback handling --- redis/asyncio/client.py | 2 +- redis/asyncio/connection.py | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 1018c473e6..9a3cfadd3d 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -791,7 +791,7 @@ async def aclose(self): async with self._lock: if self.connection: await self.connection.disconnect() - self.connection.clear_connect_callbacks() + self.connection.deregister_connect_callback(self.on_connect) await self.connection_pool.release(self.connection) self.connection = None self.channels = {} diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 71d0e92002..d1dff783c0 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -217,10 +217,15 @@ def is_connected(self): return self._reader is not None and self._writer is not None def register_connect_callback(self, callback): - self._connect_callbacks.append(weakref.WeakMethod(callback)) + wm = weakref.WeakMethod(callback) + if wm not in self._connect_callbacks: + self._connect_callbacks.append(wm) - def clear_connect_callbacks(self): - self._connect_callbacks = [] + def deregister_connect_callback(self, callback): + try: + self._connect_callbacks.remove(weakref.WeakMethod(callback)) + except ValueError: + pass def set_parser(self, parser_class: Type[BaseParser]) -> None: """ @@ -263,6 +268,8 @@ async def connect(self): # run any user callbacks. right now the only internal callback # is for pubsub channel/pattern resubscription + # first, remove any dead weakrefs + self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()] for ref in self._connect_callbacks: callback = ref() task = callback(self) From 9239b0859d1049dbfde0fd1e7657f45d80b089f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 2 Aug 2023 13:25:40 +0000 Subject: [PATCH 4/8] Clarify comment, since there is no __del__ on asyncio.connection.ConnectionPool --- redis/asyncio/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 9a3cfadd3d..927051ba80 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -227,7 +227,6 @@ def __init__( lib_version: Optional[str] = get_lib_version(), username: Optional[str] = None, retry: Optional[Retry] = None, - # deprecated. create a pool and use connection_pool instead auto_close_connection_pool: Optional[bool] = None, redis_connect_func=None, credential_provider: Optional[CredentialProvider] = None, @@ -241,7 +240,9 @@ def __init__( To retry on TimeoutError, `retry_on_timeout` can also be set to `True`. """ kwargs: Dict[str, Any] - + # auto_close_connection_pool only has an effect if connection_pool is + # None. It is assumed that if connection_pool is not None, the user + # wants to manage the connection pool themselves. if auto_close_connection_pool is not None: warnings.warn( DeprecationWarning( From e5f5167bd51e15a07196a29effe5e91ae7cab700 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 2 Aug 2023 13:28:11 +0000 Subject: [PATCH 5/8] Remove remaining __del__ from async parser. They are not needed. --- redis/_parsers/base.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/redis/_parsers/base.py b/redis/_parsers/base.py index f77296df6a..5de04c0f94 100644 --- a/redis/_parsers/base.py +++ b/redis/_parsers/base.py @@ -138,12 +138,6 @@ def __init__(self, socket_read_size: int): self._stream: Optional[StreamReader] = None self._read_size = socket_read_size - def __del__(self): - try: - self.on_disconnect() - except Exception: - pass - async def can_read_destructive(self) -> bool: raise NotImplementedError() From 27e195caeb52fc5fbc4e0c4817e13b58750aa7f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Sun, 17 Sep 2023 12:07:31 +0000 Subject: [PATCH 6/8] make connect callback methods internal --- redis/asyncio/client.py | 4 ++-- redis/asyncio/connection.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 927051ba80..75bc03eaa7 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -792,7 +792,7 @@ async def aclose(self): async with self._lock: if self.connection: await self.connection.disconnect() - self.connection.deregister_connect_callback(self.on_connect) + self.connection._deregister_connect_callback(self.on_connect) await self.connection_pool.release(self.connection) self.connection = None self.channels = {} @@ -855,7 +855,7 @@ async def connect(self): ) # register a callback that re-subscribes to any channels we # were listening to when we were disconnected - self.connection.register_connect_callback(self.on_connect) + self.connection._register_connect_callback(self.on_connect) else: await self.connection.connect() if self.push_handler_func is not None and not HIREDIS_AVAILABLE: diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index d1dff783c0..f36b4bf79b 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -216,12 +216,12 @@ def repr_pieces(self): def is_connected(self): return self._reader is not None and self._writer is not None - def register_connect_callback(self, callback): + def _register_connect_callback(self, callback): wm = weakref.WeakMethod(callback) if wm not in self._connect_callbacks: self._connect_callbacks.append(wm) - def deregister_connect_callback(self, callback): + def _deregister_connect_callback(self, callback): try: self._connect_callbacks.remove(weakref.WeakMethod(callback)) except ValueError: From c65d043a0af4d0800f913b49ba60f076dfd43a1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Sun, 17 Sep 2023 12:13:40 +0000 Subject: [PATCH 7/8] similarly make non-async connect callbacks internal, use same system as for async. --- redis/client.py | 4 ++-- redis/cluster.py | 2 +- redis/connection.py | 15 +++++++++++---- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/redis/client.py b/redis/client.py index 1e1ff57605..cf6dbf1eed 100755 --- a/redis/client.py +++ b/redis/client.py @@ -690,7 +690,7 @@ def __del__(self): def reset(self): if self.connection: self.connection.disconnect() - self.connection.clear_connect_callbacks() + self.connection._deregister_connect_callback(self.on_connect) self.connection_pool.release(self.connection) self.connection = None self.health_check_response_counter = 0 @@ -748,7 +748,7 @@ def execute_command(self, *args): ) # register a callback that re-subscribes to any channels we # were listening to when we were disconnected - self.connection.register_connect_callback(self.on_connect) + self.connection._register_connect_callback(self.on_connect) if self.push_handler_func is not None and not HIREDIS_AVAILABLE: self.connection._parser.set_push_handler(self.push_handler_func) connection = self.connection diff --git a/redis/cluster.py b/redis/cluster.py index 2ce9c54f85..ee3e1a865d 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1775,7 +1775,7 @@ def execute_command(self, *args): ) # register a callback that re-subscribes to any channels we # were listening to when we were disconnected - self.connection.register_connect_callback(self.on_connect) + self.connection._register_connect_callback(self.on_connect) if self.push_handler_func is not None and not HIREDIS_AVAILABLE: self.connection._parser.set_push_handler(self.push_handler_func) connection = self.connection diff --git a/redis/connection.py b/redis/connection.py index 45ecd2a370..f5266d7dce 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -237,11 +237,16 @@ def _construct_command_packer(self, packer): else: return PythonRespSerializer(self._buffer_cutoff, self.encoder.encode) - def register_connect_callback(self, callback): - self._connect_callbacks.append(weakref.WeakMethod(callback)) + def _register_connect_callback(self, callback): + wm = weakref.WeakMethod(callback) + if wm not in self._connect_callbacks: + self._connect_callbacks.append(wm) - def clear_connect_callbacks(self): - self._connect_callbacks = [] + def _deregister_connect_callback(self, callback): + try: + self._connect_callbacks.remove(weakref.WeakMethod(callback)) + except ValueError: + pass def set_parser(self, parser_class): """ @@ -279,6 +284,8 @@ def connect(self): # run any user callbacks. right now the only internal callback # is for pubsub channel/pattern resubscription + # first, remove any dead weakrefs + self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()] for ref in self._connect_callbacks: callback = ref() if callback: From dca114d40992cc82af95c0fe55e6179318bedc9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Mon, 18 Sep 2023 10:14:46 +0000 Subject: [PATCH 8/8] Reformat __del__() --- redis/asyncio/client.py | 4 +++- redis/asyncio/cluster.py | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 75bc03eaa7..381df50ccc 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -535,7 +535,9 @@ async def __aexit__(self, exc_type, exc_value, traceback): # passing _warnings and _grl as argument default since they may be gone # by the time __del__ is called at shutdown def __del__( - self, _warn: Any = warnings.warn, _grl: Any = asyncio.get_running_loop + self, + _warn: Any = warnings.warn, + _grl: Any = asyncio.get_running_loop, ) -> None: if hasattr(self, "connection") and (self.connection is not None): _warn(f"Unclosed client session {self!r}", ResourceWarning, source=self) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index daa6cea554..15634de81f 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -434,7 +434,9 @@ def __await__(self) -> Generator[Any, None, "RedisCluster"]: _DEL_MESSAGE = "Unclosed RedisCluster client" def __del__( - self, _warn: Any = warnings.warn, _grl: Any = asyncio.get_running_loop + self, + _warn: Any = warnings.warn, + _grl: Any = asyncio.get_running_loop, ) -> None: if hasattr(self, "_initialize") and not self._initialize: _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self) @@ -972,7 +974,9 @@ def __eq__(self, obj: Any) -> bool: _DEL_MESSAGE = "Unclosed ClusterNode object" def __del__( - self, _warn: Any = warnings.warn, _grl: Any = asyncio.get_running_loop + self, + _warn: Any = warnings.warn, + _grl: Any = asyncio.get_running_loop, ) -> None: for connection in self._connections: if connection.is_connected: