Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ def reset(self):
def make_connection(self):
"Make a fresh connection."
connection = self.connection_class(**self.connection_kwargs)
connection.pool_generation = self.generation
self._connections.append(connection)
return connection

Expand Down Expand Up @@ -1071,6 +1072,12 @@ def get_connection(self, command_name, *keys, **options):
# raised unless handled by application code. If you want never to
raise ConnectionError("No connection available.")

# If the pool generation differs, close the connection and open a new one.
if connection is not None and connection.pool_generation != self.generation:
self._remove_connection(connection)
connection.disconnect()
connection = None

# If the ``connection`` is actually ``None`` then that's a cue to make
# a new connection to add to the pool.
if connection is None:
Expand All @@ -1085,15 +1092,32 @@ def release(self, connection):
if connection.pid != self.pid:
return

# Put the connection back into the pool.
# If we are releasing a connection that is no longer the same as the pool's generation,
# we will disconnect it.
if connection.pool_generation != self.generation:
self._remove_connection(connection)
connection.disconnect()
connection = None

# Put the connection, or None back into the pool.
try:
self.pool.put_nowait(connection)
except Full:
# perhaps the pool has been reset() after a fork? regardless,
# we don't want this connection
pass

def disconnect(self):
def disconnect(self, immediate=False):
"Disconnects all connections in the pool."
for connection in self._connections:
connection.disconnect()
self.generation += 1

if immediate:
for connection in self._connections:
connection.disconnect()

def _remove_connection(self, connection):
"Remove a connection from the list of connections."
try:
self._connections.remove(connection)
except IndexError:
pass
33 changes: 33 additions & 0 deletions tests/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ class DummyConnection(object):
def __init__(self, **kwargs):
self.kwargs = kwargs
self.pid = os.getpid()
self.disconnected = False

def disconnect(self):
self.disconnected = True


class TestConnectionPool(object):
Expand Down Expand Up @@ -127,6 +131,35 @@ def test_reuse_previously_released_connection(self):
c2 = pool.get_connection('_')
assert c1 == c2

def test_disconnect_changes_generation_and_returns_new_connection(self):
pool = self.get_pool()
c1 = pool.get_connection('_')
pool.release(c1)
assert pool.generation == 0
pool.disconnect()
assert pool.generation == 1
c2 = pool.get_connection('_')
assert c1 != c2

def test_disconnect_happens_when_releasing_connection_when_pool_generation_changes(self):
pool = self.get_pool()
c1 = pool.get_connection('_')
pool.disconnect()
assert not c1.disconnected
pool.release(c1)
assert c1.disconnected
c2 = pool.get_connection('_')
assert c1 != c2

def test_disconnect_disconnects_immediately(self):
pool = self.get_pool()
c1 = pool.get_connection('_')
pool.disconnect(immediate=True)
assert c1.disconnected
pool.release(c1)
c2 = pool.get_connection('_')
assert c1 != c2

def test_repr_contains_db_info_tcp(self):
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
expected = 'ConnectionPool<Connection<host=localhost,port=6379,db=0>>'
Expand Down