From efa51dca400cf80e3c34d00fe098571105ac208d Mon Sep 17 00:00:00 2001 From: boyxuper Date: Sun, 29 Mar 2015 13:24:12 +0800 Subject: [PATCH 01/16] - implements some CLUSTER commands --- redis/client.py | 108 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 8b5a3faed2..b2fb9c53ec 100755 --- a/redis/client.py +++ b/redis/client.py @@ -278,6 +278,26 @@ def parse_slowlog_get(response, **options): } for item in response] +def parse_not_implemented(r, **options): + raise NotImplementedError() + + +def parse_cluster_slots(resp, **options): + current_host = options.get('current_host', None) + fix_server = lambda (host, port): (host or current_host, port) + + slots = {} + for slot in resp: + start, end, master = slot[:3] + slaves = slot[3:] + slots[start, end] = { + 'master': fix_server(master), + 'slaves': [fix_server(slave) for slave in slaves], + } + + return slots + + class StrictRedis(object): """ Implementation of the Redis protocol. @@ -361,6 +381,27 @@ class StrictRedis(object): 'SSCAN': parse_scan, 'TIME': lambda x: (int(x[0]), int(x[1])), 'ZSCAN': parse_zscan + }, + # cluster + { + 'CLUSTER ADDSLOTS': bool_ok, + 'CLUSTER COUNT-FAILURE-REPORTS': int, + 'CLUSTER COUNTKEYSINSLOT': int, + 'CLUSTER DELSLOTS': bool_ok, + 'CLUSTER FAILOVER': bool_ok, + 'CLUSTER FORGET': bool_ok, + 'CLUSTER GETKEYSINSLOT': int, + 'CLUSTER INFO': parse_info, + 'CLUSTER KEYSLOT': int, + 'CLUSTER MEET': bool_ok, + 'CLUSTER NODES': parse_not_implemented, + 'CLUSTER REPLICATE': bool_ok, + 'CLUSTER RESET': bool_ok, + 'CLUSTER SAVECONFIG': bool_ok, + 'CLUSTER SET-CONFIG-EPOCH': bool_ok, + 'CLUSTER SETSLOT': bool_ok, + 'CLUSTER SLAVES': parse_not_implemented, + 'CLUSTER SLOTS': parse_cluster_slots, } ) @@ -444,6 +485,7 @@ def __init__(self, host='localhost', port=6379, 'ssl_ca_certs': ssl_ca_certs, }) connection_pool = ConnectionPool(**kwargs) + self.host = host self.connection_pool = connection_pool self._use_lua_lock = None @@ -612,8 +654,72 @@ def client_setname(self, name): "Sets the current connection name" return self.execute_command('CLIENT SETNAME', name) + def cluster_addslots(self, *slots): + """Assign new hash slots to receiving node""" + return self.execute_command('CLUSTER ADDSLOTS', *slots) + + def cluster_countkeysinslot(self, slot_id): + """Return the number of local keys in the specified hash slot""" + return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id) + + def cluster_count_failure_report(self, node_id): + """Return the number of failure reports active for a given node""" + return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id) + + def cluster_delslots(self, *slots): + """Set hash slots as unbound in receiving node""" + return self.execute_command('CLUSTER DELSLOTS', *slots) + + def cluster_failover(self, option): + """Forces a slave to perform a manual failover of its master.""" + assert option.upper() in ('FORCE', 'TAKEOVER') + return self.execute_command('CLUSTER FAILOVER', Token(option)) + + def cluster_info(self): + """Provides info about Redis Cluster node state""" + return self.execute_command('CLUSTER INFO') + + def cluster_keyslot(self, name): + """Returns the hash slot of the specified key""" + return self.execute_command('CLUSTER KEYSLOT', name) + + def cluster_meet(self, host, port): + """Force a node cluster to handshake with another node""" + return self.execute_command('CLUSTER MEET', host, port) + + def cluster_replicate(self, node_id): + """Reconfigure a node as a slave of the specified master node""" + return self.execute_command('CLUSTER REPLICATE', node_id) + + def cluster_reset(self, option='SOFT'): + """Reset a Redis Cluster node""" + assert option.upper() in ('SOFT', 'HARD') + return self.execute_command('CLUSTER RESET', Token(option)) + + def cluster_save_config(self): + """Forces the node to save cluster state on disk""" + return self.execute_command('CLUSTER SAVECONFIG') + + def cluster_set_config_epoch(self, epoch): + """Set the configuration epoch in a new node""" + return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch) + + def cluster_setslot(self, slot_id, state, node_id=None): + """Bind an hash slot to a specific node""" + if state.upper() in ('IMPORTING', 'MIGRATING', 'NODE'): + if node_id is not None: + return self.execute_command('CLUSTER SETSLOT', slot_id, Token(state), node_id) + elif state.upper() == 'STABLE': + return self.execute_command('CLUSTER SETSLOT', slot_id, Token('STABLE')) + else: + raise RedisError('Invalid slot state: %s' % state) + + def cluster_slots(self): + """Get array of Cluster slot to node mappings""" + return self.execute_command('CLUSTER SLOTS', current_host=self.host) + def config_get(self, pattern="*"): - "Return a dictionary of configuration based on the ``pattern``" + """Return a dictionary of configuration based on the ``pattern``""" return self.execute_command('CONFIG GET', pattern) def config_set(self, name, value): From 9d0868acd1b74c77d3b6af95bbbff45734654a81 Mon Sep 17 00:00:00 2001 From: boyxuper Date: Mon, 30 Mar 2015 22:33:28 +0800 Subject: [PATCH 02/16] - basic cluster client --- redis/cluster.py | 359 ++++++++++++++++++++++++++++++++++++++++++++ redis/crc.py | 60 ++++++++ redis/exceptions.py | 22 +++ 3 files changed, 441 insertions(+) create mode 100644 redis/cluster.py create mode 100644 redis/crc.py diff --git a/redis/cluster.py b/redis/cluster.py new file mode 100644 index 0000000000..2e3a764793 --- /dev/null +++ b/redis/cluster.py @@ -0,0 +1,359 @@ +"""cluster support ported from https://github.com/Grokzen/redis-py-cluster +""" +import time +import logging + +from redis.crc import crc16 +from redis._compat import iteritems, nativestr +from redis.client import StrictRedis, dict_merge +from redis.connection import Connection, ConnectionPool, DefaultParser +from redis.exceptions import ( + ConnectionError, ClusterPartitionError, ClusterError, + TimeoutError, ResponseError, BusyLoadingError, ClusterCrossSlotError, + ClusterSlotNotServedError, ClusterDownError, +) + +# TODO: loose redis interface +# TODO: more redis command support +# TODO: advanced balancer +LOGGER = logging.getLogger(__name__) + + +class ClusterBalancer(object): + def get_node_for_key(self, key_name, readonly): + raise NotImplementedError() + + def get_random_node(self, readonly): + raise NotImplementedError() + + +class RoundRobinClusterNodeBalancer(ClusterBalancer): + RR_COUNTER = 0 + + def __init__(self, manager): + self.manager = manager + + def get_node_for_key(self, key_name, readonly): + slot_id = Cluster.keyslot(key_name) + if not readonly: + return self.manager.get_master_node(slot_id) + else: + counter = self.__class__.RR_COUNTER = (self.__class__.RR_COUNTER + 1) % Cluster.KEY_SLOTS + nodes = self.manager.get_slave_nodes(slot_id, slave_only=False) + return list(nodes)[counter % len(nodes)] + + def get_random_node(self, readonly): + counter = self.__class__.RR_COUNTER = (self.__class__.RR_COUNTER + 1) % Cluster.KEY_SLOTS + + if readonly: + nodes = self.manager.master_nodes + else: + nodes = self.manager.nodes + + return list(nodes)[counter % len(nodes)] + + +class ClusterParser(DefaultParser): + class AskError(ResponseError): + def __init__(self, resp): + print resp + + class MovedError(ResponseError): + def __init__(self, resp): + """redis only redirect to master node""" + slot_id, new_node = resp.split(' ') + host, port = new_node.rsplit(':', 1) + self.slot_id = int(slot_id) + self.node = self.host, self.port = host, int(port) + + EXCEPTION_CLASSES = dict_merge( + DefaultParser.EXCEPTION_CLASSES, { + 'ASK': AskError, + 'MOVED': MovedError, + 'CLUSTERDOWN': ClusterDownError, + 'CROSSSLOT': ClusterCrossSlotError, + }) + + +class ClusterConnection(Connection): + def __init__(self, *args, **kwargs): + self.use_readonly = kwargs.pop('use_readonly', False) + kwargs['parser_class'] = ClusterParser + super(ClusterConnection, self).__init__(*args, **kwargs) + + def on_connect(self): + """Initialize the connection, set readonly is required""" + super(ClusterConnection, self).on_connect() + if self.use_readonly: + self.send_command('READONLY') + if nativestr(self.read_response()) != 'OK': + raise ResponseError('Cannot set READONLY flag') + + +class Cluster(object): + """keep knowledge of cluster""" + KEY_SLOTS = 16384 + + def __init__(self, startup_nodes=None, allow_partition=False, **cluster_kwargs): + """allow_partition: raise Exception when partition appears or not.""" + + if not startup_nodes: + raise ValueError('No startup nodes provided') + + self.cluster_kwargs = dict([ + (k, v) for k, v in iteritems(cluster_kwargs) + if k.startswith('socket_') + ]) + + self.cluster_kwargs['decode_responses'] = True + self.cluster_kwargs['password'] = cluster_kwargs.get('password') + self.allow_partition = allow_partition + + self.master_nodes = set() + self.nodes = set(startup_nodes) + self.slots = {} + self.pubsub_node = None + self.state = None + + @classmethod + def keyslot(cls, key): + """Calculate keyslot for a given key. + + This also works for binary keys that is used in python 3. + """ + k = unicode(key) + start = k.find('{') + if start > -1: + end = k.find('}', start + 1) + if end > -1 and end != start + 1: + k = k[start + 1:end] + return crc16(k) % cls.KEY_SLOTS + + def discover_cluster(self, force=False): + if len(self.slots) == self.KEY_SLOTS and not force: + return + + slots_node = {} + startup_nodes, self.nodes, self.master_nodes = self.nodes, set(), set() + for node in startup_nodes: + host, port = node + node_conn = StrictRedis(host, port, **self.cluster_kwargs) + try: + node_conn.ping() + except ConnectionError: + continue + + if self.state is None: + # TODO: upgrade to use CLUSTER NODES + self.state = node_conn.cluster_info()['cluster_state'] + + for (start, end), slot in node_conn.cluster_slots().items(): + for slot_id in range(start, end + 1): + if not self.allow_partition and slot_id in slots_node: + raise ClusterPartitionError( + 'Cluster partition appears: slot #%s, node: [%s] and [%s]' % ( + slot_id, slots_node[slot_id], node)) + + self.master_nodes.add(slot['master']) + self.nodes.update([slot['master']] + slot['slaves']) + self.slots[slot_id] = { + 'master': slot['master'], + 'slaves': set(slot['slaves']), + } + slots_node[slot_id] = node + + if len(self.slots) == self.KEY_SLOTS: + self.pubsub_node = self.determine_pubsub_node() + break + + def get_master_node(self, slot_id): + self.discover_cluster() + try: + node = self.slots[slot_id] + except IndexError: + raise ClusterSlotNotServedError(slot_id) + else: + return node['master'] + + def get_slave_nodes(self, slot_id, slave_only=True): + self.discover_cluster() + try: + node = self.slots[slot_id] + except IndexError: + raise ClusterSlotNotServedError(slot_id) + else: + if slave_only: + return list(node['slaves']) + else: + return list(node['slaves']) + [node['master']] + + def determine_pubsub_node(self): + """ + Determine what node object should be used for pubsub commands. + + All clients in the cluster will talk to the same pubsub node to ensure + all code stay compatible. See pubsub doc for more details why. + + Allways use the server with highest port number + """ + highest = -1 + node = None, None + for host, port in self.nodes: + if port > highest: + highest = port + node = host, port + + return node + + def slot_moved(self, slot_id, node): + """signal from response""" + slot = self.slots.setdefault(slot_id, {'master': None, 'slaves': set()}) + slot['master'] = node + self.nodes.add(node) + self.master_nodes.add(node) + + +class ClusterConnectionPool(object): + """connection pool for redis cluster + collection of pools + """ + DEFAULT_TIMEOUT = None + DEFAULT_MAX_CONN = 32 + + def __init__(self, manager, connection_class=ClusterConnection, + max_connections=None, **connection_kwargs): + + max_connections = max_connections or self.DEFAULT_MAX_CONN + if not isinstance(max_connections, (int, long)) or max_connections < 0: + raise ValueError('"max_connections" must be a positive integer') + + self.manager = manager + self.connection_class = connection_class + self.connection_kwargs = connection_kwargs + self.max_connections = max_connections + + # (host, port) -> pool + self.pools = {} + self.reset() + + def reset(self, force=False): + self.manager.discover_cluster(force=force) + self.pools = dict([ + (node, self.make_connection_pool(node)) + for node in self.manager.nodes + ]) + + def get_connection(self, node): + """Get a connection from the pool""" + return self.pools[node].get_connection(None) + + def make_connection_pool(self, node): + """Create a new connection""" + host, port = node + use_readonly = node not in self.manager.master_nodes + return ConnectionPool(host=host, port=port, + connection_class=self.connection_class, + max_connections=self.max_connections, + use_readonly=use_readonly, + **self.connection_kwargs) + + def release(self, connection): + """Releases the connection back to the pool""" + self.pools[connection.host, connection.port].release(connection) + + def disconnect(self): + """Disconnects all connections in the pool""" + for pool in self.pools.values(): + pool.disconnect() + + +class StrictClusterRedis(StrictRedis): + """ + If a command is implemented over the one in StrictRedis then it requires some changes compared to + the regular implementation of the method. + """ + COMMAND_TTL = 16 + READONLY_COMMANDS = 'GET MGET RANDOMKEY'.split(' ') + RANDOM_NODE_COMMANDS = 'RANDOMKEY INFO'.split(' ') + RANDOM_RR_COUNTER = 0 + + def __init__(self, startup_nodes, max_connections=32, discover_cluster=True, + pipeline_use_threads=True, node_balancer=None, **kwargs): + """ + startup_nodes --> List of nodes that initial bootstrapping can be done from + max_connections --> Maximum number of connections that should be kept open at one time + pipeline_use_threads -> By default, use threads in pipeline if this flag is set to True + **kwargs --> Extra arguments that will be sent into StrictRedis instance when created + (See Official redis-py doc for supported kwargs + [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) + Some kwargs is not supported and will raise RedisClusterException + - db (Redis do not support database SELECT in cluster mode) + """ + super(StrictClusterRedis, self).__init__(**kwargs) + + if 'db' in kwargs: + raise ClusterError("Argument 'db' is not possible to use in cluster mode") + + self.manager = Cluster(startup_nodes=startup_nodes, **kwargs) + self.connection_pool = ClusterConnectionPool(manager=self.manager, max_connections=max_connections, **kwargs) + self.node_balancer = node_balancer or RoundRobinClusterNodeBalancer(self.manager) + + if discover_cluster: + self.manager.discover_cluster() + + self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() + self.pipeline_use_threads = pipeline_use_threads + + def _get_random_node(self): + """for read""" + slot_id = self.__class__.RANDOM_RR_COUNT = (self.__class__.RANDOM_RR_COUNT + 1) % Cluster.KEY_SLOTS + nodes = self.manager.get_slave_nodes(slot_id, slave_only=False) + return nodes[slot_id % len(nodes)] + + def prepare_command(self, command_args): + command = command_args[0] + readonly = command in self.READONLY_COMMANDS + + if command in self.RANDOM_NODE_COMMANDS: + node = self.node_balancer.get_random_node(readonly=readonly) + else: + key_name = command_args[1] + node = self.node_balancer.get_node_for_key(key_name=key_name, readonly=readonly) + + connection = self.connection_pool.get_connection(node) + packed_command = connection.pack_command(*command_args) + return connection, packed_command + + def execute_command(self, *command_args, **parser_args): + """Send a command to a node in the cluster + SINGLE & SIMPLE MODE + 1. single slot command + 2 random node command + 3. multiple slot command + 3.1. *cross slot command + + 1. single key [v] + 2. no key + 3. multiple key + """ + command = command_args[0] + + ttl = self.COMMAND_TTL + while ttl > 0: + ttl -= 1 + + connection, packed_command = self.prepare_command(command_args) + try: + connection.send_packed_command(packed_command) + return self.parse_response(connection, command, **parser_args) + except BusyLoadingError: + raise + except (ConnectionError, TimeoutError): + if ttl < self.COMMAND_TTL / 2: + time.sleep(0.01) + except ClusterParser.MovedError as e: + self.manager.slot_moved(e.slot_id, e.node) + finally: + self.connection_pool.release(connection) + + raise ClusterError('TTL exhausted.') diff --git a/redis/crc.py b/redis/crc.py new file mode 100644 index 0000000000..91b29fb407 --- /dev/null +++ b/redis/crc.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- + +# python std lib +import sys + + +XMODEMCRC16Lookup = [ + 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, + 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, + 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, + 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, + 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, + 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, + 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, + 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, + 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, + 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, + 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, + 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, + 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, + 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, + 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, + 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, + 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, + 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, + 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, + 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, + 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, + 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, + 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, + 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, + 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, + 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, + 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, + 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, + 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, + 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, + 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0 +] + + +def _crc16_py3(data): + crc = 0 + for byte in data.encode("utf-8"): + crc = ((crc << 8) & 0xff00) ^ XMODEMCRC16Lookup[((crc >> 8) & 0xff) ^ byte] + return crc & 0xffff + + +def _crc16_py2(data): + crc = 0 + for byte in data.encode("utf-8"): + crc = ((crc << 8) & 0xff00) ^ XMODEMCRC16Lookup[((crc >> 8) & 0xff) ^ ord(byte)] + return crc & 0xffff + + +if sys.version_info >= (3, 0, 0): + crc16 = _crc16_py3 +else: + crc16 = _crc16_py2 diff --git a/redis/exceptions.py b/redis/exceptions.py index a8518c708a..b5a2b1ffba 100644 --- a/redis/exceptions.py +++ b/redis/exceptions.py @@ -69,3 +69,25 @@ class LockError(RedisError, ValueError): # NOTE: For backwards compatability, this class derives from ValueError. # This was originally chosen to behave like threading.Lock. pass + + +class ClusterError(RedisError): + pass + + +class ClusterCrossSlotError(RedisError): + def __init__(self, resp): + print resp + + +class ClusterSlotNotServedError(RedisError): + pass + + +class ClusterPartitionError(RedisError): + pass + + +class ClusterDownError(ClusterError, ResponseError): + def __init__(self, resp): + print resp From ad675f351c271d2701fb910f75429b096cd604aa Mon Sep 17 00:00:00 2001 From: boyxuper Date: Tue, 31 Mar 2015 15:01:36 +0800 Subject: [PATCH 03/16] - support same slot multiple-keys ops --- redis/cluster.py | 211 ++++++++++++++++++++++++++++++++++++++------ redis/exceptions.py | 3 +- 2 files changed, 185 insertions(+), 29 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 2e3a764793..905115d594 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -14,27 +14,32 @@ ) # TODO: loose redis interface -# TODO: more redis command support +# TODO: collect results # TODO: advanced balancer -LOGGER = logging.getLogger(__name__) +# TODO: pipeline +# TODO: script +# TODO: pubsub +LOGGER = logging #.getLogger(__name__) class ClusterBalancer(object): def get_node_for_key(self, key_name, readonly): raise NotImplementedError() + def get_node_for_slot(self, slot_id, readonly): + raise NotImplementedError() + def get_random_node(self, readonly): raise NotImplementedError() class RoundRobinClusterNodeBalancer(ClusterBalancer): - RR_COUNTER = 0 + RR_COUNTER = 1 def __init__(self, manager): self.manager = manager - def get_node_for_key(self, key_name, readonly): - slot_id = Cluster.keyslot(key_name) + def get_node_for_slot(self, slot_id, readonly): if not readonly: return self.manager.get_master_node(slot_id) else: @@ -42,13 +47,16 @@ def get_node_for_key(self, key_name, readonly): nodes = self.manager.get_slave_nodes(slot_id, slave_only=False) return list(nodes)[counter % len(nodes)] + def get_node_for_key(self, key_name, readonly): + return self.get_node_for_slot(Cluster.keyslot(key_name), readonly=readonly) + def get_random_node(self, readonly): counter = self.__class__.RR_COUNTER = (self.__class__.RR_COUNTER + 1) % Cluster.KEY_SLOTS if readonly: - nodes = self.manager.master_nodes - else: nodes = self.manager.nodes + else: + nodes = self.manager.master_nodes return list(nodes)[counter % len(nodes)] @@ -76,6 +84,8 @@ def __init__(self, resp): class ClusterConnection(Connection): + description_format = "ClusterConnection" + def __init__(self, *args, **kwargs): self.use_readonly = kwargs.pop('use_readonly', False) kwargs['parser_class'] = ClusterParser @@ -149,10 +159,12 @@ def discover_cluster(self, force=False): for (start, end), slot in node_conn.cluster_slots().items(): for slot_id in range(start, end + 1): - if not self.allow_partition and slot_id in slots_node: - raise ClusterPartitionError( - 'Cluster partition appears: slot #%s, node: [%s] and [%s]' % ( - slot_id, slots_node[slot_id], node)) + if not self.allow_partition and slot_id in self.slots: + old_master = self.slots[slot_id]['master'] + if old_master != slot['master']: + raise ClusterPartitionError( + 'Cluster partition appears: slot #%s, node: [%s]:[%s] and [%s]:[%s]' % ( + slot_id, slots_node[slot_id], old_master, node, slot['master'])) self.master_nodes.add(slot['master']) self.nodes.update([slot['master']] + slot['slaves']) @@ -166,6 +178,10 @@ def discover_cluster(self, force=False): self.pubsub_node = self.determine_pubsub_node() break + if not self.nodes: + self.nodes = startup_nodes + raise ClusterDownError('no startup node can be reached.') + def get_master_node(self, slot_id): self.discover_cluster() try: @@ -217,7 +233,7 @@ class ClusterConnectionPool(object): """connection pool for redis cluster collection of pools """ - DEFAULT_TIMEOUT = None + DEFAULT_TIMEOUT = .05 DEFAULT_MAX_CONN = 32 def __init__(self, manager, connection_class=ClusterConnection, @@ -230,6 +246,7 @@ def __init__(self, manager, connection_class=ClusterConnection, self.manager = manager self.connection_class = connection_class self.connection_kwargs = connection_kwargs + self.connection_kwargs.setdefault('socket_timeout', self.DEFAULT_TIMEOUT) self.max_connections = max_connections # (host, port) -> pool @@ -273,12 +290,111 @@ class StrictClusterRedis(StrictRedis): the regular implementation of the method. """ COMMAND_TTL = 16 - READONLY_COMMANDS = 'GET MGET RANDOMKEY'.split(' ') - RANDOM_NODE_COMMANDS = 'RANDOMKEY INFO'.split(' ') - RANDOM_RR_COUNTER = 0 + COMMAND_FLAGS = dict_merge( + dict.fromkeys([ + 'RANDOMKEY', 'CLUSTER KEYSLOT', 'ECHO', + ], 'random'), + dict.fromkeys([ + 'CLUSTER COUNTKEYSINSLOT', + ], 'slot-id'), + dict.fromkeys([ + 'DBSIZE', + ], 'collect'), + dict.fromkeys([ + # impossible in cluster mode + 'SELECT', 'MOVE', 'SLAVEOF', + + # sentinels + 'SENTINEL GET-MASTER-ADDR-BY-NAME', 'SENTINEL MASTER', 'SENTINEL MASTERS', + 'SENTINEL MONITOR', 'SENTINEL REMOVE', 'SENTINEL SENTINELS', 'SENTINEL SET', + 'SENTINEL SLAVES', + + # admin commands + 'BGREWRITEAOF', 'BGSAVE', 'SAVE', 'INFO', 'KEYS', 'LASTSAVE', + 'CONFIG GET', 'CONFIG SET', 'CONFIG RESETSTAT', 'CONFIG REWRITE', + 'CLIENT KILL', 'CLIENT LIST', 'CLIENT GETNAME', 'CLIENT SETNAME', + 'SLOWLOG GET', 'SLOWLOG RESET', 'SLOWLOG LEN', 'SHUTDOWN', + + # lua script + 'EVALSHA', 'SCRIPT EXISTS', 'SCRIPT KILL', 'SCRIPT LOAD', 'SCRIPT FLUSH', + + # unknown behaviors in cluster + 'DBSIZE', 'PING', 'MONITOR', 'TIME', 'PFSELFTEST', 'READONLY', 'READWRITE', + 'OBJECT REFCOUNT', 'OBJECT ENCODING', 'OBJECT IDLETIME', + + # test/doc command + 'PFSELFTEST', 'COMMAND', 'COMMAND COUNT', 'COMMAND GETKEYS', 'COMMAND INFO', + + # pipeline related + 'DISCARD', 'MULTI', 'WATCH', 'UNWATCH', + + # latency monitor + 'LATENCY LATEST', 'LATENCY HISTORY', 'LATENCY RESET', 'LATENCY GRAPH', 'LATENCY DOCTOR', + + # unknown commands + 'REPLCONF', 'SYNC', 'PSYNC', + + # all_master for loose + 'FLUSHALL', 'FLUSHDB', 'SCAN', + + # all_shard for loose + 'KEYS', + + # pubsub_node for loose + 'PUBLISH', 'SUBSCRIBE', 'PSUBSCRIBE', 'UNSUBSCRIBE', 'PUNSUBSCRIBE', + 'PUBSUB CHANNELS', 'PUBSUB NUMSUB', 'PUBSUB NUMPAT', + ], 'blocked'), # block for strict client + ) + COMMAND_PARSE_KEYS = dict_merge( + dict.fromkeys([ + 'BRPOPLPUSH', 'RPOPLPUSH', 'RENAME', 'SMOVE', + ], lambda args: [args[1], args[2]]), + dict.fromkeys([ + 'BLPOP', 'BRPOP', + ], lambda args: args[2:-1]), + dict.fromkeys([ + 'ZINTERSTORE', 'ZUNIONSTORE', + ], lambda args: [args[1]] + args[3:3+args[2]]), + dict.fromkeys([ + 'MSET', 'MSETNX', + ], lambda args: args[1::2]), + dict.fromkeys([ + 'DEL', 'RPOPLPUSH', 'RENAME', 'RENAMENX', 'SMOVE', 'SDIFF', 'SDIFFSTORE', + 'SINTER', 'SINTERSTORE', 'SUNION', 'SUNIONSTORE', 'PFMERGE' + ], lambda args: args[1:]), + { + 'BITOP': lambda args: args[2:], + '': lambda args: args[2:-1], + }, + ), + READONLY_COMMANDS = { + # single key ops + # - bits + 'BITCOUNT', 'BITPOS', 'GETBIT', + + # - string + 'GET', 'MGET', 'TTL', 'PTTL', 'EXISTS', 'GETRANGE', 'SUBSTR', 'STRLEN', + 'DUMP', 'TYPE', 'RANDOMKEY', + + # - hash + 'HEXISTS', 'HGET', 'HGETALL', 'HKEYS', 'HLEN', 'HMGET', 'HSCAN', 'HVALS', + + # - list + 'LINDEX', 'LLEN', 'LRANGE', + + # - SET + 'SISMEMBER', 'SMEMBERS', 'SRANDMEMBER', 'SCARD', 'SSCAN', 'SDIFF', 'SINTER', 'SUNION', + + # - ZSET + 'ZCARD', 'ZCOUNT', 'ZLEXCOUNT', 'ZRANGE', 'ZRANGEBYLEX', 'ZRANGEBYSCORE', 'ZSCAN', + 'ZRANK', 'ZREVRANGE', 'ZREVRANGEBYLEX', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZSCORE', + + # - hyper loglog + 'PFCOUNT', + } def __init__(self, startup_nodes, max_connections=32, discover_cluster=True, - pipeline_use_threads=True, node_balancer=None, **kwargs): + pipeline_use_threads=True, node_balancer=None, packer_kwargs=None, **kwargs): """ startup_nodes --> List of nodes that initial bootstrapping can be done from max_connections --> Maximum number of connections that should be kept open at one time @@ -301,6 +417,7 @@ def __init__(self, startup_nodes, max_connections=32, discover_cluster=True, if discover_cluster: self.manager.discover_cluster() + self.packer_conn = Connection(**packer_kwargs or {}) self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() self.pipeline_use_threads = pipeline_use_threads @@ -314,46 +431,86 @@ def prepare_command(self, command_args): command = command_args[0] readonly = command in self.READONLY_COMMANDS - if command in self.RANDOM_NODE_COMMANDS: + node_flag = self.COMMAND_FLAGS.get(command) + if node_flag == 'blocked': + raise ClusterError('Blocked command: %s' % command) + elif node_flag == 'random': node = self.node_balancer.get_random_node(readonly=readonly) + elif command in self.COMMAND_PARSE_KEYS: + slot_ids = set() + for key_name in self.COMMAND_PARSE_KEYS[command](command_args): + slot_ids.add(Cluster.keyslot(key_name)) + + if len(slot_ids) != 1: + raise ClusterCrossSlotError() + + node = self.node_balancer.get_node_for_slot(slot_id=slot_ids[0], readonly=readonly) else: key_name = command_args[1] node = self.node_balancer.get_node_for_key(key_name=key_name, readonly=readonly) connection = self.connection_pool.get_connection(node) - packed_command = connection.pack_command(*command_args) - return connection, packed_command + return connection + + @classmethod + def _desc_node(cls, node_or_conn): + if isinstance(node_or_conn, Connection): + node_or_conn = node_or_conn.host, node_or_conn.port + + return '%s:%s' % node_or_conn def execute_command(self, *command_args, **parser_args): """Send a command to a node in the cluster SINGLE & SIMPLE MODE - 1. single slot command - 2 random node command - 3. multiple slot command - 3.1. *cross slot command + 1. single slot command [v] + 2. random node command [v] + 3. multiple slot command [loose] + 3.1. *cross slot command [loose] 1. single key [v] - 2. no key - 3. multiple key + 2. no key [v] + 3. multiple key in one slot [v] """ command = command_args[0] + packed_command = self.packer_conn.pack_command(*command_args) ttl = self.COMMAND_TTL while ttl > 0: ttl -= 1 - connection, packed_command = self.prepare_command(command_args) + connection = self.prepare_command(command_args) try: connection.send_packed_command(packed_command) return self.parse_response(connection, command, **parser_args) except BusyLoadingError: raise - except (ConnectionError, TimeoutError): + except (ConnectionError, TimeoutError) as e: if ttl < self.COMMAND_TTL / 2: time.sleep(0.01) + LOGGER.warning('Node %s: %s' % (e.__class__.__name__, self._desc_node(connection))) except ClusterParser.MovedError as e: self.manager.slot_moved(e.slot_id, e.node) + LOGGER.warning('slot moved: %s [%s] -> [%s]' % ( + e.slot_id, self._desc_node(connection), self._desc_node(e.node))) finally: self.connection_pool.release(connection) raise ClusterError('TTL exhausted.') + + +class ClusterRedis(StrictClusterRedis): + """ + KEYS pattern + Find all keys matching the given pattern + MIGRATE host port key destination-db timeout [COPY] [REPLACE] + Atomically transfer a key from a Redis instance to another one. + + implements cross-slot/all-nodes operations + """ + + COMMAND_FLAGS = dict_merge( + StrictClusterRedis.COMMAND_FLAGS, + dict.fromkeys([ + 'FLUSHALL', 'FLUSHDB', + ], 'all-master'), + ) diff --git a/redis/exceptions.py b/redis/exceptions.py index b5a2b1ffba..d6acc8cab9 100644 --- a/redis/exceptions.py +++ b/redis/exceptions.py @@ -76,8 +76,7 @@ class ClusterError(RedisError): class ClusterCrossSlotError(RedisError): - def __init__(self, resp): - print resp + pass class ClusterSlotNotServedError(RedisError): From 562150d0c47b9eff98cb60426efeb96aa69a1fd0 Mon Sep 17 00:00:00 2001 From: boyxuper Date: Tue, 31 Mar 2015 15:50:49 +0800 Subject: [PATCH 04/16] - collect commands: DBSIZE, KEYS, MGET --- redis/cluster.py | 97 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 79 insertions(+), 18 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 905115d594..5d3f263cfb 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -5,7 +5,7 @@ from redis.crc import crc16 from redis._compat import iteritems, nativestr -from redis.client import StrictRedis, dict_merge +from redis.client import StrictRedis, dict_merge, list_or_args from redis.connection import Connection, ConnectionPool, DefaultParser from redis.exceptions import ( ConnectionError, ClusterPartitionError, ClusterError, @@ -32,6 +32,10 @@ def get_node_for_slot(self, slot_id, readonly): def get_random_node(self, readonly): raise NotImplementedError() + def get_random_nodes(self, readonly): + """one each shard""" + raise NotImplementedError() + class RoundRobinClusterNodeBalancer(ClusterBalancer): RR_COUNTER = 1 @@ -39,27 +43,39 @@ class RoundRobinClusterNodeBalancer(ClusterBalancer): def __init__(self, manager): self.manager = manager + def _incr(self, by=1): + counter = self.__class__.RR_COUNTER = (self.__class__.RR_COUNTER + by) % Cluster.KEY_SLOTS + return counter + def get_node_for_slot(self, slot_id, readonly): if not readonly: return self.manager.get_master_node(slot_id) else: - counter = self.__class__.RR_COUNTER = (self.__class__.RR_COUNTER + 1) % Cluster.KEY_SLOTS nodes = self.manager.get_slave_nodes(slot_id, slave_only=False) - return list(nodes)[counter % len(nodes)] + return list(nodes)[self._incr() % len(nodes)] def get_node_for_key(self, key_name, readonly): return self.get_node_for_slot(Cluster.keyslot(key_name), readonly=readonly) def get_random_node(self, readonly): - counter = self.__class__.RR_COUNTER = (self.__class__.RR_COUNTER + 1) % Cluster.KEY_SLOTS + counter = self._incr() if readonly: nodes = self.manager.nodes else: - nodes = self.manager.master_nodes + nodes = self.manager.master_slaves.keys() return list(nodes)[counter % len(nodes)] + def get_random_nodes(self, readonly): + """one each shard""" + for master, slaves in self.manager.master_slaves.items(): + if readonly: + nodes = list(slaves) + [master] + yield list(nodes)[self._incr() % len(nodes)] + else: + yield master + class ClusterParser(DefaultParser): class AskError(ResponseError): @@ -119,7 +135,7 @@ def __init__(self, startup_nodes=None, allow_partition=False, **cluster_kwargs): self.cluster_kwargs['password'] = cluster_kwargs.get('password') self.allow_partition = allow_partition - self.master_nodes = set() + self.master_slaves = {} self.nodes = set(startup_nodes) self.slots = {} self.pubsub_node = None @@ -144,7 +160,7 @@ def discover_cluster(self, force=False): return slots_node = {} - startup_nodes, self.nodes, self.master_nodes = self.nodes, set(), set() + startup_nodes, self.nodes, self.master_slaves = self.nodes, set(), {} for node in startup_nodes: host, port = node node_conn = StrictRedis(host, port, **self.cluster_kwargs) @@ -166,7 +182,7 @@ def discover_cluster(self, force=False): 'Cluster partition appears: slot #%s, node: [%s]:[%s] and [%s]:[%s]' % ( slot_id, slots_node[slot_id], old_master, node, slot['master'])) - self.master_nodes.add(slot['master']) + self.master_slaves.setdefault(slot['master'], set()).update(slot['slaves']) self.nodes.update([slot['master']] + slot['slaves']) self.slots[slot_id] = { 'master': slot['master'], @@ -226,14 +242,16 @@ def slot_moved(self, slot_id, node): slot = self.slots.setdefault(slot_id, {'master': None, 'slaves': set()}) slot['master'] = node self.nodes.add(node) - self.master_nodes.add(node) + + # FIXME: should we trigger reload? + self.master_slaves.setdefault(node, []) class ClusterConnectionPool(object): """connection pool for redis cluster collection of pools """ - DEFAULT_TIMEOUT = .05 + DEFAULT_TIMEOUT = 0.1 DEFAULT_MAX_CONN = 32 def __init__(self, manager, connection_class=ClusterConnection, @@ -267,7 +285,7 @@ def get_connection(self, node): def make_connection_pool(self, node): """Create a new connection""" host, port = node - use_readonly = node not in self.manager.master_nodes + use_readonly = node not in self.manager.master_slaves return ConnectionPool(host=host, port=port, connection_class=self.connection_class, max_connections=self.max_connections, @@ -297,9 +315,6 @@ class StrictClusterRedis(StrictRedis): dict.fromkeys([ 'CLUSTER COUNTKEYSINSLOT', ], 'slot-id'), - dict.fromkeys([ - 'DBSIZE', - ], 'collect'), dict.fromkeys([ # impossible in cluster mode 'SELECT', 'MOVE', 'SLAVEOF', @@ -360,7 +375,7 @@ class StrictClusterRedis(StrictRedis): ], lambda args: args[1::2]), dict.fromkeys([ 'DEL', 'RPOPLPUSH', 'RENAME', 'RENAMENX', 'SMOVE', 'SDIFF', 'SDIFFSTORE', - 'SINTER', 'SINTERSTORE', 'SUNION', 'SUNIONSTORE', 'PFMERGE' + 'SINTER', 'SINTERSTORE', 'SUNION', 'SUNIONSTORE', 'PFMERGE', 'MGET', ], lambda args: args[1:]), { 'BITOP': lambda args: args[2:], @@ -427,7 +442,46 @@ def _get_random_node(self): nodes = self.manager.get_slave_nodes(slot_id, slave_only=False) return nodes[slot_id % len(nodes)] - def prepare_command(self, command_args): + def mget(self, keys, *args): + """collects from slots + """ + slot_keys = {} + origin_keys = list_or_args(keys, args) + for key in origin_keys: + slot_keys.setdefault(Cluster.keyslot(key), []).append(key) + + results = {} + for slot_id, keys in slot_keys.iteritems(): + values = super(StrictClusterRedis, self).mget(keys) + results.update(dict(zip(keys, values))) + + return [results[key] for key in origin_keys] + + def dbsize(self): + """collects from masters + """ + result = 0 + for node in self.node_balancer.get_random_nodes(readonly=True): + connection = self.connection_pool.get_connection(node) + try: + result += self.execute_connection_command(connection, ('DBSIZE', )) + finally: + self.connection_pool.release(connection) + + def keys(self, pattern='*'): + """collects from masters + """ + result = [] + for node in self.node_balancer.get_random_nodes(readonly=True): + connection = self.connection_pool.get_connection(node) + try: + result += self.execute_connection_command(connection, ('KEYS', pattern)) + finally: + self.connection_pool.release(connection) + + return result + + def get_connection(self, command_args): command = command_args[0] readonly = command in self.READONLY_COMMANDS @@ -436,6 +490,8 @@ def prepare_command(self, command_args): raise ClusterError('Blocked command: %s' % command) elif node_flag == 'random': node = self.node_balancer.get_random_node(readonly=readonly) + elif node_flag == 'slot-id': + node = self.node_balancer.get_node_for_slot(slot_id=command_args[1], readonly=readonly) elif command in self.COMMAND_PARSE_KEYS: slot_ids = set() for key_name in self.COMMAND_PARSE_KEYS[command](command_args): @@ -444,7 +500,7 @@ def prepare_command(self, command_args): if len(slot_ids) != 1: raise ClusterCrossSlotError() - node = self.node_balancer.get_node_for_slot(slot_id=slot_ids[0], readonly=readonly) + node = self.node_balancer.get_node_for_slot(slot_id=slot_ids.pop(), readonly=readonly) else: key_name = command_args[1] node = self.node_balancer.get_node_for_key(key_name=key_name, readonly=readonly) @@ -459,6 +515,11 @@ def _desc_node(cls, node_or_conn): return '%s:%s' % node_or_conn + def execute_connection_command(self, connection, command_args, parser_args=None): + command = command_args[0] + connection.send_command(*command_args) + return self.parse_response(connection, command, **parser_args or {}) + def execute_command(self, *command_args, **parser_args): """Send a command to a node in the cluster SINGLE & SIMPLE MODE @@ -478,7 +539,7 @@ def execute_command(self, *command_args, **parser_args): while ttl > 0: ttl -= 1 - connection = self.prepare_command(command_args) + connection = self.get_connection(command_args) try: connection.send_packed_command(packed_command) return self.parse_response(connection, command, **parser_args) From 86b8f627f49d839f6f83491c522758235bf0a597 Mon Sep 17 00:00:00 2001 From: boyxuper Date: Tue, 31 Mar 2015 16:35:40 +0800 Subject: [PATCH 05/16] - removed unused code --- redis/cluster.py | 58 ++++++++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 5d3f263cfb..f71b2244f0 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -14,7 +14,6 @@ ) # TODO: loose redis interface -# TODO: collect results # TODO: advanced balancer # TODO: pipeline # TODO: script @@ -32,7 +31,7 @@ def get_node_for_slot(self, slot_id, readonly): def get_random_node(self, readonly): raise NotImplementedError() - def get_random_nodes(self, readonly): + def get_shards_nodes(self, readonly): """one each shard""" raise NotImplementedError() @@ -67,7 +66,7 @@ def get_random_node(self, readonly): return list(nodes)[counter % len(nodes)] - def get_random_nodes(self, readonly): + def get_shards_nodes(self, readonly): """one each shard""" for master, slaves in self.manager.master_slaves.items(): if readonly: @@ -325,7 +324,7 @@ class StrictClusterRedis(StrictRedis): 'SENTINEL SLAVES', # admin commands - 'BGREWRITEAOF', 'BGSAVE', 'SAVE', 'INFO', 'KEYS', 'LASTSAVE', + 'BGREWRITEAOF', 'BGSAVE', 'SAVE', 'INFO', 'LASTSAVE', 'CONFIG GET', 'CONFIG SET', 'CONFIG RESETSTAT', 'CONFIG REWRITE', 'CLIENT KILL', 'CLIENT LIST', 'CLIENT GETNAME', 'CLIENT SETNAME', 'SLOWLOG GET', 'SLOWLOG RESET', 'SLOWLOG LEN', 'SHUTDOWN', @@ -334,7 +333,7 @@ class StrictClusterRedis(StrictRedis): 'EVALSHA', 'SCRIPT EXISTS', 'SCRIPT KILL', 'SCRIPT LOAD', 'SCRIPT FLUSH', # unknown behaviors in cluster - 'DBSIZE', 'PING', 'MONITOR', 'TIME', 'PFSELFTEST', 'READONLY', 'READWRITE', + 'PING', 'MONITOR', 'TIME', 'READONLY', 'READWRITE', 'OBJECT REFCOUNT', 'OBJECT ENCODING', 'OBJECT IDLETIME', # test/doc command @@ -352,9 +351,6 @@ class StrictClusterRedis(StrictRedis): # all_master for loose 'FLUSHALL', 'FLUSHDB', 'SCAN', - # all_shard for loose - 'KEYS', - # pubsub_node for loose 'PUBLISH', 'SUBSCRIBE', 'PSUBSCRIBE', 'UNSUBSCRIBE', 'PUNSUBSCRIBE', 'PUBSUB CHANNELS', 'PUBSUB NUMSUB', 'PUBSUB NUMPAT', @@ -379,7 +375,6 @@ class StrictClusterRedis(StrictRedis): ], lambda args: args[1:]), { 'BITOP': lambda args: args[2:], - '': lambda args: args[2:-1], }, ), READONLY_COMMANDS = { @@ -409,7 +404,7 @@ class StrictClusterRedis(StrictRedis): } def __init__(self, startup_nodes, max_connections=32, discover_cluster=True, - pipeline_use_threads=True, node_balancer=None, packer_kwargs=None, **kwargs): + node_balancer=None, packer_kwargs=None, **kwargs): """ startup_nodes --> List of nodes that initial bootstrapping can be done from max_connections --> Maximum number of connections that should be kept open at one time @@ -434,13 +429,6 @@ def __init__(self, startup_nodes, max_connections=32, discover_cluster=True, self.packer_conn = Connection(**packer_kwargs or {}) self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() - self.pipeline_use_threads = pipeline_use_threads - - def _get_random_node(self): - """for read""" - slot_id = self.__class__.RANDOM_RR_COUNT = (self.__class__.RANDOM_RR_COUNT + 1) % Cluster.KEY_SLOTS - nodes = self.manager.get_slave_nodes(slot_id, slave_only=False) - return nodes[slot_id % len(nodes)] def mget(self, keys, *args): """collects from slots @@ -448,31 +436,47 @@ def mget(self, keys, *args): slot_keys = {} origin_keys = list_or_args(keys, args) for key in origin_keys: - slot_keys.setdefault(Cluster.keyslot(key), []).append(key) + slot_keys.setdefault(Cluster.keyslot(key), set()).add(key) results = {} for slot_id, keys in slot_keys.iteritems(): + keys = list(keys) values = super(StrictClusterRedis, self).mget(keys) results.update(dict(zip(keys, values))) return [results[key] for key in origin_keys] + def delete(self, *names): + """collects from slots + """ + slot_keys = {} + for key in names: + slot_keys.setdefault(Cluster.keyslot(key), set()).add(key) + + result = 0 + for slot_id, keys in slot_keys.iteritems(): + result += super(StrictClusterRedis, self).delete(*keys) + + return result + def dbsize(self): - """collects from masters + """collects from all shards """ result = 0 - for node in self.node_balancer.get_random_nodes(readonly=True): + for node in self.node_balancer.get_shards_nodes(readonly=True): connection = self.connection_pool.get_connection(node) try: result += self.execute_connection_command(connection, ('DBSIZE', )) finally: self.connection_pool.release(connection) + return result + def keys(self, pattern='*'): - """collects from masters + """collects from all shards """ result = [] - for node in self.node_balancer.get_random_nodes(readonly=True): + for node in self.node_balancer.get_shards_nodes(readonly=True): connection = self.connection_pool.get_connection(node) try: result += self.execute_connection_command(connection, ('KEYS', pattern)) @@ -481,7 +485,7 @@ def keys(self, pattern='*'): return result - def get_connection(self, command_args): + def determine_node(self, command_args): command = command_args[0] readonly = command in self.READONLY_COMMANDS @@ -505,8 +509,7 @@ def get_connection(self, command_args): key_name = command_args[1] node = self.node_balancer.get_node_for_key(key_name=key_name, readonly=readonly) - connection = self.connection_pool.get_connection(node) - return connection + return node @classmethod def _desc_node(cls, node_or_conn): @@ -530,7 +533,7 @@ def execute_command(self, *command_args, **parser_args): 1. single key [v] 2. no key [v] - 3. multiple key in one slot [v] + 3. multiple key in single slot [v] """ command = command_args[0] packed_command = self.packer_conn.pack_command(*command_args) @@ -539,7 +542,8 @@ def execute_command(self, *command_args, **parser_args): while ttl > 0: ttl -= 1 - connection = self.get_connection(command_args) + node = self.determine_node(command_args) + connection = self.connection_pool.get_connection(node) try: connection.send_packed_command(packed_command) return self.parse_response(connection, command, **parser_args) From 13c815c471ec56aa9f2d99c4dfd6b59a5ee0117a Mon Sep 17 00:00:00 2001 From: boyxuper Date: Tue, 31 Mar 2015 16:40:56 +0800 Subject: [PATCH 06/16] - discover_cluster before get node --- redis/cluster.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index f71b2244f0..c1da5bc04f 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -47,6 +47,8 @@ def _incr(self, by=1): return counter def get_node_for_slot(self, slot_id, readonly): + self.manager.discover_cluster() + if not readonly: return self.manager.get_master_node(slot_id) else: @@ -57,17 +59,19 @@ def get_node_for_key(self, key_name, readonly): return self.get_node_for_slot(Cluster.keyslot(key_name), readonly=readonly) def get_random_node(self, readonly): - counter = self._incr() + self.manager.discover_cluster() if readonly: nodes = self.manager.nodes else: nodes = self.manager.master_slaves.keys() - return list(nodes)[counter % len(nodes)] + return list(nodes)[self._incr() % len(nodes)] def get_shards_nodes(self, readonly): """one each shard""" + self.manager.discover_cluster() + for master, slaves in self.manager.master_slaves.items(): if readonly: nodes = list(slaves) + [master] @@ -495,7 +499,7 @@ def determine_node(self, command_args): elif node_flag == 'random': node = self.node_balancer.get_random_node(readonly=readonly) elif node_flag == 'slot-id': - node = self.node_balancer.get_node_for_slot(slot_id=command_args[1], readonly=readonly) + node = self.node_balancer.get_node_for_slot(slot_id=int(command_args[1]), readonly=readonly) elif command in self.COMMAND_PARSE_KEYS: slot_ids = set() for key_name in self.COMMAND_PARSE_KEYS[command](command_args): From 869501bfdf156794e023cb2d377397eaa3a8b6da Mon Sep 17 00:00:00 2001 From: boyxuper Date: Mon, 6 Apr 2015 01:25:04 +0800 Subject: [PATCH 07/16] - implements CLUSTER NODES/SLAVES command --- redis/client.py | 78 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 75 insertions(+), 3 deletions(-) diff --git a/redis/client.py b/redis/client.py index b2fb9c53ec..4709819db7 100755 --- a/redis/client.py +++ b/redis/client.py @@ -283,7 +283,7 @@ def parse_not_implemented(r, **options): def parse_cluster_slots(resp, **options): - current_host = options.get('current_host', None) + current_host = options.get('current_host', '') fix_server = lambda (host, port): (host or current_host, port) slots = {} @@ -298,6 +298,70 @@ def parse_cluster_slots(resp, **options): return slots +def parse_cluster_nodes(resp, **options): + """ + id The node ID, a 40 characters random string generated when a node is created and never changed again + (unless CLUSTER RESET HARD is used). + ip:port The node address where clients should contact the node to run queries. + flags A list of comma separated flags: myself, master, slave, fail?, fail, handshake, noaddr, noflags. + Flags are explained in detail in the next section. + master If the node is a slave, and the master is known, the master node ID, otherwise the "-" character. + ping-sent Milliseconds unix time at which the currently active ping was sent, or zero if there are no pending pings. + pong-recv Milliseconds unix time the last pong was received. + config-epoch The configuration epoch (or version) of the current node + (or of the current master if the node is a slave). + Each time there is a failover, a new, unique, monotonically increasing configuration epoch is created. + If multiple nodes claim to serve the same hash slots, the one with higher configuration epoch wins. + link-state The state of the link used for the node-to-node cluster bus. + We use this link to communicate with the node. Can be connected or disconnected. + slot An hash slot number or range. Starting from argument number 9, + but there may be up to 16384 entries in total (limit never reached). + This is the list of hash slots served by this node. If the entry is just a number, is parsed as such. + If it is a range, it is in the form start-end, and means that the node is responsible for all the hash slots + from start to end including the start and end values. + """ + current_host = options.get('current_host', '') + + def parse_slots(s): + slots = [] + for r in s.split(' '): + if '-' in r: + start, end = r.split('-') + slots.extend(range(int(start), int(end) + 1)) + else: + slots.append(int(r)) + + return slots + + nodes = {} + if isinstance(resp, basestring): + resp = resp.splitlines() + + for line in resp: + parts = line.split(' ', 8) + self_id, addr, flags, master_id, ping_sent, \ + pong_recv, config_epoch, link_state = parts[:8] + + host, port = addr.rsplit(':', 1) + + node = nodes[self_id] = { + 'id': self_id, + 'host': host or current_host, + 'port': int(port), + 'flags': tuple(flags.split(',')), + 'master': master_id if master_id != '-' else None, + 'ping-sent': int(ping_sent), + 'pong-recv': int(pong_recv), + 'link-state': link_state, + 'slots': [], + } + + if len(parts) >= 9: + node['slots'] = tuple(parse_slots(parts[8])) + + return nodes + + class StrictRedis(object): """ Implementation of the Redis protocol. @@ -394,13 +458,13 @@ class StrictRedis(object): 'CLUSTER INFO': parse_info, 'CLUSTER KEYSLOT': int, 'CLUSTER MEET': bool_ok, - 'CLUSTER NODES': parse_not_implemented, + 'CLUSTER NODES': parse_cluster_nodes, 'CLUSTER REPLICATE': bool_ok, 'CLUSTER RESET': bool_ok, 'CLUSTER SAVECONFIG': bool_ok, 'CLUSTER SET-CONFIG-EPOCH': bool_ok, 'CLUSTER SETSLOT': bool_ok, - 'CLUSTER SLAVES': parse_not_implemented, + 'CLUSTER SLAVES': parse_cluster_nodes, 'CLUSTER SLOTS': parse_cluster_slots, } ) @@ -687,6 +751,10 @@ def cluster_meet(self, host, port): """Force a node cluster to handshake with another node""" return self.execute_command('CLUSTER MEET', host, port) + def cluster_nodes(self): + """Force a node cluster to handshake with another node""" + return self.execute_command('CLUSTER NODES', current_host=self.host) + def cluster_replicate(self, node_id): """Reconfigure a node as a slave of the specified master node""" return self.execute_command('CLUSTER REPLICATE', node_id) @@ -714,6 +782,10 @@ def cluster_setslot(self, slot_id, state, node_id=None): else: raise RedisError('Invalid slot state: %s' % state) + def cluster_slaves(self, node_id): + """Force a node cluster to handshake with another node""" + return self.execute_command('CLUSTER SLAVES', node_id) + def cluster_slots(self): """Get array of Cluster slot to node mappings""" return self.execute_command('CLUSTER SLOTS', current_host=self.host) From e77a58198f76bb38e3921a86ea66142b782aaaac Mon Sep 17 00:00:00 2001 From: boyxuper Date: Mon, 6 Apr 2015 01:35:27 +0800 Subject: [PATCH 08/16] - I think parse_cluster_nodes should return a list --- redis/client.py | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/redis/client.py b/redis/client.py index 4709819db7..dc8f617bbe 100755 --- a/redis/client.py +++ b/redis/client.py @@ -300,25 +300,8 @@ def parse_cluster_slots(resp, **options): def parse_cluster_nodes(resp, **options): """ - id The node ID, a 40 characters random string generated when a node is created and never changed again - (unless CLUSTER RESET HARD is used). - ip:port The node address where clients should contact the node to run queries. - flags A list of comma separated flags: myself, master, slave, fail?, fail, handshake, noaddr, noflags. - Flags are explained in detail in the next section. - master If the node is a slave, and the master is known, the master node ID, otherwise the "-" character. - ping-sent Milliseconds unix time at which the currently active ping was sent, or zero if there are no pending pings. - pong-recv Milliseconds unix time the last pong was received. - config-epoch The configuration epoch (or version) of the current node - (or of the current master if the node is a slave). - Each time there is a failover, a new, unique, monotonically increasing configuration epoch is created. - If multiple nodes claim to serve the same hash slots, the one with higher configuration epoch wins. - link-state The state of the link used for the node-to-node cluster bus. - We use this link to communicate with the node. Can be connected or disconnected. - slot An hash slot number or range. Starting from argument number 9, - but there may be up to 16384 entries in total (limit never reached). - This is the list of hash slots served by this node. If the entry is just a number, is parsed as such. - If it is a range, it is in the form start-end, and means that the node is responsible for all the hash slots - from start to end including the start and end values. + @see: http://redis.io/commands/cluster-nodes # string + @see: http://redis.io/commands/cluster-slaves # list of string """ current_host = options.get('current_host', '') @@ -333,10 +316,10 @@ def parse_slots(s): return slots - nodes = {} if isinstance(resp, basestring): resp = resp.splitlines() + nodes = [] for line in resp: parts = line.split(' ', 8) self_id, addr, flags, master_id, ping_sent, \ @@ -344,7 +327,7 @@ def parse_slots(s): host, port = addr.rsplit(':', 1) - node = nodes[self_id] = { + node = { 'id': self_id, 'host': host or current_host, 'port': int(port), @@ -359,6 +342,8 @@ def parse_slots(s): if len(parts) >= 9: node['slots'] = tuple(parse_slots(parts[8])) + nodes.append(node) + return nodes From 77489de288492e58242c441cfedcee32b5580dca Mon Sep 17 00:00:00 2001 From: boyxuper Date: Mon, 6 Apr 2015 03:17:20 +0800 Subject: [PATCH 09/16] - tweak ClusterBalancer Interface - upgrade Cluster obtain information from cluster_nodes() - keep slot info in shard - better startup node management - keep node_id in cluster manager - fix typo and minor bugs - determine pubsub node earlier --- redis/cluster.py | 217 ++++++++++++++++++++++++++++------------------- 1 file changed, 130 insertions(+), 87 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index c1da5bc04f..bfba3e4167 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -13,17 +13,29 @@ ClusterSlotNotServedError, ClusterDownError, ) -# TODO: loose redis interface +# TODO: loose redis interface(cross slot ops) # TODO: advanced balancer # TODO: pipeline # TODO: script # TODO: pubsub +# TODO: lock +# TODO: ASK +# TODO: master slave changed +# TODO: master timed out +# TODO: slave timed out +# TODO: read from slave, but slave changed to master +# TODO: READWRITE/READONLY switching +# TODO: connection_pool (partially) rebuild +# TODO: check discover code +# TODO: every possible situation in cluster +# TODO: generator as interactive load balancer +# TODO: migrate tests LOGGER = logging #.getLogger(__name__) class ClusterBalancer(object): def get_node_for_key(self, key_name, readonly): - raise NotImplementedError() + return self.get_node_for_slot(Cluster.keyslot(key_name), readonly=readonly) def get_node_for_slot(self, slot_id, readonly): raise NotImplementedError() @@ -55,29 +67,25 @@ def get_node_for_slot(self, slot_id, readonly): nodes = self.manager.get_slave_nodes(slot_id, slave_only=False) return list(nodes)[self._incr() % len(nodes)] - def get_node_for_key(self, key_name, readonly): - return self.get_node_for_slot(Cluster.keyslot(key_name), readonly=readonly) - def get_random_node(self, readonly): self.manager.discover_cluster() if readonly: - nodes = self.manager.nodes + nodes = self.manager.nodes.keys() else: - nodes = self.manager.master_slaves.keys() + nodes = self.manager.shards.keys() - return list(nodes)[self._incr() % len(nodes)] + return nodes[self._incr() % len(nodes)] def get_shards_nodes(self, readonly): - """one each shard""" self.manager.discover_cluster() - for master, slaves in self.manager.master_slaves.items(): + for shard in self.manager.shards.values(): if readonly: - nodes = list(slaves) + [master] + nodes = list(shard['slaves']) + [shard['master']] yield list(nodes)[self._incr() % len(nodes)] else: - yield master + yield shard['master'] class ClusterParser(DefaultParser): @@ -106,14 +114,14 @@ class ClusterConnection(Connection): description_format = "ClusterConnection" def __init__(self, *args, **kwargs): - self.use_readonly = kwargs.pop('use_readonly', False) + self.readonly = kwargs.pop('readonly', False) kwargs['parser_class'] = ClusterParser super(ClusterConnection, self).__init__(*args, **kwargs) def on_connect(self): """Initialize the connection, set readonly is required""" super(ClusterConnection, self).on_connect() - if self.use_readonly: + if self.readonly: self.send_command('READONLY') if nativestr(self.read_response()) != 'OK': raise ResponseError('Cannot set READONLY flag') @@ -121,7 +129,10 @@ def on_connect(self): class Cluster(object): """keep knowledge of cluster""" + KEY_SLOTS = 16384 + # timeout for collecting from startup nodes + DEFAULT_TIMEOUT = 0.5 def __init__(self, startup_nodes=None, allow_partition=False, **cluster_kwargs): """allow_partition: raise Exception when partition appears or not.""" @@ -136,13 +147,17 @@ def __init__(self, startup_nodes=None, allow_partition=False, **cluster_kwargs): self.cluster_kwargs['decode_responses'] = True self.cluster_kwargs['password'] = cluster_kwargs.get('password') + self.cluster_kwargs.setdefault('socket_timeout', self.DEFAULT_TIMEOUT) self.allow_partition = allow_partition - self.master_slaves = {} - self.nodes = set(startup_nodes) + self.shards = {} # master_id -> shard_info + self.nodes = {} + self.startup_nodes = set(startup_nodes) # [(host, port), ...] self.slots = {} self.pubsub_node = None - self.state = None + + # version for keep generators work + self.slots_epoch = 0 @classmethod def keyslot(cls, key): @@ -158,69 +173,95 @@ def keyslot(cls, key): k = k[start + 1:end] return crc16(k) % cls.KEY_SLOTS - def discover_cluster(self, force=False): - if len(self.slots) == self.KEY_SLOTS and not force: + def discover_cluster(self, force=False, check_all=False): + """ + check_all: read from each startup nodes for detect cluster partition + force: do the discover action even cluster slot info is complete + """ + if not force and len(self.slots) == self.KEY_SLOTS: return - slots_node = {} - startup_nodes, self.nodes, self.master_slaves = self.nodes, set(), {} - for node in startup_nodes: - host, port = node - node_conn = StrictRedis(host, port, **self.cluster_kwargs) + self.nodes, self.shards = {}, {} + # TODO: discover more node dynamically + for startup_node in self.startup_nodes: try: - node_conn.ping() + nodes = StrictRedis(*startup_node, **self.cluster_kwargs).cluster_nodes() except ConnectionError: + LOGGER.warning('Startup node: %s:%s not responding in time.' % startup_node) continue - if self.state is None: - # TODO: upgrade to use CLUSTER NODES - self.state = node_conn.cluster_info()['cluster_state'] - - for (start, end), slot in node_conn.cluster_slots().items(): - for slot_id in range(start, end + 1): - if not self.allow_partition and slot_id in self.slots: - old_master = self.slots[slot_id]['master'] - if old_master != slot['master']: - raise ClusterPartitionError( - 'Cluster partition appears: slot #%s, node: [%s]:[%s] and [%s]:[%s]' % ( - slot_id, slots_node[slot_id], old_master, node, slot['master'])) - - self.master_slaves.setdefault(slot['master'], set()).update(slot['slaves']) - self.nodes.update([slot['master']] + slot['slaves']) - self.slots[slot_id] = { - 'master': slot['master'], - 'slaves': set(slot['slaves']), - } - slots_node[slot_id] = node - - if len(self.slots) == self.KEY_SLOTS: - self.pubsub_node = self.determine_pubsub_node() + # build shards + for node in nodes: + node_id = node['id'] + self.nodes[node_id] = { + 'connected': node['link-state'] == 'connected', + 'id': node['id'], + 'host': node['host'], + 'port': node['port'], + 'addr': (node['host'], node['port']), + 'flags': node['flags'], + 'master': node['master'], + 'is_master': not node['master'], + 'slots': node['slots'], + } + + if 'master' not in node['flags']: + continue + + self.shards[node_id] = { + 'master': node_id, + 'slaves': set(), + 'slots': node['slots'], + } + + # fill slaves & slots + for node in nodes: + shard_id = node['master'] or node['id'] + + if 'slave' in node['flags']: + self.shards[shard_id]['slaves'].add(node['id']) + + for slot_id in node['slots']: + old_master = self.slots.setdefault(slot_id, shard_id) + if old_master != shard_id: + raise ClusterPartitionError( + 'Cluster partition appears: slot #%s, node: %s:[%s] and %s:[%s]' % ( + slot_id, old_master, self.node_addr(old_master), + shard_id, self.node_addr(shard_id))) + else: + self.slots[slot_id] = shard_id + + if not check_all and len(self.slots) == self.KEY_SLOTS: break if not self.nodes: - self.nodes = startup_nodes - raise ClusterDownError('no startup node can be reached.') + raise ClusterDownError('No startup node can be reached. [\n%s\n]' % self.startup_nodes) + + self.startup_nodes = set([node['addr'] for node in self.nodes.values()]) + self.pubsub_node = self.determine_pubsub_node() + self.slots_epoch += 1 def get_master_node(self, slot_id): self.discover_cluster() try: - node = self.slots[slot_id] - except IndexError: + shard_id = self.slots[slot_id] + except KeyError: raise ClusterSlotNotServedError(slot_id) else: - return node['master'] + return self.shards[shard_id]['master'] def get_slave_nodes(self, slot_id, slave_only=True): self.discover_cluster() try: - node = self.slots[slot_id] - except IndexError: + shard_id = self.slots[slot_id] + except KeyError: raise ClusterSlotNotServedError(slot_id) else: + shard = self.shards[shard_id] if slave_only: - return list(node['slaves']) + return list(shard['slaves']) else: - return list(node['slaves']) + [node['master']] + return list(shard['slaves']) + [shard['master']] def determine_pubsub_node(self): """ @@ -229,25 +270,28 @@ def determine_pubsub_node(self): All clients in the cluster will talk to the same pubsub node to ensure all code stay compatible. See pubsub doc for more details why. - Allways use the server with highest port number + Always use the server with highest port number """ highest = -1 - node = None, None - for host, port in self.nodes: + node = None + for node in self.nodes.values(): + host, port = node['addr'] if port > highest: highest = port - node = host, port return node - def slot_moved(self, slot_id, node): - """signal from response""" - slot = self.slots.setdefault(slot_id, {'master': None, 'slaves': set()}) - slot['master'] = node - self.nodes.add(node) + def all_nodes(self): + return self.nodes.values() + + def node_addr(self, node_id): + return self.nodes[node_id]['addr'] - # FIXME: should we trigger reload? - self.master_slaves.setdefault(node, []) + def slot_moved(self, slot_id, addr): + """signal from response, target node should be master""" + # XXX: maybe no rebuild cluster? only current slot? + self.startup_nodes.add(addr) + self.discover_cluster(force=True) class ClusterConnectionPool(object): @@ -277,22 +321,20 @@ def __init__(self, manager, connection_class=ClusterConnection, def reset(self, force=False): self.manager.discover_cluster(force=force) self.pools = dict([ - (node, self.make_connection_pool(node)) - for node in self.manager.nodes + (node['addr'], self.make_connection_pool(node['addr'], not node['is_master'])) + for node in self.manager.all_nodes() ]) - def get_connection(self, node): + def get_connection(self, addr, command_name=None, *keys, **options): """Get a connection from the pool""" - return self.pools[node].get_connection(None) + return self.pools[addr].get_connection(command_name, *keys, **options) - def make_connection_pool(self, node): - """Create a new connection""" - host, port = node - use_readonly = node not in self.manager.master_slaves + def make_connection_pool(self, (host, port), readonly): + """Create a new connection pool""" return ConnectionPool(host=host, port=port, connection_class=self.connection_class, max_connections=self.max_connections, - use_readonly=use_readonly, + readonly=readonly, **self.connection_kwargs) def release(self, connection): @@ -497,9 +539,9 @@ def determine_node(self, command_args): if node_flag == 'blocked': raise ClusterError('Blocked command: %s' % command) elif node_flag == 'random': - node = self.node_balancer.get_random_node(readonly=readonly) + node_id = self.node_balancer.get_random_node(readonly=readonly) elif node_flag == 'slot-id': - node = self.node_balancer.get_node_for_slot(slot_id=int(command_args[1]), readonly=readonly) + node_id = self.node_balancer.get_node_for_slot(slot_id=int(command_args[1]), readonly=readonly) elif command in self.COMMAND_PARSE_KEYS: slot_ids = set() for key_name in self.COMMAND_PARSE_KEYS[command](command_args): @@ -508,12 +550,12 @@ def determine_node(self, command_args): if len(slot_ids) != 1: raise ClusterCrossSlotError() - node = self.node_balancer.get_node_for_slot(slot_id=slot_ids.pop(), readonly=readonly) + node_id = self.node_balancer.get_node_for_slot(slot_id=slot_ids.pop(), readonly=readonly) else: key_name = command_args[1] - node = self.node_balancer.get_node_for_key(key_name=key_name, readonly=readonly) + node_id = self.node_balancer.get_node_for_key(key_name=key_name, readonly=readonly) - return node + return node_id @classmethod def _desc_node(cls, node_or_conn): @@ -546,21 +588,22 @@ def execute_command(self, *command_args, **parser_args): while ttl > 0: ttl -= 1 - node = self.determine_node(command_args) - connection = self.connection_pool.get_connection(node) + node_id = self.determine_node(command_args) + node_addr = self.manager.node_addr(node_id) + connection = self.connection_pool.get_connection(node_addr) try: connection.send_packed_command(packed_command) return self.parse_response(connection, command, **parser_args) except BusyLoadingError: raise except (ConnectionError, TimeoutError) as e: + LOGGER.warning('Node %s: %s' % (e.__class__.__name__, self._desc_node(connection))) if ttl < self.COMMAND_TTL / 2: time.sleep(0.01) - LOGGER.warning('Node %s: %s' % (e.__class__.__name__, self._desc_node(connection))) except ClusterParser.MovedError as e: - self.manager.slot_moved(e.slot_id, e.node) - LOGGER.warning('slot moved: %s [%s] -> [%s]' % ( + LOGGER.warning('Slot moved: %s [%s] -> [%s]' % ( e.slot_id, self._desc_node(connection), self._desc_node(e.node))) + self.manager.slot_moved(e.slot_id, e.node) finally: self.connection_pool.release(connection) From fb81c1c48a8ea0505c501765977aaacc782756c5 Mon Sep 17 00:00:00 2001 From: boyxuper Date: Mon, 6 Apr 2015 03:26:19 +0800 Subject: [PATCH 10/16] - reorder TODO - remove unused code --- redis/client.py | 4 ---- redis/cluster.py | 18 +++++++++--------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/redis/client.py b/redis/client.py index dc8f617bbe..5c277a56ba 100755 --- a/redis/client.py +++ b/redis/client.py @@ -278,10 +278,6 @@ def parse_slowlog_get(response, **options): } for item in response] -def parse_not_implemented(r, **options): - raise NotImplementedError() - - def parse_cluster_slots(resp, **options): current_host = options.get('current_host', '') fix_server = lambda (host, port): (host or current_host, port) diff --git a/redis/cluster.py b/redis/cluster.py index bfba3e4167..1fe2576ed2 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -13,23 +13,23 @@ ClusterSlotNotServedError, ClusterDownError, ) -# TODO: loose redis interface(cross slot ops) -# TODO: advanced balancer -# TODO: pipeline -# TODO: script -# TODO: pubsub -# TODO: lock # TODO: ASK +# TODO: pipeline +# TODO: generator as interactive load balancer # TODO: master slave changed # TODO: master timed out # TODO: slave timed out -# TODO: read from slave, but slave changed to master # TODO: READWRITE/READONLY switching # TODO: connection_pool (partially) rebuild -# TODO: check discover code # TODO: every possible situation in cluster -# TODO: generator as interactive load balancer + +# TODO: read from slave, but slave changed to master +# TODO: pubsub +# TODO: lock +# TODO: advanced balancer +# TODO: loose redis interface(cross slot ops) # TODO: migrate tests +# TODO: script LOGGER = logging #.getLogger(__name__) From 2355061557b40632dcbf54292f494257967ad51d Mon Sep 17 00:00:00 2001 From: boyxuper Date: Mon, 6 Apr 2015 14:25:54 +0800 Subject: [PATCH 11/16] - handle ASK redirection - parse CLUSTER NODES in migration states --- redis/client.py | 24 ++++++++++++--- redis/cluster.py | 77 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 84 insertions(+), 17 deletions(-) diff --git a/redis/client.py b/redis/client.py index 5c277a56ba..63ba6ece55 100755 --- a/redis/client.py +++ b/redis/client.py @@ -302,15 +302,29 @@ def parse_cluster_nodes(resp, **options): current_host = options.get('current_host', '') def parse_slots(s): - slots = [] + slots, migrations = [], [] for r in s.split(' '): - if '-' in r: + if '->-' in r: + slot_id, dst_node_id = r[1:-1].split('->-', 1) + migrations.append({ + 'slot': int(slot_id), + 'node_id': dst_node_id, + 'state': 'migrating' + }) + elif '-<-' in r: + slot_id, src_node_id = r[1:-1].split('-<-', 1) + migrations.append({ + 'slot': int(slot_id), + 'node_id': src_node_id, + 'state': 'importing' + }) + elif '-' in r: start, end = r.split('-') slots.extend(range(int(start), int(end) + 1)) else: slots.append(int(r)) - return slots + return slots, migrations if isinstance(resp, basestring): resp = resp.splitlines() @@ -333,10 +347,12 @@ def parse_slots(s): 'pong-recv': int(pong_recv), 'link-state': link_state, 'slots': [], + 'migrations': [], } if len(parts) >= 9: - node['slots'] = tuple(parse_slots(parts[8])) + slots, migrations = parse_slots(parts[8]) + node['slots'], node['migrations'] = tuple(slots), migrations nodes.append(node) diff --git a/redis/cluster.py b/redis/cluster.py index 1fe2576ed2..d01233f460 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1,4 +1,18 @@ """cluster support ported from https://github.com/Grokzen/redis-py-cluster + +MY GOALS: + +1. expose abilities which redis cluster provided + - high availability, endure partially slot coverage + - scaling up read operations using slave nodes + - interactive load balance interface via python generator +2. Strict interface provides + - commands without ambiguity + - multiple-key operations MUST located in single slot +3. Loose interface provides + - cross slot helper methods +4. ability to adapt tornado's Future, via some external component(not-included) +5. let exception just raise to user if it's not in redis protocol. """ import time import logging @@ -13,7 +27,7 @@ ClusterSlotNotServedError, ClusterDownError, ) -# TODO: ASK +# TODO: handle TryAgain # TODO: pipeline # TODO: generator as interactive load balancer # TODO: master slave changed @@ -30,7 +44,9 @@ # TODO: loose redis interface(cross slot ops) # TODO: migrate tests # TODO: script -LOGGER = logging #.getLogger(__name__) +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) +LOGGER.addHandler(logging.StreamHandler()) class ClusterBalancer(object): @@ -90,20 +106,32 @@ def get_shards_nodes(self, readonly): class ClusterParser(DefaultParser): class AskError(ResponseError): + """ + src node: MIGRATING to dst node + get > ASK error + ask dst node > ASKING command + dst node: IMPORTING from src node + asking command only affects next command + any op will be allowed after asking command + """ def __init__(self, resp): - print resp - - class MovedError(ResponseError): - def __init__(self, resp): - """redis only redirect to master node""" + """should only redirect to master node""" slot_id, new_node = resp.split(' ') host, port = new_node.rsplit(':', 1) self.slot_id = int(slot_id) - self.node = self.host, self.port = host, int(port) + self.node_addr = self.host, self.port = host, int(port) + + class TryAgainError(ResponseError): + def __init__(self, resp): + pass + + class MovedError(AskError): + pass EXCEPTION_CLASSES = dict_merge( DefaultParser.EXCEPTION_CLASSES, { 'ASK': AskError, + 'TRYAGAIN': TryAgainError, 'MOVED': MovedError, 'CLUSTERDOWN': ClusterDownError, 'CROSSSLOT': ClusterCrossSlotError, @@ -293,6 +321,10 @@ def slot_moved(self, slot_id, addr): self.startup_nodes.add(addr) self.discover_cluster(force=True) + def ask_node(self, slot_id, addr): + """signal from response, target node should be master""" + self.startup_nodes.add(addr) + class ClusterConnectionPool(object): """connection pool for redis cluster @@ -585,13 +617,26 @@ def execute_command(self, *command_args, **parser_args): packed_command = self.packer_conn.pack_command(*command_args) ttl = self.COMMAND_TTL + redirect_addr = None + asking = False while ttl > 0: ttl -= 1 - node_id = self.determine_node(command_args) - node_addr = self.manager.node_addr(node_id) + if not redirect_addr: + node_id = self.determine_node(command_args) + node_addr = self.manager.node_addr(node_id) + else: + node_addr, redirect_addr = redirect_addr, None + connection = self.connection_pool.get_connection(node_addr) try: + if asking: + asking = False + connection.send_command('ASKING') + resp = self.parse_response(connection, 'ASKING') + if resp != 'OK': + raise ResponseError('ASKING %s is %s' % (self._desc_node(connection), resp)) + connection.send_packed_command(packed_command) return self.parse_response(connection, command, **parser_args) except BusyLoadingError: @@ -601,9 +646,15 @@ def execute_command(self, *command_args, **parser_args): if ttl < self.COMMAND_TTL / 2: time.sleep(0.01) except ClusterParser.MovedError as e: - LOGGER.warning('Slot moved: %s [%s] -> [%s]' % ( - e.slot_id, self._desc_node(connection), self._desc_node(e.node))) - self.manager.slot_moved(e.slot_id, e.node) + LOGGER.warning('MOVED: %s [%s] -> [%s]' % ( + e.slot_id, self._desc_node(connection), self._desc_node(e.node_addr))) + self.manager.slot_moved(e.slot_id, e.node_addr) + redirect_addr = e.node_addr + except ClusterParser.AskError as e: + LOGGER.warning('ASK redirect: %s [%s] -> [%s]' % ( + e.slot_id, self._desc_node(connection), self._desc_node(e.node_addr))) + self.manager.ask_node(e.slot_id, e.node_addr) + redirect_addr, asking = e.node_addr, True finally: self.connection_pool.release(connection) From 53e362914a8e77ca7b453365a01d2d6a63a5c303 Mon Sep 17 00:00:00 2001 From: boyxuper Date: Mon, 6 Apr 2015 23:00:57 +0800 Subject: [PATCH 12/16] - parse ASKING/READONLY/READWRITE as bool_ok - save a round trip for ASKING --- redis/client.py | 3 +++ redis/cluster.py | 40 +++++++++++++++++++++++++++++++++------- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/redis/client.py b/redis/client.py index 63ba6ece55..0b9837a0f7 100755 --- a/redis/client.py +++ b/redis/client.py @@ -463,6 +463,9 @@ class StrictRedis(object): 'CLUSTER SETSLOT': bool_ok, 'CLUSTER SLAVES': parse_cluster_nodes, 'CLUSTER SLOTS': parse_cluster_slots, + 'ASKING': bool_ok, + 'READONLY': bool_ok, + 'READWRITE': bool_ok, } ) diff --git a/redis/cluster.py b/redis/cluster.py index d01233f460..3d394743ba 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -65,7 +65,7 @@ def get_shards_nodes(self, readonly): class RoundRobinClusterNodeBalancer(ClusterBalancer): - RR_COUNTER = 1 + RR_COUNTER = 0 def __init__(self, manager): self.manager = manager @@ -601,6 +601,32 @@ def execute_connection_command(self, connection, command_args, parser_args=None) connection.send_command(*command_args) return self.parse_response(connection, command, **parser_args or {}) + def stack_commands(self, connection): + """like a pipeline, collect and execute and parse""" + + # collect commands + stack = [] + while True: + command = yield len(stack) + if command is None: + break + + if not isinstance(command, tuple) or len(command) != 2: + raise ValueError('command should be 2-tuple (command_args, parser_args)') + + stack.append(command) + + packed_command = connection.pack_commands(cmd_args for cmd_args, _ in stack) + connection.send_packed_command(packed_command) + for command_args, parse_args in stack: + # ensure all responses are consumed + try: + resp = self.parse_response(connection, command_args[0], **parse_args) + except Exception as e: + resp = e + + yield resp + def execute_command(self, *command_args, **parser_args): """Send a command to a node in the cluster SINGLE & SIMPLE MODE @@ -632,12 +658,12 @@ def execute_command(self, *command_args, **parser_args): try: if asking: asking = False - connection.send_command('ASKING') - resp = self.parse_response(connection, 'ASKING') - if resp != 'OK': - raise ResponseError('ASKING %s is %s' % (self._desc_node(connection), resp)) - - connection.send_packed_command(packed_command) + multi_command = connection.pack_commands((('ASKING', ), command_args)) + connection.send_packed_command(multi_command) + if not self.parse_response(connection, 'ASKING'): + raise ResponseError('ASKING %s is not OK' % (self._desc_node(connection))) + else: + connection.send_packed_command(packed_command) return self.parse_response(connection, command, **parser_args) except BusyLoadingError: raise From 4323631a4f3e3b495433f1770da285e707b44bdc Mon Sep 17 00:00:00 2001 From: boyxuper Date: Mon, 6 Apr 2015 23:09:21 +0800 Subject: [PATCH 13/16] - handle TryAgainError and prevent re-determine node --- redis/cluster.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 3d394743ba..379b2f12ef 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -27,7 +27,6 @@ ClusterSlotNotServedError, ClusterDownError, ) -# TODO: handle TryAgain # TODO: pipeline # TODO: generator as interactive load balancer # TODO: master slave changed @@ -36,13 +35,13 @@ # TODO: READWRITE/READONLY switching # TODO: connection_pool (partially) rebuild # TODO: every possible situation in cluster +# TODO: migrate test cases from redis-py-cluster # TODO: read from slave, but slave changed to master # TODO: pubsub # TODO: lock # TODO: advanced balancer # TODO: loose redis interface(cross slot ops) -# TODO: migrate tests # TODO: script LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -602,8 +601,10 @@ def execute_connection_command(self, connection, command_args, parser_args=None) return self.parse_response(connection, command, **parser_args or {}) def stack_commands(self, connection): - """like a pipeline, collect and execute and parse""" + """like a pipeline, collect and execute and parse + """ + # TODO: debug code # collect commands stack = [] while True: @@ -670,7 +671,14 @@ def execute_command(self, *command_args, **parser_args): except (ConnectionError, TimeoutError) as e: LOGGER.warning('Node %s: %s' % (e.__class__.__name__, self._desc_node(connection))) if ttl < self.COMMAND_TTL / 2: - time.sleep(0.01) + time.sleep(0.05) + except ClusterParser.TryAgainError: + LOGGER.warning('Cluster in unstable state.') + if ttl < self.COMMAND_TTL / 2: + time.sleep(0.05) + + # prevent re-determine node + redirect_addr = node_addr except ClusterParser.MovedError as e: LOGGER.warning('MOVED: %s [%s] -> [%s]' % ( e.slot_id, self._desc_node(connection), self._desc_node(e.node_addr))) From 3b281b13868401f9e1e0848aeacbcd41ea99a825 Mon Sep 17 00:00:00 2001 From: boyxuper Date: Tue, 7 Apr 2015 07:03:02 +0800 Subject: [PATCH 14/16] - PFCOUNT is a multi-key command - ordering TODOs --- redis/cluster.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 379b2f12ef..8767ccdfb1 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -27,15 +27,15 @@ ClusterSlotNotServedError, ClusterDownError, ) -# TODO: pipeline -# TODO: generator as interactive load balancer +# TODO: every possible situation in cluster # TODO: master slave changed # TODO: master timed out # TODO: slave timed out # TODO: READWRITE/READONLY switching # TODO: connection_pool (partially) rebuild -# TODO: every possible situation in cluster # TODO: migrate test cases from redis-py-cluster +# TODO: pipeline +# TODO: generator as interactive load balancer # TODO: read from slave, but slave changed to master # TODO: pubsub @@ -448,7 +448,7 @@ class StrictClusterRedis(StrictRedis): ], lambda args: args[1::2]), dict.fromkeys([ 'DEL', 'RPOPLPUSH', 'RENAME', 'RENAMENX', 'SMOVE', 'SDIFF', 'SDIFFSTORE', - 'SINTER', 'SINTERSTORE', 'SUNION', 'SUNIONSTORE', 'PFMERGE', 'MGET', + 'SINTER', 'SINTERSTORE', 'SUNION', 'SUNIONSTORE', 'PFMERGE', 'MGET', 'PFCOUNT', ], lambda args: args[1:]), { 'BITOP': lambda args: args[2:], From 5c4ed919fb7370e58c7db78dd5e5a40f9d615db9 Mon Sep 17 00:00:00 2001 From: boyxuper Date: Tue, 7 Apr 2015 13:01:24 +0800 Subject: [PATCH 15/16] - add references & TODO --- redis/cluster.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/redis/cluster.py b/redis/cluster.py index 8767ccdfb1..761d8a2c03 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -13,6 +13,18 @@ - cross slot helper methods 4. ability to adapt tornado's Future, via some external component(not-included) 5. let exception just raise to user if it's not in redis protocol. + + +REFERENCES: + +about pipeline: +@see: https://groups.google.com/forum/#!topic/jedis_redis/u6j8slokO3E +@see: https://groups.google.com/forum/#!msg/redis-db/4I0ELYnf3bk/Lrctk0ULm6AJ + +about readonly slaves: +@see: https://github.com/antirez/redis/issues/2216 +@see: https://github.com/xetorthio/jedis/issues/790 + """ import time import logging @@ -27,6 +39,7 @@ ClusterSlotNotServedError, ClusterDownError, ) +# TODO: partially update cluster slot info # TODO: every possible situation in cluster # TODO: master slave changed # TODO: master timed out From 8123f7af392660fb93b5312e77afd1b05fdb31c2 Mon Sep 17 00:00:00 2001 From: boyxuper Date: Tue, 7 Apr 2015 17:26:15 +0800 Subject: [PATCH 16/16] - BUGFIX: COMMAND_PARSE_KEYS should be a dict - BUGFIX: ClusterCrossSlotError & ClusterSlotNotServedError should be a ResponseError, so that parse will raise it --- redis/cluster.py | 2 +- redis/exceptions.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 761d8a2c03..3b10c97176 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -466,7 +466,7 @@ class StrictClusterRedis(StrictRedis): { 'BITOP': lambda args: args[2:], }, - ), + ) READONLY_COMMANDS = { # single key ops # - bits diff --git a/redis/exceptions.py b/redis/exceptions.py index d6acc8cab9..7467204c75 100644 --- a/redis/exceptions.py +++ b/redis/exceptions.py @@ -75,11 +75,11 @@ class ClusterError(RedisError): pass -class ClusterCrossSlotError(RedisError): - pass +class ClusterCrossSlotError(ResponseError): + message = "Keys in request don't hash to the same slot" -class ClusterSlotNotServedError(RedisError): +class ClusterSlotNotServedError(ResponseError): pass