From 310a80dba02d32bba7beeae5cc90c326294ed5fe Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 17 Oct 2015 00:27:39 +0800 Subject: [PATCH 1/3] Support mget, mapped_mget, mset and mapped_mset commands for distributed redis. It is necessary of multi get and set commands for distributed redis. It will be slow when calling many commands one by one. Executing many get commands in a batch will be fast. Commands will be splitted to different groups based on nodes for distributed redis and then execute multiple style command by node. Response will be composed finally. --- lib/redis.rb | 2 +- lib/redis/distributed.rb | 54 ++++++++++++++++++-- test/distributed_commands_on_strings_test.rb | 38 +++++++++----- 3 files changed, 77 insertions(+), 17 deletions(-) diff --git a/lib/redis.rb b/lib/redis.rb index 6b7115b70..44ef2a67f 100644 --- a/lib/redis.rb +++ b/lib/redis.rb @@ -2032,7 +2032,7 @@ def punsubscribe(*channels) end end - # Inspect the state of the Pub/Sub subsystem. + # Inspect the state of the Pub/Sub subsystem. # Possible subcommands: channels, numsub, numpat. def pubsub(subcommand, *args) synchronize do |client| diff --git a/lib/redis/distributed.rb b/lib/redis/distributed.rb index 4bda232a4..ad3c1b01a 100644 --- a/lib/redis/distributed.rb +++ b/lib/redis/distributed.rb @@ -256,11 +256,15 @@ def setnx(key, value) # Set multiple keys to multiple values. def mset(*args) - raise CannotDistribute, :mset + args.flatten! + node_args_hash = split_args_for_nodes(args) + node_args_hash.each do |node, part_args| + node.mset *part_args + end end def mapped_mset(hash) - raise CannotDistribute, :mapped_mset + mset(hash.to_a.flatten) end # Set multiple keys to multiple values, only if none of the keys exist. @@ -279,11 +283,25 @@ def get(key) # Get the values of all the given keys. def mget(*keys) - raise CannotDistribute, :mget + # Split keys based on node + node_keys_hash = split_keys_for_nodes(keys) + + # mget with node & set value to result + result = Array.new keys.size + node_keys_hash.each do |node, payload| + values = node.mget(*payload[:keys]) + + values.each_with_index do |value, index| + key_index = payload[:index][index] + result[key_index] = value + end + end + result end def mapped_mget(*keys) - raise CannotDistribute, :mapped_mget + values = mget(*keys) + Hash[keys.zip(values)] end # Overwrite part of a string at key starting at the specified offset. @@ -839,6 +857,34 @@ def dup protected + def split_keys_for_nodes(keys) + result = Hash.new + keys.each_with_index do |key, index| + node = node_for(key) + if !(payload = result[node]) + payload = result[node] = {} + payload[:keys] = [] + payload[:index] = [] + end + payload[:keys] << key + payload[:index] << index + end + result + end + + def split_args_for_nodes(args) + result = Hash.new + args.each_slice(2) do |key, value| + node = node_for(key) + if !(payload = result[node]) + payload = result[node] = [] + end + payload << key + payload << value + end + result + end + def on_each_node(command, *args) nodes.map do |node| node.send(command, *args) diff --git a/test/distributed_commands_on_strings_test.rb b/test/distributed_commands_on_strings_test.rb index ad83c12e5..f25364159 100644 --- a/test/distributed_commands_on_strings_test.rb +++ b/test/distributed_commands_on_strings_test.rb @@ -9,27 +9,41 @@ class TestDistributedCommandsOnStrings < Test::Unit::TestCase include Lint::Strings def test_mget - assert_raise Redis::Distributed::CannotDistribute do - r.mget("foo", "bar") - end + r.set("foo", "s1") + r.set("bar", "s2") + + assert_equal ["s1", "s2"] , r.mget("foo", "bar") + assert_equal ["s1", "s2", nil], r.mget("foo", "bar", "baz") end def test_mget_mapped - assert_raise Redis::Distributed::CannotDistribute do - r.mapped_mget("foo", "bar") - end + r.set("foo", "s1") + r.set("bar", "s2") + + response = r.mapped_mget("foo", "bar") + + assert_equal "s1", response["foo"] + assert_equal "s2", response["bar"] + + response = r.mapped_mget("foo", "bar", "baz") + + assert_equal "s1", response["foo"] + assert_equal "s2", response["bar"] + assert_equal nil , response["baz"] end def test_mset - assert_raise Redis::Distributed::CannotDistribute do - r.mset(:foo, "s1", :bar, "s2") - end + r.mset(:foo, "s1", :bar, "s2") + + assert_equal "s1", r.get("foo") + assert_equal "s2", r.get("bar") end def test_mset_mapped - assert_raise Redis::Distributed::CannotDistribute do - r.mapped_mset(:foo => "s1", :bar => "s2") - end + r.mapped_mset(:foo => "s1", :bar => "s2") + + assert_equal "s1", r.get("foo") + assert_equal "s2", r.get("bar") end def test_msetnx From 3c2e1f12a2c8d827d47bc522bf04a64770448e62 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 17 Oct 2015 01:04:14 +0800 Subject: [PATCH 2/3] Fix test case. --- test/lint/hashes.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/lint/hashes.rb b/test/lint/hashes.rb index 649e6673f..3fc9813f3 100644 --- a/test/lint/hashes.rb +++ b/test/lint/hashes.rb @@ -147,15 +147,15 @@ def test_hincrbyfloat target_version "2.5.4" do r.hincrbyfloat("foo", "f1", 1.23) - assert_equal "1.23", r.hget("foo", "f1") + assert_equal "1.23", r.hget("foo", "f1").to_f.round(2).to_s r.hincrbyfloat("foo", "f1", 0.77) - assert_equal "2", r.hget("foo", "f1") + assert_equal "2", r.hget("foo", "f1").to_f.round.to_s r.hincrbyfloat("foo", "f1", -0.1) - assert_equal "1.9", r.hget("foo", "f1") + assert_equal "1.9", r.hget("foo", "f1").to_f.round(1).to_s end end end From 1b052ce7a226e46c6d02f55e8124203ef4c4a12d Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 17 Oct 2015 01:16:30 +0800 Subject: [PATCH 3/3] Fix test case. --- test/lint/hashes.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/lint/hashes.rb b/test/lint/hashes.rb index 3fc9813f3..051630845 100644 --- a/test/lint/hashes.rb +++ b/test/lint/hashes.rb @@ -151,7 +151,7 @@ def test_hincrbyfloat r.hincrbyfloat("foo", "f1", 0.77) - assert_equal "2", r.hget("foo", "f1").to_f.round.to_s + assert_equal "2", r.hget("foo", "f1").to_f.round(0).to_s r.hincrbyfloat("foo", "f1", -0.1)