diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index f5c5121937..56d02862dc 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -184,7 +184,7 @@ public long applyAsLong(Object value) { private final Map connectionProperties; private final Duration rpcTimeout; private final List saslMechanisms; - private volatile ShutdownReason shutdownReason = null; + private AtomicReference shutdownReason = new AtomicReference<>(); private final Runnable streamStatsCommandVersionsCheck; private final boolean filteringSupported; private final Runnable superStreamManagementCommandVersionsCheck; @@ -1435,17 +1435,21 @@ public Response unsubscribe(byte subscriptionId) { public void close() { if (closing.compareAndSet(false, true)) { - LOGGER.debug("Closing client"); - - sendClose(RESPONSE_CODE_OK, "OK"); - - closingSequence(ShutdownContext.ShutdownReason.CLIENT_CLOSE); - + LOGGER.debug("Closing client, channel still active? {}", this.channel.isActive()); + ShutdownReason reason; + if (this.channel.isActive()) { + sendClose(RESPONSE_CODE_OK, "OK"); + reason = ShutdownReason.CLIENT_CLOSE; + } else { + reason = ShutdownReason.UNKNOWN; + } + closingSequence(reason); LOGGER.debug("Client closed"); } } void closingSequence(ShutdownContext.ShutdownReason reason) { + this.shutdownReason(reason); if (reason != null) { this.shutdownListenerCallback.accept(reason); } @@ -1713,7 +1717,7 @@ public void consumerUpdateResponse( } void shutdownReason(ShutdownReason reason) { - this.shutdownReason = reason; + this.shutdownReason.compareAndSet(null, reason); } public SocketAddress localAddress() { @@ -2858,16 +2862,14 @@ public void channelInactive(ChannelHandlerContext ctx) { // the event is actually dispatched to the listener, emitting // an UNKNOWN reason instead of SERVER_CLOSE. So we skip the closing here // because it will be handled later anyway. - if (shutdownReason == null) { + if (shutdownReason.get() == null) { + LOGGER.debug("No shutdown reason"); if (closing.compareAndSet(false, true)) { + LOGGER.debug("Closing with 'unknown' shutdown reason"); if (executorService == null) { // the TCP connection is closed before the state is initialized // we do our best the execute the closing sequence - new Thread( - () -> { - closingSequence(ShutdownReason.UNKNOWN); - }) - .start(); + new Thread(() -> closingSequence(ShutdownReason.UNKNOWN)).start(); } else { executorService.submit(() -> closingSequence(ShutdownReason.UNKNOWN)); }