From 0b5e77f1b2494c079e332b71d01d12adf62efeae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 20 Aug 2025 11:40:58 +0200 Subject: [PATCH] Fix race condition on connection closing The broker can close a connection abruptly (in case of re-authentication fails) and the application can call Client#close() at the same time. There can be a race condition whereby the call to close() triggers a regular closing initiated by the client but the Netty channel is already inactive. The call to close() then blocks until it reaches the RPC timeout. This commit makes sure the channel is still active before trying to send the close command to the broker. --- .../java/com/rabbitmq/stream/impl/Client.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index f5c5121937..56d02862dc 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -184,7 +184,7 @@ public long applyAsLong(Object value) { private final Map connectionProperties; private final Duration rpcTimeout; private final List saslMechanisms; - private volatile ShutdownReason shutdownReason = null; + private AtomicReference shutdownReason = new AtomicReference<>(); private final Runnable streamStatsCommandVersionsCheck; private final boolean filteringSupported; private final Runnable superStreamManagementCommandVersionsCheck; @@ -1435,17 +1435,21 @@ public Response unsubscribe(byte subscriptionId) { public void close() { if (closing.compareAndSet(false, true)) { - LOGGER.debug("Closing client"); - - sendClose(RESPONSE_CODE_OK, "OK"); - - closingSequence(ShutdownContext.ShutdownReason.CLIENT_CLOSE); - + LOGGER.debug("Closing client, channel still active? {}", this.channel.isActive()); + ShutdownReason reason; + if (this.channel.isActive()) { + sendClose(RESPONSE_CODE_OK, "OK"); + reason = ShutdownReason.CLIENT_CLOSE; + } else { + reason = ShutdownReason.UNKNOWN; + } + closingSequence(reason); LOGGER.debug("Client closed"); } } void closingSequence(ShutdownContext.ShutdownReason reason) { + this.shutdownReason(reason); if (reason != null) { this.shutdownListenerCallback.accept(reason); } @@ -1713,7 +1717,7 @@ public void consumerUpdateResponse( } void shutdownReason(ShutdownReason reason) { - this.shutdownReason = reason; + this.shutdownReason.compareAndSet(null, reason); } public SocketAddress localAddress() { @@ -2858,16 +2862,14 @@ public void channelInactive(ChannelHandlerContext ctx) { // the event is actually dispatched to the listener, emitting // an UNKNOWN reason instead of SERVER_CLOSE. So we skip the closing here // because it will be handled later anyway. - if (shutdownReason == null) { + if (shutdownReason.get() == null) { + LOGGER.debug("No shutdown reason"); if (closing.compareAndSet(false, true)) { + LOGGER.debug("Closing with 'unknown' shutdown reason"); if (executorService == null) { // the TCP connection is closed before the state is initialized // we do our best the execute the closing sequence - new Thread( - () -> { - closingSequence(ShutdownReason.UNKNOWN); - }) - .start(); + new Thread(() -> closingSequence(ShutdownReason.UNKNOWN)).start(); } else { executorService.submit(() -> closingSequence(ShutdownReason.UNKNOWN)); }