Skip to content

Commit 7b31ee3

Browse files
authored
ssubscribe to keys separately by slot (#28)
1 parent 45d8693 commit 7b31ee3

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

redis/client.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import threading
55
import time
66
import warnings
7+
from collections import defaultdict
78
from itertools import chain
89
from typing import Optional
910

@@ -14,6 +15,7 @@
1415
list_or_args,
1516
)
1617
from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
18+
from redis.crc import key_slot
1719
from redis.credentials import CredentialProvider
1820
from redis.exceptions import (
1921
ConnectionError,
@@ -1447,11 +1449,14 @@ def on_connect(self, connection):
14471449
}
14481450
self.psubscribe(**patterns)
14491451
if self.shard_channels:
1450-
shard_channels = {
1451-
self.encoder.decode(k, force=True): v
1452-
for k, v in self.shard_channels.items()
1453-
}
1454-
self.ssubscribe(**shard_channels)
1452+
channels_by_slot = defaultdict(dict)
1453+
for k, v in self.shard_channels.items():
1454+
key = self.encoder.decode(k, force=True)
1455+
slot = key_slot(self.encoder.encode(key))
1456+
channels_by_slot[slot][key] = v
1457+
1458+
for slot, channels in channels_by_slot.items():
1459+
self.ssubscribe(**channels)
14551460

14561461
@property
14571462
def subscribed(self):
@@ -1672,8 +1677,8 @@ def ssubscribe(self, *args, target_node=None, **kwargs):
16721677
args = list_or_args(args[0], args[1:])
16731678
new_s_channels = dict.fromkeys(args)
16741679
new_s_channels.update(kwargs)
1675-
for channel in new_s_channels: # We should send ssubscribe one by one on redis cluster to prevent CROSSSLOT error
1676-
self.execute_command("SSUBSCRIBE", channel)
1680+
ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
1681+
16771682
# update the s_channels dict AFTER we send the command. we don't want to
16781683
# subscribe twice to these channels, once for the command and again
16791684
# for the reconnection.
@@ -1685,6 +1690,7 @@ def ssubscribe(self, *args, target_node=None, **kwargs):
16851690
# Clear the health check counter
16861691
self.health_check_response_counter = 0
16871692
self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
1693+
return ret_val
16881694

16891695
def sunsubscribe(self, *args, target_node=None):
16901696
"""

0 commit comments

Comments
 (0)