Skip to content

Commit aaf196e

Browse files
authored
Merge pull request #298 from rabbitmq/dispatch-messages-in-dedicated-thread
Dispatch messages in dedicated thread
2 parents f44e161 + 22e1e53 commit aaf196e

File tree

8 files changed

+533
-5
lines changed

8 files changed

+533
-5
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.rabbitmq.stream.ChunkChecksum;
5656
import com.rabbitmq.stream.Codec;
5757
import com.rabbitmq.stream.Codec.EncodedMessage;
58+
import com.rabbitmq.stream.Constants;
5859
import com.rabbitmq.stream.Environment;
5960
import com.rabbitmq.stream.Message;
6061
import com.rabbitmq.stream.MessageBuilder;
@@ -68,6 +69,7 @@
6869
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
6970
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler;
7071
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
72+
import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
7173
import com.rabbitmq.stream.metrics.MetricsCollector;
7274
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
7375
import com.rabbitmq.stream.sasl.CredentialsProvider;
@@ -179,6 +181,7 @@ public class Client implements AutoCloseable {
179181
new ConcurrentHashMap<>();
180182
final List<SubscriptionOffset> subscriptionOffsets = new CopyOnWriteArrayList<>();
181183
final ExecutorService executorService;
184+
final ExecutorService dispatchingExecutorService;
182185
final TuneState tuneState;
183186
final AtomicBoolean closing = new AtomicBoolean(false);
184187
final ChunkChecksum chunkChecksum;
@@ -348,8 +351,37 @@ public void initChannel(SocketChannel ch) {
348351

349352
this.channel = f.channel();
350353
this.nettyClosing = Utils.makeIdempotent(this::closeNetty);
351-
this.executorService = Executors.newSingleThreadExecutor();
352-
this.executorServiceClosing = Utils.makeIdempotent(this.executorService::shutdownNow);
354+
ExecutorServiceFactory executorServiceFactory = parameters.executorServiceFactory;
355+
if (executorServiceFactory == null) {
356+
this.executorService =
357+
Executors.newSingleThreadExecutor(new NamedThreadFactory(clientConnectionName + "-"));
358+
} else {
359+
this.executorService = executorServiceFactory.get();
360+
}
361+
ExecutorServiceFactory dispatchingExecutorServiceFactory =
362+
parameters.dispatchingExecutorServiceFactory;
363+
if (dispatchingExecutorServiceFactory == null) {
364+
this.dispatchingExecutorService =
365+
Executors.newSingleThreadExecutor(
366+
new NamedThreadFactory("dispatching-" + clientConnectionName + "-"));
367+
} else {
368+
this.dispatchingExecutorService = dispatchingExecutorServiceFactory.get();
369+
}
370+
this.executorServiceClosing =
371+
Utils.makeIdempotent(
372+
() -> {
373+
this.dispatchingExecutorService.shutdownNow();
374+
if (dispatchingExecutorServiceFactory == null) {
375+
this.dispatchingExecutorService.shutdownNow();
376+
} else {
377+
dispatchingExecutorServiceFactory.clientClosed(this.dispatchingExecutorService);
378+
}
379+
if (executorServiceFactory == null) {
380+
this.executorService.shutdownNow();
381+
} else {
382+
executorServiceFactory.clientClosed(this.executorService);
383+
}
384+
});
353385
try {
354386
this.tuneState =
355387
new TuneState(
@@ -2204,6 +2236,10 @@ public static class ClientParameters {
22042236
private Duration rpcTimeout;
22052237
private Consumer<Channel> channelCustomizer = noOpConsumer();
22062238
private Consumer<Bootstrap> bootstrapCustomizer = noOpConsumer();
2239+
// for messages
2240+
private ExecutorServiceFactory dispatchingExecutorServiceFactory;
2241+
// for other server frames
2242+
private ExecutorServiceFactory executorServiceFactory;
22072243

22082244
public ClientParameters host(String host) {
22092245
this.host = host;
@@ -2363,6 +2399,17 @@ public ClientParameters rpcTimeout(Duration rpcTimeout) {
23632399
return this;
23642400
}
23652401

2402+
public ClientParameters dispatchingExecutorServiceFactory(
2403+
ExecutorServiceFactory dispatchingExecutorServiceFactory) {
2404+
this.dispatchingExecutorServiceFactory = dispatchingExecutorServiceFactory;
2405+
return this;
2406+
}
2407+
2408+
public ClientParameters executorServiceFactory(ExecutorServiceFactory executorServiceFactory) {
2409+
this.executorServiceFactory = executorServiceFactory;
2410+
return this;
2411+
}
2412+
23662413
String host() {
23672414
return this.host;
23682415
}
@@ -2585,7 +2632,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
25852632
}
25862633

25872634
if (task != null) {
2588-
executorService.submit(task);
2635+
if (commandId == Constants.COMMAND_DELIVER) {
2636+
dispatchingExecutorService.submit(task);
2637+
} else {
2638+
executorService.submit(task);
2639+
}
25892640
}
25902641
}
25912642

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
17+
import java.util.ArrayList;
18+
import java.util.Comparator;
19+
import java.util.List;
20+
import java.util.concurrent.CopyOnWriteArrayList;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ThreadFactory;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.function.Supplier;
27+
import java.util.stream.IntStream;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
32+
33+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultExecutorServiceFactory.class);
34+
private static final Comparator<Executor> EXECUTOR_COMPARATOR =
35+
Comparator.comparingInt(Executor::usage);
36+
37+
private final List<Executor> executors;
38+
private final AtomicBoolean closed = new AtomicBoolean(false);
39+
private final ThreadFactory threadFactory;
40+
private final int minSize;
41+
private final int clientPerExecutor;
42+
private final Supplier<Executor> executorFactory;
43+
44+
DefaultExecutorServiceFactory() {
45+
this(Runtime.getRuntime().availableProcessors(), 10);
46+
}
47+
48+
DefaultExecutorServiceFactory(int minSize, int clientPerExecutor) {
49+
this.minSize = minSize;
50+
this.clientPerExecutor = clientPerExecutor;
51+
this.threadFactory = new NamedThreadFactory("rabbitmq-stream-connection-");
52+
this.executorFactory = () -> newExecutor();
53+
List<Executor> l = new ArrayList<>(Runtime.getRuntime().availableProcessors());
54+
IntStream.range(0, Runtime.getRuntime().availableProcessors())
55+
.forEach(ignored -> l.add(this.executorFactory.get()));
56+
executors = new CopyOnWriteArrayList<>(l);
57+
}
58+
59+
static void maybeResize(
60+
List<Executor> current, int min, int clientsPerResource, Supplier<Executor> factory) {
61+
LOGGER.debug(
62+
"Resizing {}, with min = {}, clients per resource = {}", current, min, clientsPerResource);
63+
int clientCount = 0;
64+
for (Executor executor : current) {
65+
clientCount += executor.usage();
66+
}
67+
LOGGER.debug("Total usage is {}", clientCount);
68+
69+
int target = Math.max((clientCount / clientsPerResource) + 1, min);
70+
LOGGER.debug("Target size is {}, current size is {}", target, current.size());
71+
if (target > current.size()) {
72+
LOGGER.debug("Upsizing...");
73+
List<Executor> l = new ArrayList<>();
74+
for (int i = 0; i < target; i++) {
75+
if (i < current.size()) {
76+
l.add(current.get(i));
77+
} else {
78+
l.add(factory.get());
79+
}
80+
}
81+
current.clear();
82+
current.addAll(l);
83+
LOGGER.debug("New list is {}", current);
84+
} else if (target < current.size()) {
85+
LOGGER.debug("Downsizing...");
86+
boolean hasUnusedExecutors = current.stream().filter(ex -> ex.usage() == 0).count() > 0;
87+
if (!hasUnusedExecutors) {
88+
LOGGER.debug("No downsizing, there is no unused executor");
89+
}
90+
if (hasUnusedExecutors) {
91+
List<Executor> l = new ArrayList<>(target);
92+
for (int i = 0; i < current.size(); i++) {
93+
Executor executor = current.get(i);
94+
if (executor.usage() == 0) {
95+
executor.close();
96+
} else {
97+
l.add(executor);
98+
}
99+
}
100+
if (l.size() < target) {
101+
for (int i = l.size(); i < target; i++) {
102+
l.add(factory.get());
103+
}
104+
}
105+
current.clear();
106+
current.addAll(l);
107+
LOGGER.debug("New list is {}", current);
108+
}
109+
}
110+
}
111+
112+
private Executor newExecutor() {
113+
return new Executor(Executors.newSingleThreadExecutor(threadFactory));
114+
}
115+
116+
@Override
117+
public synchronized ExecutorService get() {
118+
if (closed.get()) {
119+
throw new IllegalStateException("Executor service factory is closed");
120+
} else {
121+
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
122+
LOGGER.debug("Looking least used executor in {}", this.executors);
123+
Executor executor = this.executors.stream().min(EXECUTOR_COMPARATOR).get();
124+
LOGGER.debug("Least used executor is {}", executor);
125+
executor.incrementUsage();
126+
return executor.executorService;
127+
}
128+
}
129+
130+
@Override
131+
public synchronized void clientClosed(ExecutorService executorService) {
132+
if (!closed.get()) {
133+
Executor executor = find(executorService);
134+
if (executor == null) {
135+
LOGGER.info("Could not find executor service wrapper");
136+
} else {
137+
executor.decrementUsage();
138+
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
139+
}
140+
}
141+
}
142+
143+
private Executor find(ExecutorService executorService) {
144+
for (Executor executor : this.executors) {
145+
if (executor.executorService.equals(executorService)) {
146+
return executor;
147+
}
148+
}
149+
return null;
150+
}
151+
152+
@Override
153+
public synchronized void close() {
154+
if (closed.compareAndSet(false, true)) {
155+
this.executors.forEach(executor -> executor.executorService.shutdownNow());
156+
}
157+
}
158+
159+
static class Executor {
160+
161+
private final ExecutorService executorService;
162+
private AtomicInteger usage = new AtomicInteger(0);
163+
164+
Executor(ExecutorService executorService) {
165+
this.executorService = executorService;
166+
}
167+
168+
Executor incrementUsage() {
169+
this.usage.incrementAndGet();
170+
return this;
171+
}
172+
173+
Executor decrementUsage() {
174+
this.usage.decrementAndGet();
175+
return this;
176+
}
177+
178+
Executor addUsage(int delta) {
179+
this.usage.addAndGet(delta);
180+
return this;
181+
}
182+
183+
Executor substractUsage(int delta) {
184+
this.usage.addAndGet(-delta);
185+
return this;
186+
}
187+
188+
private int usage() {
189+
return this.usage.get();
190+
}
191+
192+
private void close() {
193+
this.executorService.shutdownNow();
194+
}
195+
196+
@Override
197+
public String toString() {
198+
return "Executor{" + "usage=" + usage.get() + '}';
199+
}
200+
}
201+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import java.util.concurrent.ExecutorService;
17+
18+
interface ExecutorServiceFactory extends AutoCloseable {
19+
20+
ExecutorService get();
21+
22+
void clientClosed(ExecutorService executorService);
23+
24+
@Override
25+
void close();
26+
}

0 commit comments

Comments
 (0)