Skip to content

Commit e406a6d

Browse files
committed
Check Netty event loop group is open before creating frame handler
No need to try to connect if the event loop group was shut down. This also triggers an infinite cycle of connection recovery in the following case: connection disconnected, recovery starts, event loop group closed, new connection attempt, Netty channel created and becomes inactive immediately, recovery restarts, etc. References #1663
1 parent 0520d33 commit e406a6d

File tree

8 files changed

+70
-51
lines changed

8 files changed

+70
-51
lines changed

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ private NettyFrameHandler(
200200
} else {
201201
this.eventLoopGroup = null;
202202
}
203+
204+
if (b.config().group() == null) {
205+
throw new IllegalStateException("The event loop group is not set");
206+
} else if (b.config().group().isShuttingDown()) {
207+
LOGGER.warn("The Netty loop group was shut down, it is not possible to connect or recover");
208+
throw new IllegalStateException("The event loop group was shut down");
209+
}
210+
203211
if (b.config().channelFactory() == null) {
204212
b.channel(NioSocketChannel.class);
205213
}
@@ -317,6 +325,10 @@ public void sendHeader() {
317325

318326
@Override
319327
public void initialize(AMQConnection connection) {
328+
LOGGER.debug(
329+
"Setting connection {} to AMQP handler {}",
330+
connection.getClientProvidedName(),
331+
this.handler.id);
320332
this.handler.connection = connection;
321333
}
322334

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -591,16 +591,16 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
591591
}
592592
LOGGER.debug("Connection {} has recovered", newConn);
593593
this.addAutomaticRecoveryListener(newConn);
594-
this.recoverShutdownListeners(newConn);
595-
this.recoverBlockedListeners(newConn);
596-
this.recoverChannels(newConn);
597-
// don't assign new delegate connection until channel recovery is complete
598-
this.delegate = newConn;
599-
if (this.params.isTopologyRecoveryEnabled()) {
600-
notifyTopologyRecoveryListenersStarted();
601-
recoverTopology(params.getTopologyRecoveryExecutor());
602-
}
603-
this.notifyRecoveryListenersComplete();
594+
this.recoverShutdownListeners(newConn);
595+
this.recoverBlockedListeners(newConn);
596+
this.recoverChannels(newConn);
597+
// don't assign new delegate connection until channel recovery is complete
598+
this.delegate = newConn;
599+
if (this.params.isTopologyRecoveryEnabled()) {
600+
notifyTopologyRecoveryListenersStarted();
601+
recoverTopology(params.getTopologyRecoveryExecutor());
602+
}
603+
this.notifyRecoveryListenersComplete();
604604
}
605605

606606
private void recoverShutdownListeners(final RecoveryAwareAMQConnection newConn) {
@@ -624,25 +624,24 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
624624
attempts++;
625625
// No Sonar: no need to close this resource because we're the one that creates it
626626
// and hands it over to the user
627-
RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR
628-
synchronized(recoveryLock) {
629-
if (!manuallyClosed) {
630-
// This is the standard case.
631-
return newConn;
632-
}
633-
}
634-
// This is the once in a blue moon case.
635-
// Application code just called close as the connection
636-
// was being re-established. So we attempt to close the newly created connection.
637-
newConn.abort();
638-
return null;
627+
RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR
628+
synchronized(recoveryLock) {
629+
if (!manuallyClosed) {
630+
// This is the standard case.
631+
return newConn;
632+
}
633+
}
634+
// This is the once in a blue moon case.
635+
// Application code just called close as the connection
636+
// was being re-established. So we attempt to close the newly created connection.
637+
newConn.abort();
638+
return null;
639639
} catch (Exception e) {
640640
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts));
641641
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
642642
}
643643
}
644-
645-
return null;
644+
return null;
646645
}
647646

648647
private void recoverChannels(final RecoveryAwareAMQConnection newConn) {

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,8 @@ public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutExc
7171
conn.start();
7272
metricsCollector.newConnection(conn);
7373
return conn;
74-
} catch (IOException e) {
74+
} catch (IOException | TimeoutException e) {
7575
lastException = e;
76-
} catch (TimeoutException te) {
77-
lastException = te;
7876
}
7977
}
8078

src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,19 @@ public void afterAll(ExtensionContext context) {
143143
.getRoot()
144144
.getStore(ExtensionContext.Namespace.GLOBAL)
145145
.getOrComputeIfAbsent(ExecutorServiceCloseableResourceWrapper.class);
146-
wrapper.executorService.submit(
147-
() -> {
148-
try {
149-
eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS);
150-
} catch (InterruptedException e) {
151-
Thread.currentThread().interrupt();
152-
} catch (Exception e) {
153-
LOGGER.warn("Error while asynchronously closing Netty event loop group", e);
154-
}
155-
});
146+
147+
wrapper
148+
.executorService
149+
.submit(
150+
() -> {
151+
try {
152+
eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS);
153+
} catch (InterruptedException e) {
154+
Thread.currentThread().interrupt();
155+
} catch (Exception e) {
156+
LOGGER.warn("Error while asynchronously closing Netty event loop group", e);
157+
}
158+
});
156159
}
157160
}
158161

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ protected void bareRestart()
125125
public void openConnection()
126126
throws IOException, TimeoutException {
127127
if (connection == null) {
128-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
128+
connection = connectionFactory.newConnection(generateConnectionName());
129129
}
130130
}
131131

@@ -327,6 +327,11 @@ protected String generateExchangeName() {
327327
this.testInfo.getTestMethod().get().getName());
328328
}
329329

330+
protected String generateConnectionName() {
331+
return name("conn", this.testInfo.getTestClass().get(),
332+
this.testInfo.getTestMethod().get().getName());
333+
}
334+
330335
private static String name(String prefix, Class<?> testClass, String testMethodName) {
331336
String uuid = UUID.randomUUID().toString();
332337
return String.format(

src/test/java/com/rabbitmq/client/test/ClientTestSuite.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
ValueWriterTest.class,
7474
BlockedConnectionTest.class,
7575
NettyTest.class,
76+
IoDeadlockOnConnectionClosing.class,
7677
ProtocolVersionMismatch.class
7778
})
7879
public class ClientTestSuite {

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class ConnectionRecovery extends BrokerTestCase {
5757

5858
@Test public void namedConnectionRecovery()
5959
throws IOException, InterruptedException, TimeoutException {
60-
String connectionName = "custom-name";
60+
String connectionName = generateConnectionName();
6161
RecoverableConnection c = newRecoveringConnection(connectionName);
6262
try {
6363
assertThat(c.isOpen()).isTrue();
@@ -151,7 +151,7 @@ public String getPassword() {
151151
return password;
152152
}
153153
});
154-
RecoverableConnection c = (RecoverableConnection) cf.newConnection(UUID.randomUUID().toString());
154+
RecoverableConnection c = (RecoverableConnection) cf.newConnection(generateConnectionName());
155155
try {
156156
assertThat(c.isOpen()).isTrue();
157157
assertThat(usernameRequested.get()).isEqualTo(1);
@@ -787,13 +787,14 @@ public void handleDelivery(String consumerTag,
787787
@Test public void recoveryWithExponentialBackoffDelayHandler() throws Exception {
788788
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
789789
connectionFactory.setRecoveryDelayHandler(new RecoveryDelayHandler.ExponentialBackoffDelayHandler());
790-
Connection testConnection = connectionFactory.newConnection(UUID.randomUUID().toString());
790+
String connName = generateConnectionName();
791+
Connection testConnection = connectionFactory.newConnection(connName);
791792
try {
792793
assertThat(testConnection.isOpen()).isTrue();
793794
TestUtils.closeAndWaitForRecovery((RecoverableConnection) testConnection);
794795
assertThat(testConnection.isOpen()).isTrue();
795796
} finally {
796-
connection.close();
797+
testConnection.close();
797798
}
798799
}
799800

@@ -807,7 +808,7 @@ public void handleDelivery(String consumerTag,
807808
connectionFactory.setTopologyRecoveryExecutor(executor);
808809
assertThat(connectionFactory.getTopologyRecoveryExecutor()).isEqualTo(executor);
809810
RecoverableConnection testConnection = (RecoverableConnection) connectionFactory.newConnection(
810-
UUID.randomUUID().toString()
811+
generateConnectionName()
811812
);
812813
try {
813814
final List<Channel> channels = new ArrayList<Channel>();
@@ -970,26 +971,26 @@ protected ConnectionFactory newConnectionFactory() {
970971
return buildConnectionFactoryWithRecoveryEnabled(false);
971972
}
972973

973-
private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery)
974+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery)
974975
throws IOException, TimeoutException {
975976
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
976-
return (AutorecoveringConnection) cf.newConnection(UUID.randomUUID().toString());
977+
return (AutorecoveringConnection) cf.newConnection(generateConnectionName());
977978
}
978979

979-
private static RecoverableConnection newRecoveringConnection(Address[] addresses)
980+
private RecoverableConnection newRecoveringConnection(Address[] addresses)
980981
throws IOException, TimeoutException {
981982
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(false);
982983
// specifically use the Address[] overload
983-
return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString());
984+
return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName());
984985
}
985986

986-
private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
987+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
987988
throws IOException, TimeoutException {
988989
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
989-
return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString());
990+
return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName());
990991
}
991992

992-
private static RecoverableConnection newRecoveringConnection(List<Address> addresses)
993+
private RecoverableConnection newRecoveringConnection(List<Address> addresses)
993994
throws IOException, TimeoutException {
994995
return newRecoveringConnection(false, addresses);
995996
}

src/test/java/com/rabbitmq/tools/Host.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class Host {
4040
private static final Logger LOGGER = LoggerFactory.getLogger(Host.class);
4141

4242
private static final String DOCKER_PREFIX = "DOCKER:";
43-
private static final Pattern CONNECTION_NAME_PATTERN = Pattern.compile("\"connection_name\",\"(?<name>[a-zA-Z0-9\\-]+)?\"");
43+
private static final Pattern CONNECTION_NAME_PATTERN = Pattern.compile("\"connection_name\",\"(?<name>[a-zA-Z0-9\\-_]+)?\"");
4444

4545
public static String hostname() {
4646
try {

0 commit comments

Comments
 (0)