diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index be233487be5..d9b3ce87ac2 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -59,7 +59,7 @@ * * It is recommended to have the {@link MqttConnectionOptions#setAutomaticReconnect(boolean)} * set to true to let an internal {@link IMqttAsyncClient} instance to handle reconnects. - * Otherwise, the manual restart of this component can only handle reconnects, e.g. via + * Otherwise, only the manual restart of this component can handle reconnects, e.g. via * {@link MqttConnectionFailedEvent} handling on disconnection. * * See {@link #setPayloadType} for more information about type conversion. @@ -190,6 +190,7 @@ protected void doStop() { String[] topics = getTopic(); try { this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout()); + this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout()); } catch (MqttException ex) { logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics)); @@ -199,6 +200,17 @@ protected void doStop() { } } + @Override + public void destroy() { + super.destroy(); + try { + this.mqttClient.close(true); + } + catch (MqttException ex) { + logger.error(ex, "Failed to close 'MqttAsyncClient'"); + } + } + @Override public void addTopic(String topic, int qos) { this.topicLock.lock(); diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java index 2eb23d67b6a..10318d98c50 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java @@ -50,6 +50,12 @@ /** * The {@link AbstractMqttMessageHandler} implementation for MQTT v5. * + * It is recommended to have the {@link MqttConnectionOptions#setAutomaticReconnect(boolean)} + * set to true to let an internal {@link IMqttAsyncClient} instance to handle reconnects. + * Otherwise, only the manual restart of this component can handle reconnects, e.g. via + * {@link MqttConnectionFailedEvent} handling on disconnection. + * + * * @author Artem Bilan * * @since 5.5.5 @@ -159,7 +165,11 @@ protected void doStart() { this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); } catch (MqttException ex) { - throw new IllegalStateException("Cannot connect 'MqttAsyncClient' for: " + getComponentName(), ex); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + } + logger.error(ex, "MQTT client failed to connect. Will retry if 'ConnectionOptions.isAutomaticReconnect()'."); } } @@ -177,7 +187,7 @@ protected void doStop() { public void destroy() { super.destroy(); try { - this.mqttClient.close(); + this.mqttClient.close(true); } catch (MqttException ex) { logger.error(ex, "Failed to close 'MqttAsyncClient'"); diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java index 6cd1b1e9e02..1fc4a34e763 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -56,7 +56,7 @@ * @author Gary Russell * @author Artem Bilan * - * @since 4.0 + * @since 5.5.5 * */ @SpringJUnitConfig @@ -164,8 +164,7 @@ public IntegrationFlow mqttInFlow() { messageProducer.setMessageConverter(mqttStringToBytesConverter()); messageProducer.setManualAcks(true); - return IntegrationFlows.from( - messageProducer) + return IntegrationFlows.from(messageProducer) .channel(c -> c.queue("fromMqttChannel")) .get(); } diff --git a/src/reference/asciidoc/mqtt.adoc b/src/reference/asciidoc/mqtt.adoc index e250a9ef8f1..77c5749b080 100644 --- a/src/reference/asciidoc/mqtt.adoc +++ b/src/reference/asciidoc/mqtt.adoc @@ -494,4 +494,7 @@ public IntegrationFlow mqttInFlow() { IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageDrivenChannelAdapter` since its contract is aimed only for the MQTT v3 protocol. -See more information in the `Mqttv5PahoMessageDrivenChannelAdapter` javadocs and its superclass. \ No newline at end of file +See more information in the `Mqttv5PahoMessageDrivenChannelAdapter` javadocs and its superclass. + +IMPORTANT: It is recommended to have the `MqttConnectionOptions#setAutomaticReconnect(boolean)` set to true to let an internal `IMqttAsyncClient` instance to handle reconnects. +Otherwise, only the manual restart of these channel adapters can handle reconnects, e.g. via `MqttConnectionFailedEvent` handling on disconnection. \ No newline at end of file