From 09f1b16aba485504ed790df23a376e57e456c8bb Mon Sep 17 00:00:00 2001 From: Jake Date: Tue, 21 Feb 2017 14:41:03 -0800 Subject: [PATCH 1/4] [bugfix] Implement the connection pool fixes onto the blocking connection pool. --- redis/connection.py | 57 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index 310f06e204..79e24b8f31 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -1030,11 +1030,7 @@ def reset(self): # Create and fill up a thread safe queue with ``None`` values. self.pool = self.queue_class(self.max_connections) - while True: - try: - self.pool.put_nowait(None) - except Full: - break + self._fill_pool() # Keep a list of actual connection instances so that we can # disconnect them later. @@ -1071,6 +1067,12 @@ def get_connection(self, command_name, *keys, **options): # raised unless handled by application code. If you want never to raise ConnectionError("No connection available.") + # If the pool generation differs, close the connection and open a new one. + if connection is not None and connection.pool_generation != self.generation: + self._remove_connection(connection) + connection.disconnect() + connection = None + # If the ``connection`` is actually ``None`` then that's a cue to make # a new connection to add to the pool. if connection is None: @@ -1085,7 +1087,14 @@ def release(self, connection): if connection.pid != self.pid: return - # Put the connection back into the pool. + # If we are releasing a connection that is no longer the same as the pool's generation, + # we will disconnect it. + if connection.pool_generation != self.generation: + self._remove_connection(connection) + connection.disconnect() + connection = None + + # Put the connection, or None back into the pool. try: self.pool.put_nowait(connection) except Full: @@ -1093,7 +1102,37 @@ def release(self, connection): # we don't want this connection pass - def disconnect(self): + def disconnect(self, immediate=False): "Disconnects all connections in the pool." - for connection in self._connections: - connection.disconnect() + self.generation += 1 + + if immediate: + for connection in self._connections: + connection.disconnect() + + self._connections[:] = [] + self._drain_pool() + self._fill_pool() + + def _remove_connection(self, connection): + "Remove a connection from the list of connections." + try: + self._connections.remove(connection) + except IndexError: + pass + + def _fill_pool(self): + "Fill the connection pool with sentinel values to represent that we need to make a new connection." + while True: + try: + self.pool.put_nowait(None) + except Full: + break + + def _drain_pool(self): + "Drain the pool, removing all items from it." + while True: + try: + self.pool.get_nowait() + except Empty: + break \ No newline at end of file From f58ab99ffc4f8c25d5a181e9fc9bcb2de27367e4 Mon Sep 17 00:00:00 2001 From: Jake Date: Tue, 21 Feb 2017 15:02:35 -0800 Subject: [PATCH 2/4] Remove unneded code. * Don't need to drain and re-fill the pool, when the connection is returned to the pool. it'll be properly reaped. * Actually set the connection's pool generation when making it. --- redis/connection.py | 29 +++++++---------------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index 79e24b8f31..e2dff43e03 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -1030,7 +1030,11 @@ def reset(self): # Create and fill up a thread safe queue with ``None`` values. self.pool = self.queue_class(self.max_connections) - self._fill_pool() + while True: + try: + self.pool.put_nowait(None) + except Full: + break # Keep a list of actual connection instances so that we can # disconnect them later. @@ -1039,6 +1043,7 @@ def reset(self): def make_connection(self): "Make a fresh connection." connection = self.connection_class(**self.connection_kwargs) + connection.pool_generation = self.generation self._connections.append(connection) return connection @@ -1110,29 +1115,9 @@ def disconnect(self, immediate=False): for connection in self._connections: connection.disconnect() - self._connections[:] = [] - self._drain_pool() - self._fill_pool() - def _remove_connection(self, connection): "Remove a connection from the list of connections." try: self._connections.remove(connection) except IndexError: - pass - - def _fill_pool(self): - "Fill the connection pool with sentinel values to represent that we need to make a new connection." - while True: - try: - self.pool.put_nowait(None) - except Full: - break - - def _drain_pool(self): - "Drain the pool, removing all items from it." - while True: - try: - self.pool.get_nowait() - except Empty: - break \ No newline at end of file + pass \ No newline at end of file From 80c5caafe7d06ec7ec66ddffe36190ec116b2c74 Mon Sep 17 00:00:00 2001 From: Jake Date: Wed, 22 Feb 2017 11:15:19 -0800 Subject: [PATCH 3/4] [tests] Write some tests to handle new logic in blocking connection pool. --- tests/test_connection_pool.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 6b2478aae8..6aa3ac3d68 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -16,6 +16,10 @@ class DummyConnection(object): def __init__(self, **kwargs): self.kwargs = kwargs self.pid = os.getpid() + self.disconnected = False + + def disconnect(self): + self.disconnected = True class TestConnectionPool(object): @@ -127,6 +131,36 @@ def test_reuse_previously_released_connection(self): c2 = pool.get_connection('_') assert c1 == c2 + def test_disconnect_changes_generation_and_returns_new_connection(self): + pool = self.get_pool() + c1 = pool.get_connection('_') + pool.release(c1) + assert pool.generation == 0 + pool.disconnect() + assert pool.generation == 1 + c2 = pool.get_connection('_') + assert c1 != c2 + + def test_disconnect_happens_when_releasing_connection_when_pool_generation_changes(self): + pool = self.get_pool() + c1 = pool.get_connection('_') + pool.disconnect() + assert not c1.disconnected + pool.release(c1) + assert c1.disconnected + c2 = pool.get_connection('_') + assert c1 != c2 + + def test_disconnect_disconnects_immediately(self): + pool = self.get_pool() + c1 = pool.get_connection('_') + pool.disconnect(immediate=True) + assert c1.disconnected + pool.release(c1) + assert c1.disconnected + c2 = pool.get_connection('_') + assert c1 != c2 + def test_repr_contains_db_info_tcp(self): pool = redis.ConnectionPool(host='localhost', port=6379, db=0) expected = 'ConnectionPool>' From bef423f7d5f2077d218be2eca66b52b67c985ea1 Mon Sep 17 00:00:00 2001 From: Jake Date: Wed, 22 Feb 2017 11:16:20 -0800 Subject: [PATCH 4/4] remove unneeded assertion. --- tests/test_connection_pool.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 6aa3ac3d68..fe2f6e70f3 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -157,7 +157,6 @@ def test_disconnect_disconnects_immediately(self): pool.disconnect(immediate=True) assert c1.disconnected pool.release(c1) - assert c1.disconnected c2 = pool.get_connection('_') assert c1 != c2