Skip to content

Commit 8e7f04e

Browse files
committed
Use different executor factories for producers/consumers
Executor services are now shared between connection, but there must be a distinction between producer and consumer connections, as a consumer can depend on a RPC in a producer connection (e.g. getting the stored offset from a connection to the stream leader). So the producer and consumer connections cannot share the same executors to avoid deadlocks. References #298
1 parent ac90b10 commit 8e7f04e

File tree

8 files changed

+129
-48
lines changed

8 files changed

+129
-48
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class ConsumersCoordinator {
8686

8787
private final boolean debug = false;
8888
private final List<SubscriptionTracker> trackers = new CopyOnWriteArrayList<>();
89+
private final ExecutorServiceFactory executorServiceFactory =
90+
new DefaultExecutorServiceFactory(
91+
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-consumer-connection-");
8992

9093
ConsumersCoordinator(
9194
StreamEnvironment environment,
@@ -171,7 +174,11 @@ private void addToManager(
171174
OffsetSpecification offsetSpecification,
172175
boolean isInitialSubscription) {
173176
ClientParameters clientParameters =
174-
environment.clientParametersCopy().host(node.getHost()).port(node.getPort());
177+
environment
178+
.clientParametersCopy()
179+
.executorServiceFactory(this.executorServiceFactory)
180+
.host(node.getHost())
181+
.port(node.getPort());
175182
ClientSubscriptionsManager pickedManager = null;
176183
while (pickedManager == null) {
177184
Iterator<ClientSubscriptionsManager> iterator = this.managers.iterator();
@@ -300,6 +307,11 @@ public void close() {
300307
e.getMessage());
301308
}
302309
}
310+
try {
311+
this.executorServiceFactory.close();
312+
} catch (Exception e) {
313+
LOGGER.info("Error while closing executor service factory: {}", e.getMessage());
314+
}
303315
}
304316

305317
@Override

src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,13 @@ class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
4141
private final int clientPerExecutor;
4242
private final Supplier<Executor> executorFactory;
4343

44-
DefaultExecutorServiceFactory() {
45-
this(Runtime.getRuntime().availableProcessors(), 10);
46-
}
47-
48-
DefaultExecutorServiceFactory(int minSize, int clientPerExecutor) {
44+
DefaultExecutorServiceFactory(int minSize, int clientPerExecutor, String prefix) {
4945
this.minSize = minSize;
5046
this.clientPerExecutor = clientPerExecutor;
51-
this.threadFactory = new NamedThreadFactory("rabbitmq-stream-connection-");
47+
this.threadFactory = new NamedThreadFactory(prefix);
5248
this.executorFactory = () -> newExecutor();
53-
List<Executor> l = new ArrayList<>(Runtime.getRuntime().availableProcessors());
54-
IntStream.range(0, Runtime.getRuntime().availableProcessors())
55-
.forEach(ignored -> l.add(this.executorFactory.get()));
49+
List<Executor> l = new ArrayList<>(this.minSize);
50+
IntStream.range(0, this.minSize).forEach(ignored -> l.add(this.executorFactory.get()));
5651
executors = new CopyOnWriteArrayList<>(l);
5752
}
5853

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
3737
import com.rabbitmq.stream.impl.Utils.ClientFactory;
3838
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
39-
import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
4039
import java.util.Collection;
4140
import java.util.Iterator;
4241
import java.util.List;
@@ -48,8 +47,6 @@
4847
import java.util.concurrent.ConcurrentMap;
4948
import java.util.concurrent.ConcurrentSkipListSet;
5049
import java.util.concurrent.CopyOnWriteArrayList;
51-
import java.util.concurrent.ExecutorService;
52-
import java.util.concurrent.Executors;
5350
import java.util.concurrent.atomic.AtomicBoolean;
5451
import java.util.concurrent.atomic.AtomicLong;
5552
import java.util.concurrent.atomic.AtomicReference;
@@ -73,7 +70,9 @@ class ProducersCoordinator {
7370
private final AtomicLong trackerIdSequence = new AtomicLong(0);
7471
private final boolean debug = false;
7572
private final List<ProducerTracker> producerTrackers = new CopyOnWriteArrayList<>();
76-
private final ExecutorServiceFactory executorServiceFactory;
73+
private final ExecutorServiceFactory executorServiceFactory =
74+
new DefaultExecutorServiceFactory(
75+
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-producer-connection-");
7776

7877
ProducersCoordinator(
7978
StreamEnvironment environment,
@@ -86,27 +85,6 @@ class ProducersCoordinator {
8685
this.maxProducersByClient = maxProducersByClient;
8786
this.maxTrackingConsumersByClient = maxTrackingConsumersByClient;
8887
this.connectionNamingStrategy = connectionNamingStrategy;
89-
// use the same single-threaded executor for all client connections
90-
// it's meant for message dispatching, so it should not be used
91-
this.executorServiceFactory =
92-
new ExecutorServiceFactory() {
93-
private final ExecutorService executorService =
94-
Executors.newSingleThreadExecutor(
95-
new NamedThreadFactory("rabbitmq-stream-producers-coordinator-dispatcher-"));
96-
97-
@Override
98-
public ExecutorService get() {
99-
return executorService;
100-
}
101-
102-
@Override
103-
public void clientClosed(ExecutorService executorService) {}
104-
105-
@Override
106-
public void close() {
107-
executorService.shutdownNow();
108-
}
109-
};
11088
}
11189

11290
private static String keyForNode(Client.Broker broker) {
@@ -156,7 +134,8 @@ private void addToManager(Broker node, AgentTracker tracker) {
156134
.clientParametersCopy()
157135
.host(node.getHost())
158136
.port(node.getPort())
159-
.dispatchingExecutorServiceFactory(this.executorServiceFactory);
137+
.executorServiceFactory(this.executorServiceFactory)
138+
.dispatchingExecutorServiceFactory(Utils.NO_OP_EXECUTOR_SERVICE_FACTORY);
160139
ClientProducersManager pickedManager = null;
161140
while (pickedManager == null) {
162141
Iterator<ClientProducersManager> iterator = this.managers.iterator();

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class StreamEnvironment implements Environment {
9999
private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false);
100100
private final Runnable locatorInitializationSequence;
101101
private final List<Locator> locators = new CopyOnWriteArrayList<>();
102-
private final ExecutorServiceFactory executorServiceFactory = new DefaultExecutorServiceFactory();
102+
private final ExecutorServiceFactory executorServiceFactory;
103103

104104
StreamEnvironment(
105105
ScheduledExecutorService scheduledExecutorService,
@@ -150,10 +150,7 @@ class StreamEnvironment implements Environment {
150150
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
151151
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
152152
this.byteBufAllocator = byteBufAllocator;
153-
clientParametersPrototype =
154-
clientParametersPrototype
155-
.byteBufAllocator(byteBufAllocator)
156-
.executorServiceFactory(this.executorServiceFactory);
153+
clientParametersPrototype = clientParametersPrototype.byteBufAllocator(byteBufAllocator);
157154
clientParametersPrototype = maybeSetUpClientParametersFromUris(uris, clientParametersPrototype);
158155

159156
this.addressResolver = addressResolver;
@@ -195,6 +192,9 @@ class StreamEnvironment implements Environment {
195192
}
196193

197194
this.addresses.forEach(address -> this.locators.add(new Locator(address)));
195+
this.executorServiceFactory =
196+
new DefaultExecutorServiceFactory(
197+
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
198198

199199
if (clientParametersPrototype.eventLoopGroup == null) {
200200
this.eventLoopGroup = new NioEventLoopGroup();
@@ -232,7 +232,7 @@ class StreamEnvironment implements Environment {
232232
connectionNamingStrategy,
233233
Utils.coordinatorClientFactory(this));
234234
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
235-
ClientParameters clientParametersForInit = clientParametersPrototype.duplicate();
235+
ClientParameters clientParametersForInit = locatorParametersCopy();
236236
Runnable locatorInitSequence =
237237
() -> {
238238
RuntimeException lastException = null;
@@ -301,9 +301,7 @@ private ShutdownListener shutdownListener(
301301
LOGGER.debug("Unexpected locator disconnection, trying to reconnect");
302302
try {
303303
Client.ClientParameters newLocatorParameters =
304-
this.clientParametersPrototype
305-
.duplicate()
306-
.shutdownListener(shutdownListenerReference.get());
304+
this.locatorParametersCopy().shutdownListener(shutdownListenerReference.get());
307305
AsyncRetry.asyncRetry(
308306
() -> {
309307
LOGGER.debug("Locator reconnection...");
@@ -351,7 +349,7 @@ private void scheduleLocatorConnection(
351349
shutdownListener(locator, connectionNamingStrategy, clientFactory);
352350
try {
353351
Client.ClientParameters newLocatorParameters =
354-
this.clientParametersPrototype.duplicate().shutdownListener(shutdownListener);
352+
this.locatorParametersCopy().shutdownListener(shutdownListener);
355353
AsyncRetry.asyncRetry(
356354
() -> {
357355
LOGGER.debug("Locator reconnection...");
@@ -762,6 +760,13 @@ Client.ClientParameters clientParametersCopy() {
762760
return this.clientParametersPrototype.duplicate();
763761
}
764762

763+
private Client.ClientParameters locatorParametersCopy() {
764+
return this.clientParametersPrototype
765+
.duplicate()
766+
.executorServiceFactory(this.executorServiceFactory)
767+
.dispatchingExecutorServiceFactory(Utils.NO_OP_EXECUTOR_SERVICE_FACTORY);
768+
}
769+
765770
TrackingConsumerRegistration registerTrackingConsumer(
766771
StreamConsumer streamConsumer, TrackingConfiguration configuration) {
767772
Runnable closingCallable = this.producersCoordinator.registerTrackingConsumer(streamConsumer);

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,20 @@
2727
import java.security.cert.X509Certificate;
2828
import java.time.Duration;
2929
import java.util.Arrays;
30+
import java.util.Collection;
3031
import java.util.Collections;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Locale;
3435
import java.util.Map;
36+
import java.util.concurrent.Callable;
3537
import java.util.concurrent.ConcurrentHashMap;
3638
import java.util.concurrent.CopyOnWriteArrayList;
39+
import java.util.concurrent.ExecutorService;
3740
import java.util.concurrent.Executors;
41+
import java.util.concurrent.Future;
3842
import java.util.concurrent.ThreadFactory;
43+
import java.util.concurrent.TimeUnit;
3944
import java.util.concurrent.atomic.AtomicBoolean;
4045
import java.util.concurrent.atomic.AtomicLong;
4146
import java.util.function.Consumer;
@@ -523,4 +528,88 @@ public Thread newThread(Runnable r) {
523528
return thread;
524529
}
525530
}
531+
532+
static final ExecutorServiceFactory NO_OP_EXECUTOR_SERVICE_FACTORY =
533+
new NoOpExecutorServiceFactory();
534+
535+
static class NoOpExecutorServiceFactory implements ExecutorServiceFactory {
536+
537+
private final ExecutorService executorService = new NoOpExecutorService();
538+
539+
@Override
540+
public ExecutorService get() {
541+
return executorService;
542+
}
543+
544+
@Override
545+
public void clientClosed(ExecutorService executorService) {}
546+
547+
@Override
548+
public void close() {}
549+
}
550+
551+
private static class NoOpExecutorService implements ExecutorService {
552+
553+
@Override
554+
public void shutdown() {}
555+
556+
@Override
557+
public List<Runnable> shutdownNow() {
558+
return null;
559+
}
560+
561+
@Override
562+
public boolean isShutdown() {
563+
return false;
564+
}
565+
566+
@Override
567+
public boolean isTerminated() {
568+
return false;
569+
}
570+
571+
@Override
572+
public boolean awaitTermination(long timeout, TimeUnit unit) {
573+
return false;
574+
}
575+
576+
@Override
577+
public <T> Future<T> submit(Callable<T> task) {
578+
return null;
579+
}
580+
581+
@Override
582+
public <T> Future<T> submit(Runnable task, T result) {
583+
return null;
584+
}
585+
586+
@Override
587+
public Future<?> submit(Runnable task) {
588+
return null;
589+
}
590+
591+
@Override
592+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
593+
return null;
594+
}
595+
596+
@Override
597+
public <T> List<Future<T>> invokeAll(
598+
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
599+
return null;
600+
}
601+
602+
@Override
603+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
604+
return null;
605+
}
606+
607+
@Override
608+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
609+
return null;
610+
}
611+
612+
@Override
613+
public void execute(Runnable command) {}
614+
}
526615
}

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ public Client.ClientParameters shutdownListener(
143143
return super.shutdownListener(shutdownListener);
144144
}
145145
};
146-
clientParameters.executorServiceFactory(new DefaultExecutorServiceFactory());
147146
mocks = MockitoAnnotations.openMocks(this);
148147
when(environment.locator()).thenReturn(locator);
149148
when(environment.locatorOperation(any())).thenCallRealMethod();
@@ -168,6 +167,7 @@ void tearDown() throws Exception {
168167
scheduledExecutorService.shutdownNow();
169168
}
170169
mocks.close();
170+
coordinator.close();
171171
}
172172

173173
@Test

src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ void tearDown() throws Exception {
130130
scheduledExecutorService.shutdownNow();
131131
}
132132
mocks.close();
133+
coordinator.close();
133134
}
134135

135136
@Test

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public final class TestUtils {
8989

9090
private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class);
9191

92-
private static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(20);
92+
private static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10);
9393

9494
private TestUtils() {}
9595

0 commit comments

Comments
 (0)