Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import copy
import datetime
import random
import re
import threading
import time
import warnings
from collections import defaultdict
from itertools import chain
from typing import Optional

Expand All @@ -14,6 +16,7 @@
list_or_args,
)
from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
from redis.crc import key_slot
from redis.credentials import CredentialProvider
from redis.exceptions import (
ConnectionError,
Expand Down Expand Up @@ -1672,8 +1675,23 @@ def ssubscribe(self, *args, target_node=None, **kwargs):
args = list_or_args(args[0], args[1:])
new_s_channels = dict.fromkeys(args)
new_s_channels.update(kwargs)

channels_by_slot = defaultdict(list)
for channel in new_s_channels: # We should send ssubscribe one by one on redis cluster to prevent CROSSSLOT error
self.execute_command("SSUBSCRIBE", channel)
slot = key_slot(self.encoder.encode(channel))
channels_by_slot[slot].append(channel)

slot_count = len(channels_by_slot)
min_interval_ms = 10
base_interval_ms = max(30_000 / slot_count, min_interval_ms)

for slot, channels in channels_by_slot.items():
self.execute_command("SSUBSCRIBE", *channels)

# Add jitter: ±30% of interval
jittered_ms = base_interval_ms * random.uniform(0.7, 1.3)
time.sleep(jittered_ms / 1000.0)

# update the s_channels dict AFTER we send the command. we don't want to
# subscribe twice to these channels, once for the command and again
# for the reconnection.
Expand Down