From d53097d3de2266d17d4fa1c430666207e7867b99 Mon Sep 17 00:00:00 2001 From: abilan Date: Tue, 14 Feb 2023 16:17:47 -0500 Subject: [PATCH] GH-4014: MQTT ClientManager: completion timeouts Fixes https://github.com/spring-projects/spring-integration/issues/4014 The `ClientManager` implementations uses by mistake a `connectionTimeout` for operations with completion wait * Introduce `completionTimeout` and `disconnectCompletionTimeout` for `ClientManager` abstraction to realign the logic with existing channel adapters and Paho Client by itself. * Deprecate `DEFAULT_COMPLETION_TIMEOUT` and `DISCONNECT_COMPLETION_TIMEOUT` constants in the `AbstractMqttMessageDrivenChannelAdapter` in favor of respective replacement in the `ClientManager` * Pull `disconnectCompletionTimeout` property from the `MqttPahoMessageDrivenChannelAdapter` to its superclass * Use new `disconnectCompletionTimeout` in the `Mqttv5PahoMessageDrivenChannelAdapter` for similar `disconnectForcibly()` call * Fix Lifecycle race condition when `ClientManager` is started by the outbound channel adapter (`Integer.MIN_VALUE` phase and auto-startup - see `DefaultLifecycleProcessor.doStart()` and logic around `dependencies`) which is much earlier than `MessageProducerSupport` (`Integer.MAX_VALUE / 2` phase) and there a `connectComplete()` callback might be called before the `MessageProducerSupport.start()`. For that purpose check for an `isRunning()` in the `connectComplete()` before subscribing and set `readyToSubscribeOnStart` flag to `subscribe()` in a `doStart()` of this `MqttMessageDrivenChannelAdapter` * Remove redundant `MqttPahoMessageDrivenChannelAdapter.cleanSession` property in favor of `this.clientFactory.getConnectionOptions().isCleanSession()` call GH-8550: MQTT: Always re-subscribe on re-connect Fixes https://github.com/spring-projects/spring-integration/issues/8550 Turns out the Paho MQTT client does not re-subscribe when connection re-established on automatic reconnection * Fix `AbstractMqttMessageDrivenChannelAdapter` to always subscribe to their topics in the `connectComplete()` independently of the `reconnect` status * Verify behavior with `MOSQUITTO_CONTAINER` image restart in Docker --- .../mqtt/core/AbstractMqttClientManager.java | 36 +++++- .../integration/mqtt/core/ClientManager.java | 12 +- .../mqtt/core/Mqttv3ClientManager.java | 15 ++- .../mqtt/core/Mqttv5ClientManager.java | 7 +- ...stractMqttMessageDrivenChannelAdapter.java | 33 ++++- .../MqttPahoMessageDrivenChannelAdapter.java | 79 +++++------- ...Mqttv5PahoMessageDrivenChannelAdapter.java | 66 ++++++---- ...subscribeAfterAutomaticReconnectTests.java | 121 ++++++++++++++++++ 8 files changed, 280 insertions(+), 89 deletions(-) create mode 100644 spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java index d57ceeb182d..0de50381bde 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,10 @@ public abstract class AbstractMqttClientManager implements ClientManager getCallbacks() { return this.connectCallbacks; } + /** + * Set the completion timeout for operations. + * Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds. + * @param completionTimeout The timeout. + * @since 6.0.3 + */ + public void setCompletionTimeout(long completionTimeout) { + this.completionTimeout = completionTimeout; + } + + protected long getCompletionTimeout() { + return this.completionTimeout; + } + + /** + * Set the completion timeout when disconnecting. + * Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds. + * @param completionTimeout The timeout. + * @since 6.0.3 + */ + public synchronized void setDisconnectCompletionTimeout(long completionTimeout) { + this.disconnectCompletionTimeout = completionTimeout; + } + + protected long getDisconnectCompletionTimeout() { + return this.disconnectCompletionTimeout; + } + @Override public boolean isManualAcks() { return this.manualAcks; @@ -123,7 +155,7 @@ public String getBeanName() { } /** - * The phase of component autostart in {@link SmartLifecycle}. + * The phase of component auto-start in {@link SmartLifecycle}. * If the custom one is required, note that for the correct behavior it should be less than phase of * {@link AbstractMqttMessageDrivenChannelAdapter} implementations. * The default phase is {@link #DEFAULT_MANAGER_PHASE}. diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java index 6febcd19e55..00692df436f 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,16 @@ */ public interface ClientManager extends SmartLifecycle, MqttComponent { + /** + * The default completion timeout in milliseconds. + */ + long DEFAULT_COMPLETION_TIMEOUT = 30_000L; + + /** + * The default disconnect completion timeout in milliseconds. + */ + long DISCONNECT_COMPLETION_TIMEOUT = 5_000L; + /** * Return the managed client. * @return the managed client. diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java index dd073eedb2d..b88416aa803 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,8 @@ * {@link MqttConnectionFailedEvent} and reconnect the MQTT client manually. * * @author Artem Vozhdayenko + * @author Artem Bilan + * * @since 6.0 */ public class Mqttv3ClientManager @@ -97,10 +99,9 @@ public synchronized void start() { } setClient(client); try { - client.connect(this.connectionOptions) - .waitForCompletion(this.connectionOptions.getConnectionTimeout()); + client.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); } - catch (MqttException e) { + catch (MqttException ex) { // See GH-3822 if (this.connectionOptions.isAutomaticReconnect()) { try { @@ -113,10 +114,10 @@ public synchronized void start() { else { var applicationEventPublisher = getApplicationEventPublisher(); if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e)); + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); } else { - logger.error("Could not start client manager, client_id=" + getClientId(), e); + logger.error("Could not start client manager, client_id=" + getClientId(), ex); } } } @@ -138,7 +139,7 @@ public synchronized void stop() { return; } try { - client.disconnectForcibly(this.connectionOptions.getConnectionTimeout()); + client.disconnectForcibly(getDisconnectCompletionTimeout()); } catch (MqttException e) { logger.error("Could not disconnect from the client", e); diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java index d709431d166..72ec62472d9 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,8 +101,7 @@ public synchronized void start() { } setClient(client); try { - client.connect(this.connectionOptions) - .waitForCompletion(this.connectionOptions.getConnectionTimeout()); + client.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); } catch (MqttException ex) { if (this.connectionOptions.isAutomaticReconnect()) { @@ -142,7 +141,7 @@ public synchronized void stop() { } try { - client.disconnectForcibly(this.connectionOptions.getConnectionTimeout()); + client.disconnectForcibly(getDisconnectCompletionTimeout()); } catch (MqttException e) { logger.error("Could not disconnect from the client", e); diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java index 24dd4e09327..99836f95dc1 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,14 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends Mess /** * The default completion timeout in milliseconds. */ - public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L; + @Deprecated(since = "6.0.3", forRemoval = true) + public static final long DEFAULT_COMPLETION_TIMEOUT = ClientManager.DEFAULT_COMPLETION_TIMEOUT; + + /** + * The default disconnect completion timeout in milliseconds. + */ + @Deprecated(since = "6.0.3", forRemoval = true) + public static final long DISCONNECT_COMPLETION_TIMEOUT = ClientManager.DISCONNECT_COMPLETION_TIMEOUT; protected final Lock topicLock = new ReentrantLock(); // NOSONAR @@ -73,7 +80,9 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends Mess private final ClientManager clientManager; - private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT; + private long completionTimeout = ClientManager.DEFAULT_COMPLETION_TIMEOUT; + + private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT; private boolean manualAcks; @@ -179,6 +188,20 @@ public String[] getTopic() { } } + /** + * Set the completion timeout when disconnecting. + * Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds. + * @param completionTimeout The timeout. + * @since 5.1.10 + */ + public synchronized void setDisconnectCompletionTimeout(long completionTimeout) { + this.disconnectCompletionTimeout = completionTimeout; + } + + protected long getDisconnectCompletionTimeout() { + return this.disconnectCompletionTimeout; + } + @Override protected void onInit() { super.onInit(); @@ -223,8 +246,8 @@ protected boolean isManualAcks() { } /** - * Set the completion timeout for operations. Not settable using the namespace. - * Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds. + * Set the completion timeout for operations. + * Default {@value ClientManager#DEFAULT_COMPLETION_TIMEOUT} milliseconds. * @param completionTimeout The timeout. * @since 4.1 */ diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java index cd4f54cc014..326dce08d99 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java @@ -64,22 +64,15 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter implements MqttCallbackExtended, MqttPahoComponent { - /** - * The default disconnect completion timeout in milliseconds. - */ - public static final long DISCONNECT_COMPLETION_TIMEOUT = 5_000L; - private final MqttPahoClientFactory clientFactory; - private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT; - private volatile IMqttAsyncClient client; - private volatile boolean cleanSession; - @SuppressWarnings("deprecation") private volatile org.springframework.integration.mqtt.core.ConsumerStopAction consumerStopAction; + private volatile boolean readyToSubscribeOnStart; + /** * Use this constructor when you don't need additional {@link MqttConnectOptions}. * @param url The URL. @@ -138,16 +131,6 @@ public MqttPahoMessageDrivenChannelAdapter(ClientManager * The {@link MqttProperties} are mapped via the provided {@link HeaderMapper}; * meanwhile the regular {@link MqttMessage} properties are always mapped into headers. - * + *

* It is recommended to have the {@link MqttConnectionOptions#setAutomaticReconnect(boolean)} * set to true to let an internal {@link IMqttAsyncClient} instance to handle reconnects. * Otherwise, only the manual restart of this component can handle reconnects, e.g. via * {@link MqttConnectionFailedEvent} handling on disconnection. - * + *

* See {@link #setPayloadType} for more information about type conversion. * * @author Artem Bilan @@ -94,6 +94,8 @@ public class Mqttv5PahoMessageDrivenChannelAdapter private HeaderMapper headerMapper = new MqttHeaderMapper(); + private volatile boolean readyToSubscribeOnStart; + public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) { super(url, clientId, topic); Assert.hasText(url, "'url' cannot be null or empty"); @@ -184,28 +186,35 @@ protected void onInit() { @Override protected void doStart() { - var clientManager = getClientManager(); - if (clientManager == null) { - try { - this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); + try { + connect(); + if (this.readyToSubscribeOnStart) { + subscribe(); } - catch (MqttException ex) { - if (getConnectionInfo().isAutomaticReconnect()) { - try { - this.mqttClient.reconnect(); - } - catch (MqttException re) { - logger.error(re, "MQTT client failed to connect. Never happens."); - } + } + catch (MqttException ex) { + if (getConnectionInfo().isAutomaticReconnect()) { + try { + this.mqttClient.reconnect(); } - else { - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); - } - logger.error(ex, "MQTT client failed to connect."); + catch (MqttException re) { + logger.error(re, "MQTT client failed to connect. Never happens."); } } + else { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + } + logger.error(ex, "MQTT client failed to connect."); + } + } + } + + private synchronized void connect() throws MqttException { + var clientManager = getClientManager(); + if (clientManager == null) { + this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); } else { this.mqttClient = clientManager.getClient(); @@ -214,6 +223,7 @@ protected void doStart() { @Override protected void doStop() { + this.readyToSubscribeOnStart = false; this.topicLock.lock(); String[] topics = getTopic(); try { @@ -223,7 +233,7 @@ protected void doStop() { } if (getClientManager() == null) { - this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout()); + this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout()); } } } @@ -348,9 +358,15 @@ public void connectComplete(boolean isReconnect) { @Override public void connectComplete(boolean reconnect, String serverURI) { - if (reconnect) { - return; + if (isRunning()) { + subscribe(); + } + else { + this.readyToSubscribeOnStart = true; } + } + + private void subscribe() { var clientManager = getClientManager(); if (clientManager != null && this.mqttClient == null) { this.mqttClient = clientManager.getClient(); diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java new file mode 100644 index 00000000000..97a5262b8d9 --- /dev/null +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java @@ -0,0 +1,121 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.mqtt.event.MqttSubscribedEvent; +import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 6.0.3 + */ +@SpringJUnitConfig +@DirtiesContext +public class ResubscribeAfterAutomaticReconnectTests implements MosquittoContainerTest { + + @Autowired + @Qualifier("mqttOutFlow.input") + private MessageChannel mqttOutFlowInput; + + @Autowired + private PollableChannel fromMqttChannel; + + @Autowired + private MqttConnectionOptions connectionOptions; + + @Autowired + Config config; + + @Test + void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedException { + GenericMessage testMessage = new GenericMessage<>("test"); + this.mqttOutFlowInput.send(testMessage); + assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); + + MOSQUITTO_CONTAINER.stop(); + MOSQUITTO_CONTAINER.start(); + connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()}); + + assertThat(this.config.subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + this.mqttOutFlowInput.send(testMessage); + assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); + } + + @Configuration + @EnableIntegration + public static class Config { + + CountDownLatch subscribeLatch = new CountDownLatch(2); + + @Bean + public MqttConnectionOptions mqttConnectOptions() { + return new MqttConnectionOptionsBuilder() + .serverURI(MosquittoContainerTest.mqttUrl()) + .automaticReconnect(true) + .build(); + } + + @Bean + public IntegrationFlow mqttOutFlow(MqttConnectionOptions mqttConnectOptions) { + Mqttv5PahoMessageHandler messageHandler = + new Mqttv5PahoMessageHandler(mqttConnectOptions, "mqttv5SIout"); + messageHandler.setDefaultTopic("siTest"); + return f -> f.handle(messageHandler); + } + + @Bean + public IntegrationFlow mqttInFlow(MqttConnectionOptions mqttConnectOptions) { + Mqttv5PahoMessageDrivenChannelAdapter messageProducer = + new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions, "mqttInClient", "siTest"); + + return IntegrationFlow.from(messageProducer) + .channel(c -> c.queue("fromMqttChannel")) + .get(); + } + + @EventListener(MqttSubscribedEvent.class) + public void mqttEvents() { + this.subscribeLatch.countDown(); + } + + } + +}