diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java index 326dce08d99..7758ae94bcf 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java @@ -215,6 +215,9 @@ protected synchronized void doStop() { && this.clientFactory.getConnectionOptions().isCleanSession())) { this.client.unsubscribe(getTopic()); + // Have to re-subscribe on next start if connection is not lost. + this.readyToSubscribeOnStart = true; + } } catch (MqttException ex1) { @@ -341,6 +344,10 @@ public synchronized void connectionLost(Throwable cause) { applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); } } + else { + // The 'connectComplete()' re-subscribes or sets this flag otherwise. + this.readyToSubscribeOnStart = false; + } } @Override @@ -404,7 +411,9 @@ public void connectComplete(boolean isReconnect) { @Override public void connectComplete(boolean reconnect, String serverURI) { - if (isRunning()) { + // The 'running' flag is set after 'doStart()', so possible a race condition + // when start is not finished yet, but server answers with successful connection. + if (isActive()) { subscribe(); } else { diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 2474070b0cb..cab526ddc5b 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -223,15 +223,17 @@ private synchronized void connect() throws MqttException { @Override protected void doStop() { - this.readyToSubscribeOnStart = false; this.topicLock.lock(); + this.readyToSubscribeOnStart = false; String[] topics = getTopic(); try { if (this.mqttClient != null && this.mqttClient.isConnected()) { if (this.connectionOptions.isCleanStart()) { this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout()); - } + // Have to re-subscribe on next start if connection is not lost. + this.readyToSubscribeOnStart = true; + } if (getClientManager() == null) { this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout()); } @@ -331,10 +333,16 @@ public void messageArrived(String topic, MqttMessage mqttMessage) { @Override public void disconnected(MqttDisconnectResponse disconnectResponse) { - MqttException cause = disconnectResponse.getException(); - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); + if (isRunning()) { + MqttException cause = disconnectResponse.getException(); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); + } + } + else { + // The 'connectComplete()' re-subscribes or sets this flag otherwise. + this.readyToSubscribeOnStart = false; } } @@ -358,7 +366,9 @@ public void connectComplete(boolean isReconnect) { @Override public void connectComplete(boolean reconnect, String serverURI) { - if (isRunning()) { + // The 'running' flag is set after 'doStart()', so possible a race condition + // when start is not finished yet, but server answers with successful connection. + if (isActive()) { subscribe(); } else { diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java index 97a5262b8d9..2a337327b8a 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java @@ -60,12 +60,18 @@ public class ResubscribeAfterAutomaticReconnectTests implements MosquittoContain @Autowired private MqttConnectionOptions connectionOptions; + @Autowired + Mqttv5PahoMessageDrivenChannelAdapter pahoMessageDrivenChannelAdapter; + @Autowired Config config; @Test void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedException { GenericMessage testMessage = new GenericMessage<>("test"); + + assertThat(this.config.subscribeFirstLatch.await(10, TimeUnit.SECONDS)).isTrue(); + this.mqttOutFlowInput.send(testMessage); assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); @@ -73,7 +79,16 @@ void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedExce MOSQUITTO_CONTAINER.start(); connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()}); - assertThat(this.config.subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.subscribeSecondLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + this.mqttOutFlowInput.send(testMessage); + assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); + + // Re-subscription on channel adapter restart with cleanStart + this.pahoMessageDrivenChannelAdapter.stop(); + this.pahoMessageDrivenChannelAdapter.start(); + + assertThat(this.config.subscribeThirdLatch.await(10, TimeUnit.SECONDS)).isTrue(); this.mqttOutFlowInput.send(testMessage); assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); @@ -83,13 +98,18 @@ void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedExce @EnableIntegration public static class Config { - CountDownLatch subscribeLatch = new CountDownLatch(2); + CountDownLatch subscribeFirstLatch = new CountDownLatch(1); + + CountDownLatch subscribeSecondLatch = new CountDownLatch(2); + + CountDownLatch subscribeThirdLatch = new CountDownLatch(3); @Bean public MqttConnectionOptions mqttConnectOptions() { return new MqttConnectionOptionsBuilder() .serverURI(MosquittoContainerTest.mqttUrl()) .automaticReconnect(true) + .cleanStart(true) .build(); } @@ -105,7 +125,6 @@ public IntegrationFlow mqttOutFlow(MqttConnectionOptions mqttConnectOptions) { public IntegrationFlow mqttInFlow(MqttConnectionOptions mqttConnectOptions) { Mqttv5PahoMessageDrivenChannelAdapter messageProducer = new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions, "mqttInClient", "siTest"); - return IntegrationFlow.from(messageProducer) .channel(c -> c.queue("fromMqttChannel")) .get(); @@ -113,7 +132,9 @@ public IntegrationFlow mqttInFlow(MqttConnectionOptions mqttConnectOptions) { @EventListener(MqttSubscribedEvent.class) public void mqttEvents() { - this.subscribeLatch.countDown(); + this.subscribeFirstLatch.countDown(); + this.subscribeSecondLatch.countDown(); + this.subscribeThirdLatch.countDown(); } }