diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 247069410a..05bcbaaced 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -7,6 +7,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -85,17 +86,27 @@ public final void sync() { } syncing = true; - ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + boolean multiNode = pipelinedResponses.size() > 1; + Executor executor; + ExecutorService executorService = null; + if (multiNode) { + executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + executor = executorService; + } else { + executor = Runnable::run; + } + CountDownLatch countDownLatch = multiNode + ? new CountDownLatch(pipelinedResponses.size()) + : null; - CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); - Iterator>>> pipelinedResponsesIterator - = pipelinedResponses.entrySet().iterator(); + Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet() + .iterator(); while (pipelinedResponsesIterator.hasNext()) { Map.Entry>> entry = pipelinedResponsesIterator.next(); HostAndPort nodeKey = entry.getKey(); Queue> queue = entry.getValue(); Connection connection = connections.get(nodeKey); - executorService.submit(() -> { + executor.execute(() -> { try { List unformatted = connection.getMany(queue.size()); for (Object o : unformatted) { @@ -104,22 +115,27 @@ public final void sync() { } catch (JedisConnectionException jce) { log.error("Error with connection to " + nodeKey, jce); // cleanup the connection + // TODO these operations not thread-safe and when executed here, the iter may moved pipelinedResponsesIterator.remove(); connections.remove(nodeKey); IOUtils.closeQuietly(connection); } finally { - countDownLatch.countDown(); + if (multiNode) { + countDownLatch.countDown(); + } } }); } - try { - countDownLatch.await(); - } catch (InterruptedException e) { - log.error("Thread is interrupted during sync.", e); - } + if (multiNode) { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + log.error("Thread is interrupted during sync.", e); + } - executorService.shutdownNow(); + executorService.shutdownNow(); + } syncing = false; } diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 0c6667d2f9..f4d6a9bf07 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1112,6 +1112,49 @@ public void multiple() { } } + @Test + public void testPipelineKeysAtSameNode() { + try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) { + + // test simple key + cluster.set("foo", "bar"); + + try (ClusterPipeline pipeline = cluster.pipelined()) { + Response foo = pipeline.get("foo"); + pipeline.sync(); + + assertEquals("bar", foo.get()); + } + + // test multi key but at same node + int cnt = 3; + String prefix = "{foo}:"; + for (int i = 0; i < cnt; i++) { + String key = prefix + i; + cluster.set(key, String.valueOf(i)); + } + + try (ClusterPipeline pipeline = cluster.pipelined()) { + List> results = new ArrayList<>(); + for (int i = 0; i < cnt; i++) { + String key = prefix + i; + results.add(pipeline.get(key)); + } + + Response foo = pipeline.eval("return redis.call('get', KEYS[1])", + Collections.singletonList("foo"), Collections.emptyList()); + + pipeline.sync(); + int idx = 0; + for (Response res : results) { + assertEquals(String.valueOf(idx), res.get()); + idx++; + } + assertEquals("bar", String.valueOf(foo.get())); + } + } + } + private static void assertThreadsCount() { // Get the root thread group final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent();