Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public long applyAsLong(Object value) {
private final Map<String, String> connectionProperties;
private final Duration rpcTimeout;
private final List<String> saslMechanisms;
private volatile ShutdownReason shutdownReason = null;
private AtomicReference<ShutdownReason> shutdownReason = new AtomicReference<>();
private final Runnable streamStatsCommandVersionsCheck;
private final boolean filteringSupported;
private final Runnable superStreamManagementCommandVersionsCheck;
Expand Down Expand Up @@ -1435,17 +1435,21 @@ public Response unsubscribe(byte subscriptionId) {

public void close() {
if (closing.compareAndSet(false, true)) {
LOGGER.debug("Closing client");

sendClose(RESPONSE_CODE_OK, "OK");

closingSequence(ShutdownContext.ShutdownReason.CLIENT_CLOSE);

LOGGER.debug("Closing client, channel still active? {}", this.channel.isActive());
ShutdownReason reason;
if (this.channel.isActive()) {
sendClose(RESPONSE_CODE_OK, "OK");
reason = ShutdownReason.CLIENT_CLOSE;
} else {
reason = ShutdownReason.UNKNOWN;
}
closingSequence(reason);
LOGGER.debug("Client closed");
}
}

void closingSequence(ShutdownContext.ShutdownReason reason) {
this.shutdownReason(reason);
if (reason != null) {
this.shutdownListenerCallback.accept(reason);
}
Expand Down Expand Up @@ -1713,7 +1717,7 @@ public void consumerUpdateResponse(
}

void shutdownReason(ShutdownReason reason) {
this.shutdownReason = reason;
this.shutdownReason.compareAndSet(null, reason);
}

public SocketAddress localAddress() {
Expand Down Expand Up @@ -2858,16 +2862,14 @@ public void channelInactive(ChannelHandlerContext ctx) {
// the event is actually dispatched to the listener, emitting
// an UNKNOWN reason instead of SERVER_CLOSE. So we skip the closing here
// because it will be handled later anyway.
if (shutdownReason == null) {
if (shutdownReason.get() == null) {
LOGGER.debug("No shutdown reason");
if (closing.compareAndSet(false, true)) {
LOGGER.debug("Closing with 'unknown' shutdown reason");
if (executorService == null) {
// the TCP connection is closed before the state is initialized
// we do our best the execute the closing sequence
new Thread(
() -> {
closingSequence(ShutdownReason.UNKNOWN);
})
.start();
new Thread(() -> closingSequence(ShutdownReason.UNKNOWN)).start();
} else {
executorService.submit(() -> closingSequence(ShutdownReason.UNKNOWN));
}
Expand Down