From 4beb5faf6d569bfb65f1f2bdbf891ad29b9313c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 29 Sep 2025 11:15:17 +0200 Subject: [PATCH 1/3] Stop reconnecting locator is environment is closed --- .../stream/impl/StreamEnvironment.java | 19 ++++++++++++++----- .../impl/SacSuperStreamConsumerTest.java | 6 ++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 1d0eb33696..5c2b7c6e67 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -356,13 +356,19 @@ class StreamEnvironment implements Environment { clientFactory, this.locatorReconnectionScheduledExecutorService, this.recoveryBackOffDelayPolicy, - l.label()); + l.label(), + this.closed); } }); } }; if (lazyInit) { - this.locatorInitializationSequence = locatorInitSequence; + this.locatorInitializationSequence = + () -> { + if (!this.closed.get()) { + locatorInitSequence.run(); + } + }; } else { locatorInitSequence.run(); locatorsInitialized.set(true); @@ -391,7 +397,7 @@ private ShutdownListener shutdownListener( shutdownContext -> { String label = locator.label(); LOGGER.debug("Locator {} disconnected", label); - if (shutdownContext.isShutdownUnexpected()) { + if (shutdownContext.isShutdownUnexpected() && !this.closed.get()) { locator.client(null); BackOffDelayPolicy delayPolicy = recoveryBackOffDelayPolicy; LOGGER.debug( @@ -408,7 +414,8 @@ private ShutdownListener shutdownListener( clientFactory, this.locatorReconnectionScheduledExecutorService, delayPolicy, - label); + label, + this.closed); } else { LOGGER.debug("Locator connection '{}' closing normally", label); } @@ -425,7 +432,8 @@ private static void scheduleLocatorConnection( Function clientFactory, ScheduledExecutorService scheduler, BackOffDelayPolicy delayPolicy, - String locatorLabel) { + String locatorLabel, + AtomicBoolean closed) { LOGGER.debug( "Scheduling locator '{}' connection with delay policy {}", locatorLabel, delayPolicy); try { @@ -452,6 +460,7 @@ private static void scheduleLocatorConnection( .description("Locator '%s' connection", locatorLabel) .scheduler(scheduler) .delayPolicy(delayPolicy) + .retry(ignored -> !closed.get()) .build() .thenAccept(locator::client) .exceptionally( diff --git a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java index 60e1ffaf2e..c35567a1a6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java @@ -53,10 +53,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -@ExtendWith({ - TestUtils.StreamTestInfrastructureExtension.class, - BrokerVersionAtLeast311Condition.class -}) +@ExtendWith({BrokerVersionAtLeast311Condition.class}) +@StreamTestInfrastructure @SingleActiveConsumer public class SacSuperStreamConsumerTest { EventLoopGroup eventLoopGroup; From 0b1203dd5dd12c86eee1e0ab436435a6d030dd6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 29 Sep 2025 11:34:27 +0200 Subject: [PATCH 2/3] Check Netty event loop group is open before trying to connect --- .../stream/impl/StreamEnvironment.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 5c2b7c6e67..d5f91124b8 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -87,6 +87,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; @@ -103,6 +104,7 @@ class StreamEnvironment implements Environment { private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class); private final EventLoopGroup eventLoopGroup; + private final boolean privateEventLoopGroup; private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService locatorReconnectionScheduledExecutorService; private final boolean privateScheduleExecutorService; @@ -272,17 +274,16 @@ class StreamEnvironment implements Environment { if (clientParametersPrototype.eventLoopGroup == null) { this.eventLoopGroup = Utils.eventLoopGroup(); + this.privateEventLoopGroup = true; shutdownService.wrap(() -> closeEventLoopGroup(this.eventLoopGroup)); - this.clientParametersPrototype = - clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup); } else { - this.eventLoopGroup = null; - this.clientParametersPrototype = - clientParametersPrototype - .duplicate() - .eventLoopGroup(clientParametersPrototype.eventLoopGroup); + this.eventLoopGroup = clientParametersPrototype.eventLoopGroup; + this.privateEventLoopGroup = false; } + this.clientParametersPrototype = + clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup); + this.producersCoordinator = new ProducersCoordinator( this, @@ -357,7 +358,7 @@ class StreamEnvironment implements Environment { this.locatorReconnectionScheduledExecutorService, this.recoveryBackOffDelayPolicy, l.label(), - this.closed); + this::shouldTryLocatorConnection); } }); } @@ -365,7 +366,7 @@ class StreamEnvironment implements Environment { if (lazyInit) { this.locatorInitializationSequence = () -> { - if (!this.closed.get()) { + if (this.shouldTryLocatorConnection()) { locatorInitSequence.run(); } }; @@ -397,7 +398,7 @@ private ShutdownListener shutdownListener( shutdownContext -> { String label = locator.label(); LOGGER.debug("Locator {} disconnected", label); - if (shutdownContext.isShutdownUnexpected() && !this.closed.get()) { + if (shutdownContext.isShutdownUnexpected()) { locator.client(null); BackOffDelayPolicy delayPolicy = recoveryBackOffDelayPolicy; LOGGER.debug( @@ -415,7 +416,7 @@ private ShutdownListener shutdownListener( this.locatorReconnectionScheduledExecutorService, delayPolicy, label, - this.closed); + this::shouldTryLocatorConnection); } else { LOGGER.debug("Locator connection '{}' closing normally", label); } @@ -433,7 +434,7 @@ private static void scheduleLocatorConnection( ScheduledExecutorService scheduler, BackOffDelayPolicy delayPolicy, String locatorLabel, - AtomicBoolean closed) { + BooleanSupplier shouldRetry) { LOGGER.debug( "Scheduling locator '{}' connection with delay policy {}", locatorLabel, delayPolicy); try { @@ -460,7 +461,7 @@ private static void scheduleLocatorConnection( .description("Locator '%s' connection", locatorLabel) .scheduler(scheduler) .delayPolicy(delayPolicy) - .retry(ignored -> !closed.get()) + .retry(ignored -> shouldRetry.getAsBoolean()) .build() .thenAccept(locator::client) .exceptionally( @@ -786,14 +787,19 @@ public void close() { if (this.locatorReconnectionScheduledExecutorService != null) { this.locatorReconnectionScheduledExecutorService.shutdownNow(); } - closeEventLoopGroup(this.eventLoopGroup); + if (this.privateEventLoopGroup) { + closeEventLoopGroup(this.eventLoopGroup); + } } } + private boolean shouldTryLocatorConnection() { + return !this.closed.get() && !this.eventLoopGroup.isShuttingDown(); + } + private static void closeEventLoopGroup(EventLoopGroup eventLoopGroup) { try { - if (eventLoopGroup != null - && (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown())) { + if (!eventLoopGroup.isShuttingDown()) { LOGGER.debug("Closing Netty event loop group"); eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); } From 58490490e7ea05df5583d93480482d6f88843a6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 29 Sep 2025 12:11:58 +0200 Subject: [PATCH 3/3] Refine tests --- .../rabbitmq/stream/impl/DedupIdempotencyTest.java | 8 +++++++- .../rabbitmq/stream/impl/StreamProducerUnitTest.java | 12 ++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java b/src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java index 9e3982eb43..9d628b6f4a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java @@ -43,6 +43,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,7 +58,12 @@ public class DedupIdempotencyTest { void init() { EnvironmentBuilder environmentBuilder = Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); - environment = environmentBuilder.build(); + this.environment = environmentBuilder.build(); + } + + @AfterEach + void tearDown() { + this.environment.close(); } @Test diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 61a5b73540..407f3c6948 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -39,11 +39,14 @@ import com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import java.time.Duration; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -70,7 +73,7 @@ public class StreamProducerUnitTest { @Mock Channel channel; @Mock ChannelFuture channelFuture; - Set buffers = ConcurrentHashMap.newKeySet(); + Queue buffers = new ConcurrentLinkedQueue<>(); ScheduledExecutorService executorService; Clock clock = new Clock(); @@ -82,15 +85,16 @@ public class StreamProducerUnitTest { void init() { mocks = MockitoAnnotations.openMocks(this); executorService = Executors.newScheduledThreadPool(2); - when(channel.alloc()).thenReturn(Utils.byteBufAllocator()); + ByteBufAllocator allocator = new UnpooledByteBufAllocator(false); + when(channel.alloc()).thenReturn(allocator); when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture); when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt())) .thenAnswer( (Answer) invocation -> { - ByteBufAllocator allocator = invocation.getArgument(0); + ByteBufAllocator alloc = invocation.getArgument(0); int capacity = invocation.getArgument(1); - ByteBuf buffer = allocator.buffer(capacity); + ByteBuf buffer = alloc.buffer(capacity); buffers.add(buffer); return buffer; });