Skip to content

Commit 5da520f

Browse files
committed
Make dispatching idempotent
1 parent 7600254 commit 5da520f

File tree

1 file changed

+20
-19
lines changed

1 file changed

+20
-19
lines changed

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter {
429429
private final AtomicBoolean writable = new AtomicBoolean(true);
430430
private final AtomicReference<CountDownLatch> writableLatch =
431431
new AtomicReference<>(new CountDownLatch(1));
432+
private final AtomicBoolean shutdownDispatched = new AtomicBoolean(false);
432433

433434
private AmqpHandler(
434435
int maxPayloadSize,
@@ -563,27 +564,27 @@ private CountDownLatch writableLatch() {
563564
}
564565

565566
protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable) {
566-
String name = "rabbitmq-connection-shutdown";
567-
AMQConnection c = this.connection;
568-
if (c == null || ch == null) {
569-
// not enough information, we dispatch in separate thread
570-
Environment.newThread(connectionShutdownRunnable, name).start();
571-
} else {
572-
if (ch.eventLoop().inEventLoop()) {
573-
if (this.willRecover.test(c.getCloseReason())) {
574-
// the connection will recover, we don't want this to happen in the event loop,
575-
// it could cause a deadlock, so using a separate thread
576-
name = name + "-" + c;
577-
System.out.println("in separate thread");
578-
Environment.newThread(connectionShutdownRunnable, name).start();
567+
if (this.shutdownDispatched.compareAndSet(false, true)) {
568+
String name = "rabbitmq-connection-shutdown";
569+
AMQConnection c = this.connection;
570+
if (c == null || ch == null) {
571+
// not enough information, we dispatch in separate thread
572+
Environment.newThread(connectionShutdownRunnable, name).start();
573+
} else {
574+
if (ch.eventLoop().inEventLoop()) {
575+
if (this.willRecover.test(c.getCloseReason()) || ch.eventLoop().isShuttingDown()) {
576+
// the connection will recover, we don't want this to happen in the event loop,
577+
// it could cause a deadlock, so using a separate thread
578+
name = name + "-" + c;
579+
Environment.newThread(connectionShutdownRunnable, name).start();
580+
} else {
581+
// no recovery, it is safe to dispatch in the event loop
582+
ch.eventLoop().submit(connectionShutdownRunnable);
583+
}
579584
} else {
580-
// no recovery, it is safe to dispatch in the event loop
581-
System.out.println("in event loop");
582-
ch.eventLoop().submit(connectionShutdownRunnable);
585+
// not in the event loop, we can run it in the same thread
586+
connectionShutdownRunnable.run();
583587
}
584-
} else {
585-
// not in the event loop, we can run it in the same thread
586-
connectionShutdownRunnable.run();
587588
}
588589
}
589590
}

0 commit comments

Comments
 (0)