2323import com .rabbitmq .client .Address ;
2424import com .rabbitmq .client .ConnectionFactory ;
2525import com .rabbitmq .client .MalformedFrameException ;
26+ import com .rabbitmq .client .ShutdownSignalException ;
2627import com .rabbitmq .client .SocketConfigurator ;
2728import io .netty .bootstrap .Bootstrap ;
2829import io .netty .buffer .ByteBuf ;
5960import java .util .concurrent .atomic .AtomicReference ;
6061import java .util .function .Consumer ;
6162import java .util .function .Function ;
63+ import java .util .function .Predicate ;
6264import javax .net .ssl .SSLHandshakeException ;
6365import org .slf4j .Logger ;
6466import org .slf4j .LoggerFactory ;
@@ -71,6 +73,7 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory
7173 private final Consumer <Channel > channelCustomizer ;
7274 private final Consumer <Bootstrap > bootstrapCustomizer ;
7375 private final Duration enqueuingTimeout ;
76+ private final Predicate <ShutdownSignalException > willRecover ;
7477
7578 public NettyFrameHandlerFactory (
7679 EventLoopGroup eventLoopGroup ,
@@ -80,14 +83,30 @@ public NettyFrameHandlerFactory(
8083 Duration enqueuingTimeout ,
8184 int connectionTimeout ,
8285 SocketConfigurator configurator ,
83- int maxInboundMessageBodySize ) {
86+ int maxInboundMessageBodySize ,
87+ boolean automaticRecovery ,
88+ Predicate <ShutdownSignalException > recoveryCondition ) {
8489 super (connectionTimeout , configurator , sslContextFactory != null , maxInboundMessageBodySize );
8590 this .eventLoopGroup = eventLoopGroup ;
8691 this .sslContextFactory = sslContextFactory == null ? connName -> null : sslContextFactory ;
8792 this .channelCustomizer = channelCustomizer == null ? Utils .noOpConsumer () : channelCustomizer ;
8893 this .bootstrapCustomizer =
8994 bootstrapCustomizer == null ? Utils .noOpConsumer () : bootstrapCustomizer ;
9095 this .enqueuingTimeout = enqueuingTimeout ;
96+ this .willRecover =
97+ sse -> {
98+ if (!automaticRecovery ) {
99+ return false ;
100+ } else {
101+ try {
102+ return recoveryCondition .test (sse );
103+ } catch (Exception e ) {
104+ // we assume it will recover, so we take the safe path to dispatch the closing
105+ // it avoids the risk of deadlock
106+ return true ;
107+ }
108+ }
109+ };
91110 }
92111
93112 private static void closeNettyState (Channel channel , EventLoopGroup eventLoopGroup ) {
@@ -131,6 +150,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
131150 sslContext ,
132151 this .eventLoopGroup ,
133152 this .enqueuingTimeout ,
153+ this .willRecover ,
134154 this .channelCustomizer ,
135155 this .bootstrapCustomizer );
136156 }
@@ -161,6 +181,7 @@ private NettyFrameHandler(
161181 SslContext sslContext ,
162182 EventLoopGroup elg ,
163183 Duration enqueuingTimeout ,
184+ Predicate <ShutdownSignalException > willRecover ,
164185 Consumer <Channel > channelCustomizer ,
165186 Consumer <Bootstrap > bootstrapCustomizer )
166187 throws IOException {
@@ -193,7 +214,8 @@ private NettyFrameHandler(
193214 int lengthFieldOffset = 3 ;
194215 int lengthFieldLength = 4 ;
195216 int lengthAdjustement = 1 ;
196- AmqpHandler amqpHandler = new AmqpHandler (maxInboundMessageBodySize , this ::close );
217+ AmqpHandler amqpHandler =
218+ new AmqpHandler (maxInboundMessageBodySize , this ::close , willRecover );
197219 int port = ConnectionFactory .portOrDefault (addr .getPort (), sslContext != null );
198220 b .handler (
199221 new ChannelInitializer <SocketChannel >() {
@@ -401,14 +423,26 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter {
401423
402424 private final int maxPayloadSize ;
403425 private final Runnable closeSequence ;
426+ private final Predicate <ShutdownSignalException > willRecover ;
404427 private volatile AMQConnection connection ;
428+ private volatile Channel ch ;
405429 private final AtomicBoolean writable = new AtomicBoolean (true );
406430 private final AtomicReference <CountDownLatch > writableLatch =
407431 new AtomicReference <>(new CountDownLatch (1 ));
408432
409- private AmqpHandler (int maxPayloadSize , Runnable closeSequence ) {
433+ private AmqpHandler (
434+ int maxPayloadSize ,
435+ Runnable closeSequence ,
436+ Predicate <ShutdownSignalException > willRecover ) {
410437 this .maxPayloadSize = maxPayloadSize ;
411438 this .closeSequence = closeSequence ;
439+ this .willRecover = willRecover ;
440+ }
441+
442+ @ Override
443+ public void channelActive (ChannelHandlerContext ctx ) throws Exception {
444+ this .ch = ctx .channel ();
445+ super .channelActive (ctx );
412446 }
413447
414448 @ Override
@@ -441,7 +475,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
441475 if (noProblem
442476 && (!this .connection .isRunning () || this .connection .hasBrokerInitiatedShutdown ())) {
443477 // looks like the frame was Close-Ok or Close
444- ctx . executor (). submit (() -> this .connection .doFinalShutdown ());
478+ this . dispatchShutdownToConnection (() -> this .connection .doFinalShutdown ());
445479 }
446480 } finally {
447481 m .release ();
@@ -468,10 +502,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
468502 AMQConnection c = this .connection ;
469503 if (c .isOpen ()) {
470504 // it is likely to be an IO exception
471- c .handleIoError (null );
505+ this . dispatchShutdownToConnection (() -> c .handleIoError (null ) );
472506 } else {
473507 // just in case, the call is idempotent anyway
474- c . doFinalShutdown ( );
508+ this . dispatchShutdownToConnection ( c :: doFinalShutdown );
475509 }
476510 }
477511 }
@@ -497,7 +531,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
497531 this .connection .getAddress ().getHostName (),
498532 this .connection .getPort ());
499533 if (needToDispatchIoError ()) {
500- this .connection .handleHeartbeatFailure ();
534+ this .dispatchShutdownToConnection (() -> this . connection .handleHeartbeatFailure () );
501535 }
502536 } else if (e .state () == IdleState .WRITER_IDLE ) {
503537 this .connection .writeFrame (new Frame (AMQP .FRAME_HEARTBEAT , 0 ));
@@ -509,7 +543,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
509543
510544 private void handleIoError (Throwable cause ) {
511545 if (needToDispatchIoError ()) {
512- this .connection .handleIoError (cause );
546+ this .dispatchShutdownToConnection (() -> this . connection .handleIoError (cause ) );
513547 } else {
514548 this .closeSequence .run ();
515549 }
@@ -527,6 +561,32 @@ private boolean isWritable() {
527561 private CountDownLatch writableLatch () {
528562 return this .writableLatch .get ();
529563 }
564+
565+ protected void dispatchShutdownToConnection (Runnable connectionShutdownRunnable ) {
566+ String name = "rabbitmq-connection-shutdown" ;
567+ AMQConnection c = this .connection ;
568+ if (c == null || ch == null ) {
569+ // not enough information, we dispatch in separate thread
570+ Environment .newThread (connectionShutdownRunnable , name ).start ();
571+ } else {
572+ if (ch .eventLoop ().inEventLoop ()) {
573+ if (this .willRecover .test (c .getCloseReason ())) {
574+ // the connection will recover, we don't want this to happen in the event loop,
575+ // it could cause a deadlock, so using a separate thread
576+ name = name + "-" + c ;
577+ System .out .println ("in separate thread" );
578+ Environment .newThread (connectionShutdownRunnable , name ).start ();
579+ } else {
580+ // no recovery, it is safe to dispatch in the event loop
581+ System .out .println ("in event loop" );
582+ ch .eventLoop ().submit (connectionShutdownRunnable );
583+ }
584+ } else {
585+ // not in the event loop, we can run it in the same thread
586+ connectionShutdownRunnable .run ();
587+ }
588+ }
589+ }
530590 }
531591
532592 private static final class ProtocolVersionMismatchHandler extends ChannelInboundHandlerAdapter {
0 commit comments