Skip to content

Commit 22e1e53

Browse files
committed
Dispatch messages in dedicated thread
Each connection has now a dedicated single-threaded executor to dispatch messages. This is especially suited for long consumers, as they could block the handling of other frames sent by the server. Other server frames are now handled by a shared executor service. The default implementation maintains a list of executor services shared between all connections maintained by the environment instances. The list grows and shrinks depending on the usage.
1 parent efe89c2 commit 22e1e53

File tree

8 files changed

+533
-5
lines changed

8 files changed

+533
-5
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.rabbitmq.stream.ChunkChecksum;
5656
import com.rabbitmq.stream.Codec;
5757
import com.rabbitmq.stream.Codec.EncodedMessage;
58+
import com.rabbitmq.stream.Constants;
5859
import com.rabbitmq.stream.Environment;
5960
import com.rabbitmq.stream.Message;
6061
import com.rabbitmq.stream.MessageBuilder;
@@ -68,6 +69,7 @@
6869
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
6970
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler;
7071
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
72+
import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
7173
import com.rabbitmq.stream.metrics.MetricsCollector;
7274
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
7375
import com.rabbitmq.stream.sasl.CredentialsProvider;
@@ -179,6 +181,7 @@ public class Client implements AutoCloseable {
179181
new ConcurrentHashMap<>();
180182
final List<SubscriptionOffset> subscriptionOffsets = new CopyOnWriteArrayList<>();
181183
final ExecutorService executorService;
184+
final ExecutorService dispatchingExecutorService;
182185
final TuneState tuneState;
183186
final AtomicBoolean closing = new AtomicBoolean(false);
184187
final ChunkChecksum chunkChecksum;
@@ -348,8 +351,37 @@ public void initChannel(SocketChannel ch) {
348351

349352
this.channel = f.channel();
350353
this.nettyClosing = Utils.makeIdempotent(this::closeNetty);
351-
this.executorService = Executors.newSingleThreadExecutor();
352-
this.executorServiceClosing = Utils.makeIdempotent(this.executorService::shutdownNow);
354+
ExecutorServiceFactory executorServiceFactory = parameters.executorServiceFactory;
355+
if (executorServiceFactory == null) {
356+
this.executorService =
357+
Executors.newSingleThreadExecutor(new NamedThreadFactory(clientConnectionName + "-"));
358+
} else {
359+
this.executorService = executorServiceFactory.get();
360+
}
361+
ExecutorServiceFactory dispatchingExecutorServiceFactory =
362+
parameters.dispatchingExecutorServiceFactory;
363+
if (dispatchingExecutorServiceFactory == null) {
364+
this.dispatchingExecutorService =
365+
Executors.newSingleThreadExecutor(
366+
new NamedThreadFactory("dispatching-" + clientConnectionName + "-"));
367+
} else {
368+
this.dispatchingExecutorService = dispatchingExecutorServiceFactory.get();
369+
}
370+
this.executorServiceClosing =
371+
Utils.makeIdempotent(
372+
() -> {
373+
this.dispatchingExecutorService.shutdownNow();
374+
if (dispatchingExecutorServiceFactory == null) {
375+
this.dispatchingExecutorService.shutdownNow();
376+
} else {
377+
dispatchingExecutorServiceFactory.clientClosed(this.dispatchingExecutorService);
378+
}
379+
if (executorServiceFactory == null) {
380+
this.executorService.shutdownNow();
381+
} else {
382+
executorServiceFactory.clientClosed(this.executorService);
383+
}
384+
});
353385
try {
354386
this.tuneState =
355387
new TuneState(
@@ -2204,6 +2236,10 @@ public static class ClientParameters {
22042236
private Duration rpcTimeout;
22052237
private Consumer<Channel> channelCustomizer = noOpConsumer();
22062238
private Consumer<Bootstrap> bootstrapCustomizer = noOpConsumer();
2239+
// for messages
2240+
private ExecutorServiceFactory dispatchingExecutorServiceFactory;
2241+
// for other server frames
2242+
private ExecutorServiceFactory executorServiceFactory;
22072243

22082244
public ClientParameters host(String host) {
22092245
this.host = host;
@@ -2363,6 +2399,17 @@ public ClientParameters rpcTimeout(Duration rpcTimeout) {
23632399
return this;
23642400
}
23652401

2402+
public ClientParameters dispatchingExecutorServiceFactory(
2403+
ExecutorServiceFactory dispatchingExecutorServiceFactory) {
2404+
this.dispatchingExecutorServiceFactory = dispatchingExecutorServiceFactory;
2405+
return this;
2406+
}
2407+
2408+
public ClientParameters executorServiceFactory(ExecutorServiceFactory executorServiceFactory) {
2409+
this.executorServiceFactory = executorServiceFactory;
2410+
return this;
2411+
}
2412+
23662413
String host() {
23672414
return this.host;
23682415
}
@@ -2585,7 +2632,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
25852632
}
25862633

25872634
if (task != null) {
2588-
executorService.submit(task);
2635+
if (commandId == Constants.COMMAND_DELIVER) {
2636+
dispatchingExecutorService.submit(task);
2637+
} else {
2638+
executorService.submit(task);
2639+
}
25892640
}
25902641
}
25912642

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
17+
import java.util.ArrayList;
18+
import java.util.Comparator;
19+
import java.util.List;
20+
import java.util.concurrent.CopyOnWriteArrayList;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ThreadFactory;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.function.Supplier;
27+
import java.util.stream.IntStream;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
32+
33+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultExecutorServiceFactory.class);
34+
private static final Comparator<Executor> EXECUTOR_COMPARATOR =
35+
Comparator.comparingInt(Executor::usage);
36+
37+
private final List<Executor> executors;
38+
private final AtomicBoolean closed = new AtomicBoolean(false);
39+
private final ThreadFactory threadFactory;
40+
private final int minSize;
41+
private final int clientPerExecutor;
42+
private final Supplier<Executor> executorFactory;
43+
44+
DefaultExecutorServiceFactory() {
45+
this(Runtime.getRuntime().availableProcessors(), 10);
46+
}
47+
48+
DefaultExecutorServiceFactory(int minSize, int clientPerExecutor) {
49+
this.minSize = minSize;
50+
this.clientPerExecutor = clientPerExecutor;
51+
this.threadFactory = new NamedThreadFactory("rabbitmq-stream-connection-");
52+
this.executorFactory = () -> newExecutor();
53+
List<Executor> l = new ArrayList<>(Runtime.getRuntime().availableProcessors());
54+
IntStream.range(0, Runtime.getRuntime().availableProcessors())
55+
.forEach(ignored -> l.add(this.executorFactory.get()));
56+
executors = new CopyOnWriteArrayList<>(l);
57+
}
58+
59+
static void maybeResize(
60+
List<Executor> current, int min, int clientsPerResource, Supplier<Executor> factory) {
61+
LOGGER.debug(
62+
"Resizing {}, with min = {}, clients per resource = {}", current, min, clientsPerResource);
63+
int clientCount = 0;
64+
for (Executor executor : current) {
65+
clientCount += executor.usage();
66+
}
67+
LOGGER.debug("Total usage is {}", clientCount);
68+
69+
int target = Math.max((clientCount / clientsPerResource) + 1, min);
70+
LOGGER.debug("Target size is {}, current size is {}", target, current.size());
71+
if (target > current.size()) {
72+
LOGGER.debug("Upsizing...");
73+
List<Executor> l = new ArrayList<>();
74+
for (int i = 0; i < target; i++) {
75+
if (i < current.size()) {
76+
l.add(current.get(i));
77+
} else {
78+
l.add(factory.get());
79+
}
80+
}
81+
current.clear();
82+
current.addAll(l);
83+
LOGGER.debug("New list is {}", current);
84+
} else if (target < current.size()) {
85+
LOGGER.debug("Downsizing...");
86+
boolean hasUnusedExecutors = current.stream().filter(ex -> ex.usage() == 0).count() > 0;
87+
if (!hasUnusedExecutors) {
88+
LOGGER.debug("No downsizing, there is no unused executor");
89+
}
90+
if (hasUnusedExecutors) {
91+
List<Executor> l = new ArrayList<>(target);
92+
for (int i = 0; i < current.size(); i++) {
93+
Executor executor = current.get(i);
94+
if (executor.usage() == 0) {
95+
executor.close();
96+
} else {
97+
l.add(executor);
98+
}
99+
}
100+
if (l.size() < target) {
101+
for (int i = l.size(); i < target; i++) {
102+
l.add(factory.get());
103+
}
104+
}
105+
current.clear();
106+
current.addAll(l);
107+
LOGGER.debug("New list is {}", current);
108+
}
109+
}
110+
}
111+
112+
private Executor newExecutor() {
113+
return new Executor(Executors.newSingleThreadExecutor(threadFactory));
114+
}
115+
116+
@Override
117+
public synchronized ExecutorService get() {
118+
if (closed.get()) {
119+
throw new IllegalStateException("Executor service factory is closed");
120+
} else {
121+
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
122+
LOGGER.debug("Looking least used executor in {}", this.executors);
123+
Executor executor = this.executors.stream().min(EXECUTOR_COMPARATOR).get();
124+
LOGGER.debug("Least used executor is {}", executor);
125+
executor.incrementUsage();
126+
return executor.executorService;
127+
}
128+
}
129+
130+
@Override
131+
public synchronized void clientClosed(ExecutorService executorService) {
132+
if (!closed.get()) {
133+
Executor executor = find(executorService);
134+
if (executor == null) {
135+
LOGGER.info("Could not find executor service wrapper");
136+
} else {
137+
executor.decrementUsage();
138+
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
139+
}
140+
}
141+
}
142+
143+
private Executor find(ExecutorService executorService) {
144+
for (Executor executor : this.executors) {
145+
if (executor.executorService.equals(executorService)) {
146+
return executor;
147+
}
148+
}
149+
return null;
150+
}
151+
152+
@Override
153+
public synchronized void close() {
154+
if (closed.compareAndSet(false, true)) {
155+
this.executors.forEach(executor -> executor.executorService.shutdownNow());
156+
}
157+
}
158+
159+
static class Executor {
160+
161+
private final ExecutorService executorService;
162+
private AtomicInteger usage = new AtomicInteger(0);
163+
164+
Executor(ExecutorService executorService) {
165+
this.executorService = executorService;
166+
}
167+
168+
Executor incrementUsage() {
169+
this.usage.incrementAndGet();
170+
return this;
171+
}
172+
173+
Executor decrementUsage() {
174+
this.usage.decrementAndGet();
175+
return this;
176+
}
177+
178+
Executor addUsage(int delta) {
179+
this.usage.addAndGet(delta);
180+
return this;
181+
}
182+
183+
Executor substractUsage(int delta) {
184+
this.usage.addAndGet(-delta);
185+
return this;
186+
}
187+
188+
private int usage() {
189+
return this.usage.get();
190+
}
191+
192+
private void close() {
193+
this.executorService.shutdownNow();
194+
}
195+
196+
@Override
197+
public String toString() {
198+
return "Executor{" + "usage=" + usage.get() + '}';
199+
}
200+
}
201+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import java.util.concurrent.ExecutorService;
17+
18+
interface ExecutorServiceFactory extends AutoCloseable {
19+
20+
ExecutorService get();
21+
22+
void clientClosed(ExecutorService executorService);
23+
24+
@Override
25+
void close();
26+
}

0 commit comments

Comments
 (0)