diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 1d0eb33696..d5f91124b8 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -87,6 +87,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; @@ -103,6 +104,7 @@ class StreamEnvironment implements Environment { private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class); private final EventLoopGroup eventLoopGroup; + private final boolean privateEventLoopGroup; private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService locatorReconnectionScheduledExecutorService; private final boolean privateScheduleExecutorService; @@ -272,17 +274,16 @@ class StreamEnvironment implements Environment { if (clientParametersPrototype.eventLoopGroup == null) { this.eventLoopGroup = Utils.eventLoopGroup(); + this.privateEventLoopGroup = true; shutdownService.wrap(() -> closeEventLoopGroup(this.eventLoopGroup)); - this.clientParametersPrototype = - clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup); } else { - this.eventLoopGroup = null; - this.clientParametersPrototype = - clientParametersPrototype - .duplicate() - .eventLoopGroup(clientParametersPrototype.eventLoopGroup); + this.eventLoopGroup = clientParametersPrototype.eventLoopGroup; + this.privateEventLoopGroup = false; } + this.clientParametersPrototype = + clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup); + this.producersCoordinator = new ProducersCoordinator( this, @@ -356,13 +357,19 @@ class StreamEnvironment implements Environment { clientFactory, this.locatorReconnectionScheduledExecutorService, this.recoveryBackOffDelayPolicy, - l.label()); + l.label(), + this::shouldTryLocatorConnection); } }); } }; if (lazyInit) { - this.locatorInitializationSequence = locatorInitSequence; + this.locatorInitializationSequence = + () -> { + if (this.shouldTryLocatorConnection()) { + locatorInitSequence.run(); + } + }; } else { locatorInitSequence.run(); locatorsInitialized.set(true); @@ -408,7 +415,8 @@ private ShutdownListener shutdownListener( clientFactory, this.locatorReconnectionScheduledExecutorService, delayPolicy, - label); + label, + this::shouldTryLocatorConnection); } else { LOGGER.debug("Locator connection '{}' closing normally", label); } @@ -425,7 +433,8 @@ private static void scheduleLocatorConnection( Function clientFactory, ScheduledExecutorService scheduler, BackOffDelayPolicy delayPolicy, - String locatorLabel) { + String locatorLabel, + BooleanSupplier shouldRetry) { LOGGER.debug( "Scheduling locator '{}' connection with delay policy {}", locatorLabel, delayPolicy); try { @@ -452,6 +461,7 @@ private static void scheduleLocatorConnection( .description("Locator '%s' connection", locatorLabel) .scheduler(scheduler) .delayPolicy(delayPolicy) + .retry(ignored -> shouldRetry.getAsBoolean()) .build() .thenAccept(locator::client) .exceptionally( @@ -777,14 +787,19 @@ public void close() { if (this.locatorReconnectionScheduledExecutorService != null) { this.locatorReconnectionScheduledExecutorService.shutdownNow(); } - closeEventLoopGroup(this.eventLoopGroup); + if (this.privateEventLoopGroup) { + closeEventLoopGroup(this.eventLoopGroup); + } } } + private boolean shouldTryLocatorConnection() { + return !this.closed.get() && !this.eventLoopGroup.isShuttingDown(); + } + private static void closeEventLoopGroup(EventLoopGroup eventLoopGroup) { try { - if (eventLoopGroup != null - && (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown())) { + if (!eventLoopGroup.isShuttingDown()) { LOGGER.debug("Closing Netty event loop group"); eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); } diff --git a/src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java b/src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java index 9e3982eb43..9d628b6f4a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java @@ -43,6 +43,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,7 +58,12 @@ public class DedupIdempotencyTest { void init() { EnvironmentBuilder environmentBuilder = Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); - environment = environmentBuilder.build(); + this.environment = environmentBuilder.build(); + } + + @AfterEach + void tearDown() { + this.environment.close(); } @Test diff --git a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java index 60e1ffaf2e..c35567a1a6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java @@ -53,10 +53,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -@ExtendWith({ - TestUtils.StreamTestInfrastructureExtension.class, - BrokerVersionAtLeast311Condition.class -}) +@ExtendWith({BrokerVersionAtLeast311Condition.class}) +@StreamTestInfrastructure @SingleActiveConsumer public class SacSuperStreamConsumerTest { EventLoopGroup eventLoopGroup; diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 61a5b73540..407f3c6948 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -39,11 +39,14 @@ import com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import java.time.Duration; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -70,7 +73,7 @@ public class StreamProducerUnitTest { @Mock Channel channel; @Mock ChannelFuture channelFuture; - Set buffers = ConcurrentHashMap.newKeySet(); + Queue buffers = new ConcurrentLinkedQueue<>(); ScheduledExecutorService executorService; Clock clock = new Clock(); @@ -82,15 +85,16 @@ public class StreamProducerUnitTest { void init() { mocks = MockitoAnnotations.openMocks(this); executorService = Executors.newScheduledThreadPool(2); - when(channel.alloc()).thenReturn(Utils.byteBufAllocator()); + ByteBufAllocator allocator = new UnpooledByteBufAllocator(false); + when(channel.alloc()).thenReturn(allocator); when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture); when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt())) .thenAnswer( (Answer) invocation -> { - ByteBufAllocator allocator = invocation.getArgument(0); + ByteBufAllocator alloc = invocation.getArgument(0); int capacity = invocation.getArgument(1); - ByteBuf buffer = allocator.buffer(capacity); + ByteBuf buffer = alloc.buffer(capacity); buffers.add(buffer); return buffer; });