Skip to content

Commit eb34e05

Browse files
xraywggivo
andauthored
Run pipeline in current thread if all the keys on same node (#4149)
* perf:last node run in current thread directly * fix: connection leak, we should return it to connection pool * noop sync when pipelinedResponses.isEmpty() * revert rename * clean * remove last node run in current thread when multi node * add test for pipeline all keys at same node * fix: make all keys on same node * formatting --------- Co-authored-by: ggivo <[email protected]>
1 parent fba0f97 commit eb34e05

File tree

2 files changed

+71
-12
lines changed

2 files changed

+71
-12
lines changed

src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Map;
88
import java.util.Queue;
99
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.Executor;
1011
import java.util.concurrent.ExecutorService;
1112
import java.util.concurrent.Executors;
1213

@@ -85,17 +86,27 @@ public final void sync() {
8586
}
8687
syncing = true;
8788

88-
ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
89+
boolean multiNode = pipelinedResponses.size() > 1;
90+
Executor executor;
91+
ExecutorService executorService = null;
92+
if (multiNode) {
93+
executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
94+
executor = executorService;
95+
} else {
96+
executor = Runnable::run;
97+
}
98+
CountDownLatch countDownLatch = multiNode
99+
? new CountDownLatch(pipelinedResponses.size())
100+
: null;
89101

90-
CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
91-
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
92-
= pipelinedResponses.entrySet().iterator();
102+
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator = pipelinedResponses.entrySet()
103+
.iterator();
93104
while (pipelinedResponsesIterator.hasNext()) {
94105
Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
95106
HostAndPort nodeKey = entry.getKey();
96107
Queue<Response<?>> queue = entry.getValue();
97108
Connection connection = connections.get(nodeKey);
98-
executorService.submit(() -> {
109+
executor.execute(() -> {
99110
try {
100111
List<Object> unformatted = connection.getMany(queue.size());
101112
for (Object o : unformatted) {
@@ -104,22 +115,27 @@ public final void sync() {
104115
} catch (JedisConnectionException jce) {
105116
log.error("Error with connection to " + nodeKey, jce);
106117
// cleanup the connection
118+
// TODO these operations not thread-safe and when executed here, the iter may moved
107119
pipelinedResponsesIterator.remove();
108120
connections.remove(nodeKey);
109121
IOUtils.closeQuietly(connection);
110122
} finally {
111-
countDownLatch.countDown();
123+
if (multiNode) {
124+
countDownLatch.countDown();
125+
}
112126
}
113127
});
114128
}
115129

116-
try {
117-
countDownLatch.await();
118-
} catch (InterruptedException e) {
119-
log.error("Thread is interrupted during sync.", e);
120-
}
130+
if (multiNode) {
131+
try {
132+
countDownLatch.await();
133+
} catch (InterruptedException e) {
134+
log.error("Thread is interrupted during sync.", e);
135+
}
121136

122-
executorService.shutdownNow();
137+
executorService.shutdownNow();
138+
}
123139

124140
syncing = false;
125141
}

src/test/java/redis/clients/jedis/ClusterPipeliningTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,6 +1112,49 @@ public void multiple() {
11121112
}
11131113
}
11141114

1115+
@Test
1116+
public void testPipelineKeysAtSameNode() {
1117+
try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
1118+
1119+
// test simple key
1120+
cluster.set("foo", "bar");
1121+
1122+
try (ClusterPipeline pipeline = cluster.pipelined()) {
1123+
Response<String> foo = pipeline.get("foo");
1124+
pipeline.sync();
1125+
1126+
assertEquals("bar", foo.get());
1127+
}
1128+
1129+
// test multi key but at same node
1130+
int cnt = 3;
1131+
String prefix = "{foo}:";
1132+
for (int i = 0; i < cnt; i++) {
1133+
String key = prefix + i;
1134+
cluster.set(key, String.valueOf(i));
1135+
}
1136+
1137+
try (ClusterPipeline pipeline = cluster.pipelined()) {
1138+
List<Response<String>> results = new ArrayList<>();
1139+
for (int i = 0; i < cnt; i++) {
1140+
String key = prefix + i;
1141+
results.add(pipeline.get(key));
1142+
}
1143+
1144+
Response<Object> foo = pipeline.eval("return redis.call('get', KEYS[1])",
1145+
Collections.singletonList("foo"), Collections.emptyList());
1146+
1147+
pipeline.sync();
1148+
int idx = 0;
1149+
for (Response<String> res : results) {
1150+
assertEquals(String.valueOf(idx), res.get());
1151+
idx++;
1152+
}
1153+
assertEquals("bar", String.valueOf(foo.get()));
1154+
}
1155+
}
1156+
}
1157+
11151158
private static void assertThreadsCount() {
11161159
// Get the root thread group
11171160
final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent();

0 commit comments

Comments
 (0)