From e48e145663ac699070adcd07f04f4bd70443aaad Mon Sep 17 00:00:00 2001 From: abilan Date: Tue, 6 Dec 2022 16:03:16 -0500 Subject: [PATCH 1/2] GH-3959: MqttConFailedEvent for normal disconnect Fixes https://github.com/spring-projects/spring-integration/issues/3959 The `MqttCallback.disconnected(MqttDisconnectResponse)` in Paho v5 client is also called when server initiates a disconnection. In this case that `MqttDisconnectResponse` does not have a `cause` value * Modify `MqttConnectionFailedEvent` to make a `cause` property optional * Fix `Mqttv5PahoMessageDrivenChannelAdapter` & `Mqttv5PahoMessageHandler` to not check for `cause`, but emit an `MqttConnectionFailedEvent` for any `disconnected()` calls Unfortunately current Paho v3 client does not call `connectionLost()` for normal disconnections and we cannot react for this callback with an `MqttConnectionFailedEvent` **Cherry-pick to `5.5.x`** --- .../mqtt/event/MqttConnectionFailedEvent.java | 15 +++++++++++++-- .../mqtt/event/MqttIntegrationEvent.java | 9 ++++++--- .../Mqttv5PahoMessageDrivenChannelAdapter.java | 2 +- .../mqtt/outbound/Mqttv5PahoMessageHandler.java | 2 +- src/reference/asciidoc/mqtt.adoc | 1 + 5 files changed, 22 insertions(+), 7 deletions(-) diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttConnectionFailedEvent.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttConnectionFailedEvent.java index 0accc30daa6..c337e312e9b 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttConnectionFailedEvent.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttConnectionFailedEvent.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,26 @@ package org.springframework.integration.mqtt.event; +import org.springframework.lang.Nullable; + /** + * The {@link MqttIntegrationEvent} to notify about lost connection to the server. + * When normal disconnection is happened (initiated by the server), the {@code cause} is null. + * * @author Gary Russell + * @author Artem Bilan + * * @since 4.2.2 * */ @SuppressWarnings("serial") public class MqttConnectionFailedEvent extends MqttIntegrationEvent { - public MqttConnectionFailedEvent(Object source, Throwable cause) { + public MqttConnectionFailedEvent(Object source) { + super(source); + } + + public MqttConnectionFailedEvent(Object source, @Nullable Throwable cause) { super(source, cause); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttIntegrationEvent.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttIntegrationEvent.java index b8cb024774e..aeb9f58507d 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttIntegrationEvent.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttIntegrationEvent.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,15 @@ package org.springframework.integration.mqtt.event; import org.springframework.integration.events.IntegrationEvent; +import org.springframework.lang.Nullable; /** - * Base class for Mqtt Events. For {@link #getSourceAsType()}, you should use a sub type + * Base class for Mqtt Events. For {@link #getSourceAsType()}, you should use a subtype * of {@link org.springframework.integration.mqtt.core.MqttComponent} for the receiving * variable. + * * @author Gary Russell + * @author Artem Bilan * * @since 4.1 */ @@ -33,7 +36,7 @@ public MqttIntegrationEvent(Object source) { super(source); } - public MqttIntegrationEvent(Object source, Throwable cause) { + public MqttIntegrationEvent(Object source, @Nullable Throwable cause) { super(source, cause); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 3d5d205b8c7..84e2318ec84 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -323,7 +323,7 @@ public void messageArrived(String topic, MqttMessage mqttMessage) { public void disconnected(MqttDisconnectResponse disconnectResponse) { MqttException cause = disconnectResponse.getException(); ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (cause != null && applicationEventPublisher != null) { + if (applicationEventPublisher != null) { applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); } } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java index 2680676fffa..6f62a04764c 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java @@ -299,7 +299,7 @@ public void deliveryComplete(IMqttToken token) { public void disconnected(MqttDisconnectResponse disconnectResponse) { MqttException cause = disconnectResponse.getException(); ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (cause != null && applicationEventPublisher != null) { + if (applicationEventPublisher != null) { applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); } } diff --git a/src/reference/asciidoc/mqtt.adoc b/src/reference/asciidoc/mqtt.adoc index 6d8ff51fb2e..b2eef3a9f5c 100644 --- a/src/reference/asciidoc/mqtt.adoc +++ b/src/reference/asciidoc/mqtt.adoc @@ -403,6 +403,7 @@ public class MqttJavaApplication { Certain application events are published by the adapters. * `MqttConnectionFailedEvent` - published by both adapters if we fail to connect or a connection is subsequently lost. +For Paho client for MQTT v5 this event is also emitted when server performs a normal disconnection, in which case the `cause` of lost connection is `null`. * `MqttMessageSentEvent` - published by the outbound adapter when a message has been sent, if running in asynchronous mode. * `MqttMessageDeliveredEvent` - published by the outbound adapter when the client indicates that a message has been delivered, if running in asynchronous mode. * `MqttSubscribedEvent` - published by the inbound adapter after subscribing to the topics. From deda22c533ec974e31992956564c1337d4c8e8c2 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 6 Dec 2022 16:10:44 -0500 Subject: [PATCH 2/2] Fix language in doc Co-authored-by: Gary Russell --- src/reference/asciidoc/mqtt.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reference/asciidoc/mqtt.adoc b/src/reference/asciidoc/mqtt.adoc index b2eef3a9f5c..2efc2ab7df3 100644 --- a/src/reference/asciidoc/mqtt.adoc +++ b/src/reference/asciidoc/mqtt.adoc @@ -403,7 +403,7 @@ public class MqttJavaApplication { Certain application events are published by the adapters. * `MqttConnectionFailedEvent` - published by both adapters if we fail to connect or a connection is subsequently lost. -For Paho client for MQTT v5 this event is also emitted when server performs a normal disconnection, in which case the `cause` of lost connection is `null`. +For the MQTT v5 Paho client, this event is also emitted when the server performs a normal disconnection, in which case the `cause` of the lost connection is `null`. * `MqttMessageSentEvent` - published by the outbound adapter when a message has been sent, if running in asynchronous mode. * `MqttMessageDeliveredEvent` - published by the outbound adapter when the client indicates that a message has been delivered, if running in asynchronous mode. * `MqttSubscribedEvent` - published by the inbound adapter after subscribing to the topics.