From 7bdb2a1ef680651a1559cc2f7edcf882ba421f27 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 4 Oct 2021 18:01:43 -0400 Subject: [PATCH 1/4] GH-3432: Add MQTT v5 channel adapters Fixes https://github.com/spring-projects/spring-integration/issues/3432 * Add `optional` dependency for `org.eclipse.paho:org.eclipse.paho.mqttv5.client` * Add `MqttProtocolErrorEvent` and emit it from the `mqttErrorOccurred()` callback of the MQTT v5 client * Add `MqttHeaderMapper` since MQTT v5 has introduced user properties pair to transfer over the protocol * Add `Mqttv5PahoMessageHandler` as one more extension of the `AbstractMqttMessageHandler` * Add more convenient `MqttHeaders` constants for easier headers mapping configuration * Ensure via `Mqttv5BackToBackTests` that MQTT v5 is supported by the provided components * Change `pr-build-workflow.yml` to use `eclipse-mosquitto` container for testing all the MQTT interactions * Change `cyrilix/rabbitmq-mqtt` service to the `rabbitmq:management` since RabbitMQ does not support MQTT v5 --- .github/workflows/pr-build-workflow.yml | 6 +- build.gradle | 1 + .../mqtt/event/MqttProtocolErrorEvent.java | 37 ++ ...stractMqttMessageDrivenChannelAdapter.java | 67 +++- .../MqttPahoMessageDrivenChannelAdapter.java | 86 ++--- ...Mqttv5PahoMessageDrivenChannelAdapter.java | 328 ++++++++++++++++++ .../outbound/AbstractMqttMessageHandler.java | 91 ++++- .../mqtt/outbound/MqttPahoMessageHandler.java | 89 ++--- .../outbound/Mqttv5PahoMessageHandler.java | 305 ++++++++++++++++ .../mqtt/support/MqttHeaderMapper.java | 208 +++++++++++ .../integration/mqtt/support/MqttHeaders.java | 10 +- .../mqtt/Mqttv5BackToBackTests.java | 172 +++++++++ src/reference/asciidoc/mqtt.adoc | 5 + src/reference/asciidoc/whats-new.adoc | 6 + 14 files changed, 1272 insertions(+), 139 deletions(-) create mode 100644 spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttProtocolErrorEvent.java create mode 100644 spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java create mode 100644 spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java create mode 100644 spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaderMapper.java create mode 100644 spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java diff --git a/.github/workflows/pr-build-workflow.yml b/.github/workflows/pr-build-workflow.yml index 6ca162f7abe..769312b3aba 100644 --- a/.github/workflows/pr-build-workflow.yml +++ b/.github/workflows/pr-build-workflow.yml @@ -12,10 +12,14 @@ jobs: services: rabbitmq: - image: cyrilix/rabbitmq-mqtt + image: rabbitmq:management ports: - 5672:5672 - 15672:15672 + + mosquitto: + image: eclipse-mosquitto + ports: - 1883:1883 mongodb: diff --git a/build.gradle b/build.gradle index 79d69fb7140..ac7cf0c6a5d 100644 --- a/build.gradle +++ b/build.gradle @@ -680,6 +680,7 @@ project('spring-integration-mqtt') { dependencies { api project(':spring-integration-core') api "org.eclipse.paho:org.eclipse.paho.client.mqttv3:$pahoMqttClientVersion" + optionalApi "org.eclipse.paho:org.eclipse.paho.mqttv5.client:$pahoMqttClientVersion" testImplementation project(':spring-integration-jmx') testImplementation 'com.fasterxml.jackson.core:jackson-databind' diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttProtocolErrorEvent.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttProtocolErrorEvent.java new file mode 100644 index 00000000000..c7434c6feb5 --- /dev/null +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttProtocolErrorEvent.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt.event; + +import org.eclipse.paho.mqttv5.common.MqttException; + +/** + * The even representing an MQTT error occured during client interaction. + * + * @author Artem Bilan + * + * @since 5.5.5 + * + * @see org.eclipse.paho.mqttv5.client.MqttCallback#mqttErrorOccurred(MqttException) + */ +@SuppressWarnings("serial") +public class MqttProtocolErrorEvent extends MqttIntegrationEvent { + + public MqttProtocolErrorEvent(Object source, MqttException exception) { + super(source, exception); + } + +} diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java index 487b29bfb1c..c8f072980a9 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java @@ -21,9 +21,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.core.log.LogMessage; import org.springframework.integration.endpoint.MessageProducerSupport; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttMessageConverter; import org.springframework.integration.support.management.IntegrationManagedResource; import org.springframework.jmx.export.annotation.ManagedAttribute; @@ -45,7 +46,13 @@ */ @ManagedResource @IntegrationManagedResource -public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport { +public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport + implements ApplicationEventPublisherAware { + + /** + * The default completion timeout in milliseconds. + */ + public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L; private final String url; @@ -53,7 +60,13 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessagePro private final Set topics; - private volatile MqttMessageConverter converter; + private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT; + + private boolean manualAcks; + + private ApplicationEventPublisher applicationEventPublisher; + + private MqttMessageConverter converter; protected final Lock topicLock = new ReentrantLock(); // NOSONAR @@ -147,6 +160,42 @@ public String getComponentType() { return "mqtt:inbound-channel-adapter"; } + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; // NOSONAR (inconsistent synchronization) + } + + protected ApplicationEventPublisher getApplicationEventPublisher() { + return this.applicationEventPublisher; + } + + /** + * Set the acknowledgment mode to manual. + * @param manualAcks true for manual acks. + * @since 5.3 + */ + public void setManualAcks(boolean manualAcks) { + this.manualAcks = manualAcks; + } + + protected boolean isManualAcks() { + return this.manualAcks; + } + + /** + * Set the completion timeout for operations. Not settable using the namespace. + * Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds. + * @param completionTimeout The timeout. + * @since 4.1 + */ + public synchronized void setCompletionTimeout(long completionTimeout) { + this.completionTimeout = completionTimeout; + } + + protected long getCompletionTimeout() { + return this.completionTimeout; + } + /** * Add a topic to the subscribed list. * @param topic The topic. @@ -239,18 +288,6 @@ public void removeTopic(String... topic) { } } - @Override - protected void onInit() { - super.onInit(); - if (this.converter == null) { - DefaultPahoMessageConverter pahoMessageConverter = new DefaultPahoMessageConverter(); - pahoMessageConverter.setBeanFactory(getBeanFactory()); - this.converter = pahoMessageConverter; - - } - } - - /** * @since 4.1 */ diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java index d7b99d0f317..6d6856f84c3 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java @@ -29,7 +29,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.acks.SimpleAcknowledgment; import org.springframework.integration.mqtt.core.ConsumerStopAction; @@ -38,6 +37,7 @@ import org.springframework.integration.mqtt.core.MqttPahoComponent; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttUtils; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.messaging.Message; @@ -60,12 +60,7 @@ * */ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter - implements MqttCallback, MqttPahoComponent, ApplicationEventPublisherAware { - - /** - * The default completion timeout in milliseconds. - */ - public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L; + implements MqttCallback, MqttPahoComponent { /** * The default disconnect completion timeout in milliseconds. @@ -78,14 +73,8 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv private int recoveryInterval = DEFAULT_RECOVERY_INTERVAL; - private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT; - private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT; - private boolean manualAcks; - - private ApplicationEventPublisher applicationEventPublisher; - private volatile IMqttClient client; private volatile ScheduledFuture reconnectFuture; @@ -139,16 +128,6 @@ public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String.. this(url, clientId, new DefaultMqttPahoClientFactory(), topic); } - /** - * Set the completion timeout for operations. Not settable using the namespace. - * Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds. - * @param completionTimeout The timeout. - * @since 4.1 - */ - public synchronized void setCompletionTimeout(long completionTimeout) { - this.completionTimeout = completionTimeout; - } - /** * Set the completion timeout when disconnecting. Not settable using the namespace. * Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds. @@ -169,23 +148,6 @@ public synchronized void setRecoveryInterval(int recoveryInterval) { this.recoveryInterval = recoveryInterval; } - /** - * Set the acknowledgment mode to manual. - * @param manualAcks true for manual acks. - * @since 5.3 - */ - public void setManualAcks(boolean manualAcks) { - this.manualAcks = manualAcks; - } - - /** - * @since 4.2.2 - */ - @Override - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; // NOSONAR (inconsistent synchronization) - } - @Override public MqttConnectOptions getConnectionInfo() { MqttConnectOptions options = this.clientFactory.getConnectionOptions(); @@ -199,6 +161,17 @@ public MqttConnectOptions getConnectionInfo() { return options; } + @Override + protected void onInit() { + super.onInit(); + if (getConverter() == null) { + DefaultPahoMessageConverter pahoMessageConverter = new DefaultPahoMessageConverter(); + pahoMessageConverter.setBeanFactory(getBeanFactory()); + setConverter(pahoMessageConverter); + + } + } + @Override protected void doStart() { Assert.state(getTaskScheduler() != null, "A 'taskScheduler' is required"); @@ -293,22 +266,26 @@ private synchronized void connectAndSubscribe() throws MqttException { this.client = this.clientFactory.getClientInstance(getUrl(), getClientId()); this.client.setCallback(this); if (this.client instanceof MqttClient) { - ((MqttClient) this.client).setTimeToWait(this.completionTimeout); + ((MqttClient) this.client).setTimeToWait(getCompletionTimeout()); } this.topicLock.lock(); String[] topics = getTopic(); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); try { this.client.connect(connectionOptions); - this.client.setManualAcks(this.manualAcks); - int[] requestedQos = getQos(); - int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length); - this.client.subscribe(topics, grantedQos); - warnInvalidQosForSubscription(topics, requestedQos, grantedQos); + this.client.setManualAcks(isManualAcks()); + if (topics.length > 0) { + int[] requestedQos = getQos(); + int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length); + this.client.subscribe(topics, grantedQos); + warnInvalidQosForSubscription(topics, requestedQos, grantedQos); + } } catch (MqttException ex) { - if (this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); } logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics)); if (this.client != null) { // Could be reset during event handling before @@ -331,8 +308,8 @@ private synchronized void connectAndSubscribe() throws MqttException { this.connected = true; String message = "Connected and subscribed to " + Arrays.toString(topics); logger.debug(message); - if (this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); } } } @@ -397,8 +374,9 @@ public synchronized void connectionLost(Throwable cause) { } this.client = null; scheduleReconnect(); - if (this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); } } } @@ -407,7 +385,7 @@ public synchronized void connectionLost(Throwable cause) { public void messageArrived(String topic, MqttMessage mqttMessage) { AbstractIntegrationMessageBuilder builder = toMessageBuilder(topic, mqttMessage); if (builder != null) { - if (this.manualAcks) { + if (isManualAcks()) { builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client)); } @@ -458,7 +436,7 @@ public void deliveryComplete(IMqttDeliveryToken token) { } /** - * Used to complete message arrival when {@link #manualAcks} is true. + * Used to complete message arrival when {@link #isManualAcks()} is true. * * @since 5.3 */ diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java new file mode 100644 index 00000000000..16f85fee7d0 --- /dev/null +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -0,0 +1,328 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt.inbound; + +import java.util.Arrays; +import java.util.Map; + +import org.eclipse.paho.mqttv5.client.IMqttAsyncClient; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttAsyncClient; +import org.eclipse.paho.mqttv5.client.MqttCallback; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; + +import org.springframework.beans.factory.BeanCreationException; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.integration.acks.SimpleAcknowledgment; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.mapping.HeaderMapper; +import org.springframework.integration.mqtt.core.MqttComponent; +import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; +import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent; +import org.springframework.integration.mqtt.event.MqttSubscribedEvent; +import org.springframework.integration.mqtt.support.MqttHeaderMapper; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.mqtt.support.MqttMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.Assert; + +/** + * The {@link AbstractMqttMessageDrivenChannelAdapter} implementation for MQTT v5. + * + * The {@link MqttProperties} are mapped via provided {@link HeaderMapper}; + * meanwhile the regular {@link MqttMessage} properties are always mapped into headers. + * + * It is recommended to have the {@link MqttConnectionOptions#setAutomaticReconnect(boolean)} + * set to true to let an internal {@link IMqttAsyncClient} instance to handle reconnects. + * Otherwise, the manual restart of this component can only handle reconnects, e.g. via + * {@link MqttConnectionFailedEvent} handling on disconnection. + * + * See {@link #setPayloadType} for more information about type conversion. + * + * @author Artem Bilan + * + * @since 5.5.5 + * + */ +public class Mqttv5PahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter + implements MqttCallback, MqttComponent { + + private final MqttConnectionOptions connectionOptions; + + private IMqttAsyncClient mqttClient; + + private SmartMessageConverter messageConverter; + + private Class payloadType = byte[].class; + + private HeaderMapper headerMapper = new MqttHeaderMapper(); + + public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) { + super(url, clientId, topic); + this.connectionOptions = new MqttConnectionOptions(); + this.connectionOptions.setServerURIs(new String[]{ url }); + this.connectionOptions.setAutomaticReconnect(true); + } + + public Mqttv5PahoMessageDrivenChannelAdapter(MqttConnectionOptions connectionOptions, String clientId, + String... topic) { + + super(obtainServerUrlFromOptions(connectionOptions), clientId, topic); + this.connectionOptions = connectionOptions; + if (!this.connectionOptions.isAutomaticReconnect()) { + logger.warn("It is recommended to set 'automaticReconnect' MQTT client option. " + + "Otherwise the current channel adapter restart should be used explicitly, " + + "e.g. via handling 'MqttConnectionFailedEvent' on client disconnection."); + } + } + + @Override + public MqttConnectionOptions getConnectionInfo() { + return this.connectionOptions; + } + + @Override + public void setConverter(MqttMessageConverter converter) { + throw new UnsupportedOperationException("Use setMessageConverter(SmartMessageConverter) instead"); + } + + public void setMessageConverter(SmartMessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + /** + * Set the type of the target message payload to produce after conversion from MQTT message. + * Defaults to {@code byte[].class} - just extract MQTT message payload without conversion. + * Can be set to {@link MqttMessage} class to produce the whole MQTT message as a payload. + * @param payloadType the expected payload type to convert MQTT message to. + */ + public void setPayloadType(Class payloadType) { + Assert.notNull(payloadType, "'payloadType' must not be null."); + this.payloadType = payloadType; + } + + public void setHeaderMapper(HeaderMapper headerMapper) { + Assert.notNull(headerMapper, "'headerMapper' must not be null."); + this.headerMapper = headerMapper; + } + + @Override + protected void onInit() { + super.onInit(); + try { + this.mqttClient = new MqttAsyncClient(getUrl(), getClientId()); + this.mqttClient.setCallback(this); + this.mqttClient.setManualAcks(isManualAcks()); + } + catch (MqttException ex) { + throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex); + } + if (this.messageConverter == null) { + setMessageConverter(getBeanFactory() + .getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME, + SmartMessageConverter.class)); + } + } + + @Override + protected void doStart() { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + this.topicLock.lock(); + String[] topics = getTopic(); + try { + this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); + if (topics.length > 0) { + int[] requestedQos = getQos(); + this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout()); + String message = "Connected and subscribed to " + Arrays.toString(topics); + logger.debug(message); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); + } + } + } + catch (MqttException ex) { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + } + logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics)); + } + finally { + this.topicLock.unlock(); + } + } + + @Override + protected void doStop() { + this.topicLock.lock(); + String[] topics = getTopic(); + try { + this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout()); + } + catch (MqttException ex) { + logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics)); + } + finally { + this.topicLock.unlock(); + } + } + + @Override + public void addTopic(String topic, int qos) { + this.topicLock.lock(); + try { + this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout()); + super.addTopic(topic, qos); + } + catch (MqttException ex) { + throw new MessagingException("Failed to subscribe to topic " + topic, ex); + } + finally { + this.topicLock.unlock(); + } + } + + @Override + public void removeTopic(String... topic) { + this.topicLock.lock(); + try { + this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout()); + super.removeTopic(topic); + } + catch (MqttException ex) { + throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(topic), ex); + } + finally { + this.topicLock.unlock(); + } + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + Map headers = this.headerMapper.toHeaders(mqttMessage.getProperties()); + headers.put(MqttHeaders.ID, mqttMessage.getId()); + headers.put(MqttHeaders.RECEIVED_QOS, mqttMessage.getQos()); + headers.put(MqttHeaders.DUPLICATE, mqttMessage.isDuplicate()); + headers.put(MqttHeaders.RECEIVED_RETAINED, mqttMessage.isRetained()); + headers.put(MqttHeaders.RECEIVED_TOPIC, topic); + + Object payload = + MqttMessage.class.isAssignableFrom(this.payloadType) + ? mqttMessage + : mqttMessage.getPayload(); + + Message message; + if (MqttMessage.class.isAssignableFrom(this.payloadType) || byte[].class.isAssignableFrom(this.payloadType)) { + message = new GenericMessage<>(payload, headers); + } + else { + message = this.messageConverter.toMessage(payload, new MessageHeaders(headers), this.payloadType); + } + + try { + sendMessage(message); + } + catch (RuntimeException ex) { + logger.error(ex, () -> "Unhandled exception for " + message); + throw ex; + } + } + + @Override + public void disconnected(MqttDisconnectResponse disconnectResponse) { + MqttException cause = disconnectResponse.getException(); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (cause != null && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); + } + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttProtocolErrorEvent(this, exception)); + } + } + + @Override + public void deliveryComplete(IMqttToken token) { + + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + + } + + @Override + public void authPacketArrived(int reasonCode, MqttProperties properties) { + + } + + private static String obtainServerUrlFromOptions(MqttConnectionOptions connectionOptions) { + Assert.notNull(connectionOptions, "'connectionOptions' must not be null"); + String[] serverURIs = connectionOptions.getServerURIs(); + Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'"); + return serverURIs[0]; + } + + + /** + * Used to complete message arrival when {@link #isManualAcks()} is true. + */ + private static class AcknowledgmentImpl implements SimpleAcknowledgment { + + private final int id; + + private final int qos; + + private final IMqttAsyncClient ackClient; + + /** + * Construct an instance with the provided properties. + * @param id the message id. + * @param qos the message QOS. + * @param client the client. + */ + AcknowledgmentImpl(int id, int qos, IMqttAsyncClient client) { + this.id = id; + this.qos = qos; + this.ackClient = client; + } + + @Override + public void acknowledge() { + try { + this.ackClient.messageArrivedComplete(this.id, this.qos); + } + catch (MqttException ex) { + throw new IllegalStateException(ex); + } + } + + } + +} diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java index 6f638cbdf11..a87077cd6fb 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java @@ -19,11 +19,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.expression.Expression; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor; import org.springframework.integration.handler.MessageProcessor; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.mqtt.support.MqttMessageConverter; import org.springframework.integration.support.management.ManageableLifecycle; @@ -41,7 +42,18 @@ * @since 4.0 * */ -public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler implements ManageableLifecycle { +public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler + implements ManageableLifecycle, ApplicationEventPublisherAware { + + /** + * The default disconnect completion timeout in milliseconds. + */ + public static final long DISCONNECT_COMPLETION_TIMEOUT = 5_000L; + + /** + * The default completion timeout in milliseconds. + */ + public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L; private static final MessageProcessor DEFAULT_TOPIC_PROCESSOR = (message) -> message.getHeaders().get(MqttHeaders.TOPIC, String.class); @@ -52,6 +64,10 @@ public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler private final String clientId; + private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT; + + private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT; + private String defaultTopic; private MessageProcessor topicProcessor = DEFAULT_TOPIC_PROCESSOR; @@ -66,6 +82,8 @@ public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler private MessageConverter converter; + private ApplicationEventPublisher applicationEventPublisher; + private int clientInstance; public AbstractMqttMessageHandler(@Nullable String url, String clientId) { @@ -74,6 +92,15 @@ public AbstractMqttMessageHandler(@Nullable String url, String clientId) { this.clientId = clientId; } + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + protected ApplicationEventPublisher getApplicationEventPublisher() { + return this.applicationEventPublisher; + } + /** * Set the topic to which the message will be published if the * {@link #setTopicExpression(Expression) topicExpression} evaluates to `null`. @@ -83,6 +110,10 @@ public void setDefaultTopic(String defaultTopic) { this.defaultTopic = defaultTopic; } + protected String getDefaultTopic() { + return this.defaultTopic; + } + /** * Set the topic expression; default "headers['mqtt_topic']". * @param topicExpression the expression. @@ -103,6 +134,10 @@ public void setTopicExpressionString(String topicExpression) { this.topicProcessor = new ExpressionEvaluatingMessageProcessor<>(topicExpression); } + protected MessageProcessor getTopicProcessor() { + return this.topicProcessor; + } + /** * Set the qos for messages if the {@link #setQosExpression(Expression) qosExpression} * evaluates to null. Only applies if a message converter is not provided. @@ -113,6 +148,10 @@ public void setDefaultQos(int defaultQos) { this.defaultQos = defaultQos; } + protected int getDefaultQos() { + return this.defaultQos; + } + /** * Set the qos expression; default "headers['mqtt_qos']". * Only applies if a message converter is not provided. @@ -137,6 +176,10 @@ public void setQosExpressionString(String qosExpression) { this.qosProcessor = new ExpressionEvaluatingMessageProcessor<>(qosExpression); } + protected MessageProcessor getQosProcessor() { + return this.qosProcessor; + } + /** * Set the retained boolean for messages if the * {@link #setRetainedExpression(Expression) retainedExpression} evaluates to null. @@ -148,6 +191,10 @@ public void setDefaultRetained(boolean defaultRetained) { this.defaultRetained = defaultRetained; } + protected boolean getDefaultRetained() { + return this.defaultRetained; + } + /** * Set the retained expression; default "headers['mqtt_retained']". * Only applies if a message converter is not provided. @@ -172,6 +219,10 @@ public void setRetainedExpressionString(String retainedExpression) { this.retainedProcessor = new ExpressionEvaluatingMessageProcessor<>(retainedExpression); } + protected MessageProcessor getRetainedProcessor() { + return this.retainedProcessor; + } + /** * Set the message converter to use; if this is provided, the adapter qos and retained * settings are ignored. @@ -213,6 +264,34 @@ protected void incrementClientInstance() { this.clientInstance++; //NOSONAR - false positive - called from synchronized block } + /** + * Set the completion timeout for async operations. Not settable using the namespace. + * Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds. + * @param completionTimeout The timeout. + * @since 4.1 + */ + public void setCompletionTimeout(long completionTimeout) { + this.completionTimeout = completionTimeout; // NOSONAR (sync) + } + + protected long getCompletionTimeout() { + return this.completionTimeout; + } + + /** + * Set the completion timeout when disconnecting. Not settable using the namespace. + * Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds. + * @param completionTimeout The timeout. + * @since 5.1.10 + */ + public void setDisconnectCompletionTimeout(long completionTimeout) { + this.disconnectCompletionTimeout = completionTimeout; // NOSONAR (sync) + } + + protected long getDisconnectCompletionTimeout() { + return this.disconnectCompletionTimeout; + } + @Override protected void onInit() { super.onInit(); @@ -225,14 +304,6 @@ protected void onInit() { if (this.retainedProcessor instanceof BeanFactoryAware && getBeanFactory() != null) { ((BeanFactoryAware) this.retainedProcessor).setBeanFactory(getBeanFactory()); } - if (this.converter == null) { - DefaultPahoMessageConverter defaultConverter = new DefaultPahoMessageConverter(this.defaultQos, - this.qosProcessor, this.defaultRetained, this.retainedProcessor); - if (getBeanFactory() != null) { - defaultConverter.setBeanFactory(getBeanFactory()); - } - this.converter = defaultConverter; - } } @Override diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java index 8c954257edf..aa00aca70b4 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,18 +24,19 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoComponent; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; import org.springframework.integration.mqtt.event.MqttMessageSentEvent; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttMessageConverter; import org.springframework.integration.mqtt.support.MqttUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.MessagingException; +import org.springframework.messaging.converter.MessageConverter; import org.springframework.util.Assert; /** @@ -51,22 +52,7 @@ * @since 4.0 * */ -public class MqttPahoMessageHandler extends AbstractMqttMessageHandler - implements MqttCallback, MqttPahoComponent, ApplicationEventPublisherAware { - - /** - * The default completion timeout in milliseconds. - */ - public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L; - - /** - * The default disconnect completion timeout in milliseconds. - */ - public static final long DISCONNECT_COMPLETION_TIMEOUT = 5_000L; - - private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT; - - private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT; +public class MqttPahoMessageHandler extends AbstractMqttMessageHandler implements MqttCallback, MqttPahoComponent { private final MqttPahoClientFactory clientFactory; @@ -74,8 +60,6 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler private boolean asyncEvents; - private ApplicationEventPublisher applicationEventPublisher; - private volatile IMqttAsyncClient client; /** @@ -135,31 +119,6 @@ public void setAsyncEvents(boolean asyncEvents) { this.asyncEvents = asyncEvents; } - /** - * Set the completion timeout for async operations. Not settable using the namespace. - * Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds. - * @param completionTimeout The timeout. - * @since 4.1 - */ - public void setCompletionTimeout(long completionTimeout) { - this.completionTimeout = completionTimeout; // NOSONAR (sync) - } - - /** - * Set the completion timeout when disconnecting. Not settable using the namespace. - * Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds. - * @param completionTimeout The timeout. - * @since 5.1.10 - */ - public void setDisconnectCompletionTimeout(long completionTimeout) { - this.disconnectCompletionTimeout = completionTimeout; // NOSONAR (sync) - } - - @Override - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; - } - @Override public MqttConnectOptions getConnectionInfo() { MqttConnectOptions options = this.clientFactory.getConnectionOptions(); @@ -176,8 +135,18 @@ public MqttConnectOptions getConnectionInfo() { @Override protected void onInit() { super.onInit(); - Assert.state(getConverter() instanceof MqttMessageConverter, - "MessageConverter must be an MqttMessageConverter"); + MessageConverter converter = getConverter(); + if (converter == null) { + DefaultPahoMessageConverter defaultConverter = new DefaultPahoMessageConverter(getDefaultQos(), + getQosProcessor(), getDefaultRetained(), getRetainedProcessor()); + if (getBeanFactory() != null) { + defaultConverter.setBeanFactory(getBeanFactory()); + } + setConverter(defaultConverter); + } + else { + Assert.state(converter instanceof MqttMessageConverter, "MessageConverter must be an MqttMessageConverter"); + } } @Override @@ -189,7 +158,7 @@ protected void doStop() { try { IMqttAsyncClient theClient = this.client; if (theClient != null) { - theClient.disconnect().waitForCompletion(this.disconnectCompletionTimeout); + theClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout()); theClient.close(); this.client = null; } @@ -213,7 +182,7 @@ private synchronized IMqttAsyncClient checkConnection() throws MqttException { this.client = this.clientFactory.getAsyncClientInstance(this.getUrl(), this.getClientId()); incrementClientInstance(); this.client.setCallback(this); - this.client.connect(connectionOptions).waitForCompletion(this.completionTimeout); + this.client.connect(connectionOptions).waitForCompletion(getCompletionTimeout()); logger.debug("Client connected"); } catch (MqttException e) { @@ -221,8 +190,9 @@ private synchronized IMqttAsyncClient checkConnection() throws MqttException { this.client.close(); this.client = null; } - if (this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e)); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e)); } throw new MessagingException("Failed to connect", e); } @@ -236,11 +206,12 @@ protected void publish(String topic, Object mqttMessage, Message message) { try { IMqttDeliveryToken token = checkConnection() .publish(topic, (MqttMessage) mqttMessage); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); if (!this.async) { - token.waitForCompletion(this.completionTimeout); // NOSONAR (sync) + token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync) } - else if (this.asyncEvents && this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent( + else if (this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(), getClientInstance())); } @@ -251,8 +222,9 @@ else if (this.asyncEvents && this.applicationEventPublisher != null) { } private void sendDeliveryComplete(IMqttDeliveryToken token) { - if (this.async && this.asyncEvents && this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent( + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (this.async && this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(), getClientInstance())); } @@ -270,8 +242,9 @@ public synchronized void connectionLost(Throwable cause) { // NOSONAR } this.client = null; - if (this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); } } } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java new file mode 100644 index 00000000000..2eb23d67b6a --- /dev/null +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java @@ -0,0 +1,305 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt.outbound; + + +import java.nio.charset.StandardCharsets; + +import org.eclipse.paho.mqttv5.client.IMqttAsyncClient; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttAsyncClient; +import org.eclipse.paho.mqttv5.client.MqttCallback; +import org.eclipse.paho.mqttv5.client.MqttClientPersistence; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; + +import org.springframework.beans.factory.BeanCreationException; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.mapping.HeaderMapper; +import org.springframework.integration.mqtt.core.MqttComponent; +import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; +import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; +import org.springframework.integration.mqtt.event.MqttMessageSentEvent; +import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent; +import org.springframework.integration.mqtt.support.MqttHeaderMapper; +import org.springframework.integration.mqtt.support.MqttMessageConverter; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.util.Assert; + +/** + * The {@link AbstractMqttMessageHandler} implementation for MQTT v5. + * + * @author Artem Bilan + * + * @since 5.5.5 + */ +public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler + implements MqttCallback, MqttComponent { + + private final MqttConnectionOptions connectionOptions; + + private IMqttAsyncClient mqttClient; + + @Nullable + private MqttClientPersistence persistence; + + private boolean async; + + private boolean asyncEvents; + + private HeaderMapper headerMapper = new MqttHeaderMapper(); + + public Mqttv5PahoMessageHandler(String url, String clientId) { + super(url, clientId); + this.connectionOptions = new MqttConnectionOptions(); + this.connectionOptions.setServerURIs(new String[]{ url }); + this.connectionOptions.setAutomaticReconnect(true); + } + + public Mqttv5PahoMessageHandler(MqttConnectionOptions connectionOptions, String clientId) { + super(obtainServerUrlFromOptions(connectionOptions), clientId); + this.connectionOptions = connectionOptions; + if (!this.connectionOptions.isAutomaticReconnect()) { + logger.warn("It is recommended to set 'automaticReconnect' MQTT client option. " + + "Otherwise the current channel adapter restart should be used explicitly, " + + "e.g. via handling 'MqttConnectionFailedEvent' on client disconnection."); + } + } + + + private static String obtainServerUrlFromOptions(MqttConnectionOptions connectionOptions) { + Assert.notNull(connectionOptions, "'connectionOptions' must not be null"); + String[] serverURIs = connectionOptions.getServerURIs(); + Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'"); + return serverURIs[0]; + } + + @Override + public MqttConnectionOptions getConnectionInfo() { + return this.connectionOptions; + } + + public void setPersistence(@Nullable MqttClientPersistence persistence) { + this.persistence = persistence; + } + + public void setHeaderMapper(HeaderMapper headerMapper) { + Assert.notNull(headerMapper, "'headerMapper' must not be null"); + this.headerMapper = headerMapper; + } + + /** + * Set to true if you don't want to block when sending messages. Default false. + * When true, message sent/delivered events will be published for reception + * by a suitably configured 'ApplicationListener' or an event + * inbound-channel-adapter. + * @param async true for async. + * @see #setAsyncEvents(boolean) + */ + public void setAsync(boolean async) { + this.async = async; + } + + /** + * When {@link #setAsync(boolean)} is true, setting this to true enables + * publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent} + * to be emitted. Default false. + * @param asyncEvents the asyncEvents. + */ + public void setAsyncEvents(boolean asyncEvents) { + this.asyncEvents = asyncEvents; + } + + @Override + protected void onInit() { + super.onInit(); + try { + this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence); + this.mqttClient.setCallback(this); + incrementClientInstance(); + } + catch (MqttException ex) { + throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex); + } + if (getConverter() == null) { + setConverter(getBeanFactory() + .getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME, + MessageConverter.class)); + } + else { + Assert.state(!(getConverter() instanceof MqttMessageConverter), + "MessageConverter must not be an MqttMessageConverter"); + } + } + + @Override + protected void doStart() { + try { + this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); + } + catch (MqttException ex) { + throw new IllegalStateException("Cannot connect 'MqttAsyncClient' for: " + getComponentName(), ex); + } + } + + @Override + protected void doStop() { + try { + this.mqttClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout()); + } + catch (MqttException ex) { + logger.error(ex, "Failed to disconnect 'MqttAsyncClient'"); + } + } + + @Override + public void destroy() { + super.destroy(); + try { + this.mqttClient.close(); + } + catch (MqttException ex) { + logger.error(ex, "Failed to close 'MqttAsyncClient'"); + } + } + + @Override + protected void handleMessageInternal(Message message) { + MqttMessage mqttMessage; + Object payload = message.getPayload(); + if (payload instanceof MqttMessage) { + mqttMessage = (MqttMessage) payload; + } + else { + mqttMessage = buildMqttMessage(message); + } + + publish(obtainTopicToPublish(message), mqttMessage, message); + } + + private String obtainTopicToPublish(Message message) { + String topic = getTopicProcessor().processMessage(message); + if (topic == null) { + topic = getDefaultTopic(); + } + Assert.state(topic != null, + () -> "No topic could be determined from the '" + message + "' and no default topic defined"); + return topic; + } + + private MqttMessage buildMqttMessage(Message message) { + Object payload = message.getPayload(); + byte[] body; + if (payload instanceof byte[]) { + body = (byte[]) payload; + } + else if (payload instanceof String) { + body = ((String) payload).getBytes(StandardCharsets.UTF_8); + } + else { + MessageConverter converter = getConverter(); + body = (byte[]) converter.fromMessage(message, byte[].class); + Assert.state(body != null, + () -> "The MQTT payload cannot be null. The '" + converter + "' returned null for: " + message); + } + + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setPayload(body); + Integer qos = getQosProcessor().processMessage(message); + mqttMessage.setQos(qos == null ? getDefaultQos() : qos); + Boolean retained = getRetainedProcessor().processMessage(message); + mqttMessage.setRetained(retained == null ? getDefaultRetained() : retained); + MqttProperties properties = new MqttProperties(); + this.headerMapper.fromHeaders(message.getHeaders(), properties); + mqttMessage.setProperties(properties); + return mqttMessage; + } + + @Override + protected void publish(String topic, Object mqttMessage, Message message) { + Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'"); + try { + IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (!this.async) { + token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync) + } + else if (this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(), + getClientInstance())); + } + } + catch (MqttException ex) { + throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', ex); + } + } + + private void sendDeliveryComplete(IMqttToken token) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (this.async && this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(), + getClientInstance())); + } + } + + @Override + public void deliveryComplete(IMqttToken token) { + sendDeliveryComplete(token); + } + + @Override + public void disconnected(MqttDisconnectResponse disconnectResponse) { + MqttException cause = disconnectResponse.getException(); + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (cause != null && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause)); + } + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttProtocolErrorEvent(this, exception)); + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + + } + + @Override + public void authPacketArrived(int reasonCode, MqttProperties properties) { + + } + +} diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaderMapper.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaderMapper.java new file mode 100644 index 00000000000..252ff8e2430 --- /dev/null +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaderMapper.java @@ -0,0 +1,208 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt.support; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import org.springframework.core.log.LogAccessor; +import org.springframework.core.log.LogMessage; +import org.springframework.integration.mapping.HeaderMapper; +import org.springframework.lang.Nullable; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.PatternMatchUtils; + +/** + * The default {@link HeaderMapper} implementation for MQTT v5 message properties mapping. + * + * @author Artem Bilan + * + * @since 5.5.5 + */ +public class MqttHeaderMapper implements HeaderMapper { + + private static final LogAccessor LOGGER = new LogAccessor(MqttHeaderMapper.class); + + private String[] inboundHeaderNames = { "*" }; + + private String[] outboundHeaderNames = { + MessageHeaders.CONTENT_TYPE, + MqttHeaders.MESSAGE_EXPIRY_INTERVAL, + MqttHeaders.RESPONSE_TOPIC, + MqttHeaders.CORRELATION_DATA + }; + + /** + * Provide a list of patterns to map MQTT message properties into message headers. + * By default, it maps all valid MQTT PUBLISH packet headers + * (see {@link org.eclipse.paho.mqttv5.common.packet.MqttPublish}), including all the user properties. + * @param inboundHeaderNames the MQTT message property patterns to map. + */ + public void setInboundHeaderNames(String... inboundHeaderNames) { + Assert.notNull(inboundHeaderNames, "'inboundHeaderNames' must not be null"); + this.inboundHeaderNames = inboundHeaderNames; + } + + /** + * Provide a list of patterns to map header into a PUBLISH MQTT message. + * Default headers are: + * {@link MessageHeaders#CONTENT_TYPE}, {@link MqttHeaders#MESSAGE_EXPIRY_INTERVAL}, + * {@link MqttHeaders#RESPONSE_TOPIC}, {@link MqttHeaders#CORRELATION_DATA}. + * @param outboundHeaderNames the header patterns to map. + */ + public void setOutboundHeaderNames(String... outboundHeaderNames) { + Assert.notNull(outboundHeaderNames, "'outboundHeaderNames' must not be null"); + this.outboundHeaderNames = outboundHeaderNames; + } + + @Override + public void fromHeaders(MessageHeaders headers, MqttProperties target) { + for (Map.Entry entry : headers.entrySet()) { + String name = entry.getKey(); + if (shouldMapHeader(name, this.outboundHeaderNames)) { + Object value = entry.getValue(); + if (value != null) { + setMqttHeader(target, name, value); + } + } + } + } + + @Override + public Map toHeaders(MqttProperties source) { + Map headers = new HashMap<>(); + if (source.getPayloadFormat()) { + headers.compute(MessageHeaders.CONTENT_TYPE, (k, v) -> mapPropertyIfMatch(k, source.getContentType())); + } + headers.compute(MqttHeaders.TOPIC_ALIAS, (k, v) -> mapPropertyIfMatch(k, source.getTopicAlias())); + headers.compute(MqttHeaders.RESPONSE_TOPIC, (k, v) -> mapPropertyIfMatch(k, source.getResponseTopic())); + headers.compute(MqttHeaders.CORRELATION_DATA, (k, v) -> mapPropertyIfMatch(k, source.getCorrelationData())); + + List userProperties = source.getUserProperties(); + for (UserProperty userProperty : userProperties) { + String name = userProperty.getKey(); + if (shouldMapHeader(name, this.inboundHeaderNames)) { + headers.put(name, userProperty.getValue()); + } + } + return headers; + } + + private Object mapPropertyIfMatch(String headerName, @Nullable Object value) { + return (value != null && shouldMapHeader(headerName, this.inboundHeaderNames)) ? value : null; + } + + private static boolean shouldMapHeader(String headerName, String[] patterns) { + if (patterns != null && patterns.length > 0) { + for (String pattern : patterns) { + if (PatternMatchUtils.simpleMatch(pattern, headerName)) { + LOGGER.debug(LogMessage.format("headerName=[{0}] WILL be mapped, matched pattern={1}", + headerName, pattern)); + return true; + } + } + } + LOGGER.debug(LogMessage.format("headerName=[{0}] WILL NOT be mapped", headerName)); + return false; + } + + private static void setMqttHeader(MqttProperties target, String name, Object value) { + switch (name) { + case MessageHeaders.CONTENT_TYPE: + setContentType(target, value); + target.setPayloadFormat(true); + break; + case MqttHeaders.MESSAGE_EXPIRY_INTERVAL: + setMessageExpiryInterval(target, value); + break; + case MqttHeaders.RESPONSE_TOPIC: + setResponseTopic(target, value); + break; + case MqttHeaders.CORRELATION_DATA: + setCorrelationData(target, value); + break; + default: + if (value instanceof String) { + target.getUserProperties().add(new UserProperty(name, (String) value)); + } + else if (value != null) { + throw new IllegalArgumentException( + "Expected String value for MQTT user properties, but received: " + value.getClass()); + } + } + } + + private static void setContentType(MqttProperties target, Object value) { + if (value instanceof MimeType) { + target.setContentType(((MimeType) value).toString()); + } + else if (value instanceof String) { + target.setContentType((String) value); + } + else { + throw new IllegalArgumentException( + "Expected MediaType or String value for 'content-type' header value, but received: " + + value.getClass()); + } + } + + private static void setMessageExpiryInterval(MqttProperties target, Object value) { + if (value instanceof Long) { + target.setMessageExpiryInterval((Long) value); + } + else if (value instanceof String) { + target.setMessageExpiryInterval(Long.parseLong((String) value)); + } + else { + throw new IllegalArgumentException( + "Expected Long or String value for 'mqtt_messageExpiryInterval' header value, but received: " + + value.getClass()); + } + } + + private static void setResponseTopic(MqttProperties target, Object value) { + if (value instanceof String) { + target.setResponseTopic((String) value); + } + else { + throw new IllegalArgumentException( + "Expected String value for 'mqtt_responseTopic' header value, but received: " + value.getClass()); + } + } + + private static void setCorrelationData(MqttProperties target, Object value) { + if (value instanceof byte[]) { + target.setCorrelationData((byte[]) value); + } + else if (value instanceof String) { + target.setCorrelationData(((String) value).getBytes(StandardCharsets.UTF_8)); + } + else { + throw new IllegalArgumentException( + "Expected byte[] or String value for 'mqtt_correlationData' header value, but received: " + + value.getClass()); + } + } + +} diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaders.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaders.java index 727f95cf69c..19c730d33e3 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaders.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaders.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,14 @@ public final class MqttHeaders { public static final String RECEIVED_TOPIC = PREFIX + "receivedTopic"; + public static final String MESSAGE_EXPIRY_INTERVAL = PREFIX + "messageExpiryInterval"; + + public static final String TOPIC_ALIAS = PREFIX + "topicAlias"; + + public static final String RESPONSE_TOPIC = PREFIX + "responseTopic"; + + public static final String CORRELATION_DATA = PREFIX + "correlationData"; + private MqttHeaders() { throw new AssertionError(); } diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java new file mode 100644 index 00000000000..c7bfcd28994 --- /dev/null +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -0,0 +1,172 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.mqtt.event.MqttIntegrationEvent; +import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; +import org.springframework.integration.mqtt.event.MqttMessageSentEvent; +import org.springframework.integration.mqtt.event.MqttSubscribedEvent; +import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler; +import org.springframework.integration.mqtt.support.MqttHeaderMapper; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author Gary Russell + * @author Artem Bilan + * + * @since 4.0 + * + */ +@RunWith(SpringRunner.class) +@DirtiesContext +public class Mqttv5BackToBackTests { + + @ClassRule + public static final BrokerRunning brokerRunning = BrokerRunning.isRunning(1883); + + @Autowired + @Qualifier("mqttOutFlow.input") + private MessageChannel mqttOutFlowInput; + + @Autowired + private PollableChannel fromMqttChannel; + + @Autowired + private Config config; + + @Test + public void testSimpleMqttv5Interaction() { + String testPayload = "foo"; + + this.mqttOutFlowInput.send( + MessageBuilder.withPayload(testPayload) + .setHeader(MqttHeaders.TOPIC, "siTest") + .setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .build()); + + Message receive = this.fromMqttChannel.receive(10_000); + + assertThat(receive).isNotNull(); + assertThat(receive.getPayload()).isEqualTo(testPayload); + assertThat(receive.getHeaders()) + .containsEntry("foo", "bar") + .containsEntry(MessageHeaders.CONTENT_TYPE, "text/plain"); + + assertThat(this.config.events) + .isNotEmpty() + .hasAtLeastOneElementOfType(MqttMessageSentEvent.class) + .hasAtLeastOneElementOfType(MqttMessageDeliveredEvent.class) + .hasAtLeastOneElementOfType(MqttSubscribedEvent.class); + } + + + @Configuration + @EnableIntegration + public static class Config { + + List events = new ArrayList<>(); + + @EventListener + void mqttEvents(MqttIntegrationEvent event) { + this.events.add(event); + } + + @Bean + public SmartMessageConverter mqttStringToBytesConverter() { + return new AbstractMessageConverter() { + + @Override + protected boolean supports(Class clazz) { + return true; + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, + Object conversionHint) { + + return message.getPayload().toString().getBytes(StandardCharsets.UTF_8); + } + + @Override + protected Object convertToInternal(Object payload, MessageHeaders headers, + Object conversionHint) { + + return new String((byte[]) payload); + } + + }; + } + + @Bean + public IntegrationFlow mqttOutFlow() { + Mqttv5PahoMessageHandler messageHandler = + new Mqttv5PahoMessageHandler("tcp://localhost:1883", "mqttv5SIout"); + MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper(); + mqttHeaderMapper.setOutboundHeaderNames("foo", MessageHeaders.CONTENT_TYPE); + messageHandler.setHeaderMapper(mqttHeaderMapper); + messageHandler.setAsync(true); + messageHandler.setAsyncEvents(true); + messageHandler.setConverter(mqttStringToBytesConverter()); + + return f -> f.handle(messageHandler); + } + + @Bean + public IntegrationFlow mqttInFlow() { + Mqttv5PahoMessageDrivenChannelAdapter messageProducer = + new Mqttv5PahoMessageDrivenChannelAdapter("tcp://localhost:1883", "mqttv5SIin", "siTest"); + messageProducer.setPayloadType(String.class); + messageProducer.setMessageConverter(mqttStringToBytesConverter()); + + return IntegrationFlows.from( + messageProducer) + .channel(c -> c.queue("fromMqttChannel")) + .get(); + } + + } + +} diff --git a/src/reference/asciidoc/mqtt.adoc b/src/reference/asciidoc/mqtt.adoc index 0ccde980cc0..b7c461f7fa1 100644 --- a/src/reference/asciidoc/mqtt.adoc +++ b/src/reference/asciidoc/mqtt.adoc @@ -417,3 +417,8 @@ String beanName = source.getBeanName(); MqttConnectOptions options = source.getConnectionInfo(); ---- ==== + +[[mqtt-v5]] +=== MQTT v5 Support + +TODO \ No newline at end of file diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 1d50e1cdc84..fd2e87204d1 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -21,6 +21,12 @@ If you are interested in more details, see the Issue Tracker tickets that were r A `FileSplitter.FileMaker`-based implementation of `CorrelationStrategy`, `ReleaseStrategy` and `MessageGroupProcessor` as a `FileAggregator` component was introduced. See <<./file.adoc#file-aggregator, File Aggregator>> for more information. +[[x5.5-mqtt-v5]] +==== MQTT v5 Support + +The `Mqttv5PahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageHandler` (including respective `MqttHeaderMapper`) were introduced to support MQTT v5 protocol communication. +See <<./mqtt.adoc#mqtt-v5, MQTT v5 Support>> for more information. + [[x5.5-general]] === General Changes From 084044f4f6a6cf7c98c177ef99461f4d4436a01e Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 5 Oct 2021 10:04:05 -0400 Subject: [PATCH 2/4] * Handle manual acks * Add `Mqttv5PahoMessageDrivenChannelAdapter.persistence` property --- .../Mqttv5PahoMessageDrivenChannelAdapter.java | 17 ++++++++++++++++- .../integration/mqtt/Mqttv5BackToBackTests.java | 10 +++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 16f85fee7d0..a178bea90f9 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -23,6 +23,7 @@ import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttAsyncClient; import org.eclipse.paho.mqttv5.client.MqttCallback; +import org.eclipse.paho.mqttv5.client.MqttClientPersistence; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; import org.eclipse.paho.mqttv5.common.MqttException; @@ -31,6 +32,7 @@ import org.springframework.beans.factory.BeanCreationException; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.acks.SimpleAcknowledgment; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.mapping.HeaderMapper; @@ -41,6 +43,7 @@ import org.springframework.integration.mqtt.support.MqttHeaderMapper; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.mqtt.support.MqttMessageConverter; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; @@ -73,6 +76,9 @@ public class Mqttv5PahoMessageDrivenChannelAdapter extends AbstractMqttMessageDr private IMqttAsyncClient mqttClient; + @Nullable + private MqttClientPersistence persistence; + private SmartMessageConverter messageConverter; private Class payloadType = byte[].class; @@ -103,6 +109,10 @@ public MqttConnectionOptions getConnectionInfo() { return this.connectionOptions; } + public void setPersistence(@Nullable MqttClientPersistence persistence) { + this.persistence = persistence; + } + @Override public void setConverter(MqttMessageConverter converter) { throw new UnsupportedOperationException("Use setMessageConverter(SmartMessageConverter) instead"); @@ -132,7 +142,7 @@ public void setHeaderMapper(HeaderMapper headerMapper) { protected void onInit() { super.onInit(); try { - this.mqttClient = new MqttAsyncClient(getUrl(), getClientId()); + this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence); this.mqttClient.setCallback(this); this.mqttClient.setManualAcks(isManualAcks()); } @@ -228,6 +238,11 @@ public void messageArrived(String topic, MqttMessage mqttMessage) { headers.put(MqttHeaders.RECEIVED_RETAINED, mqttMessage.isRetained()); headers.put(MqttHeaders.RECEIVED_TOPIC, topic); + if (isManualAcks()) { + headers.put(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, + new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.mqttClient)); + } + Object payload = MqttMessage.class.isAssignableFrom(this.payloadType) ? mqttMessage diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java index c7bfcd28994..78fbec431b5 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -31,6 +31,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.acks.SimpleAcknowledgment; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; @@ -93,7 +95,12 @@ public void testSimpleMqttv5Interaction() { assertThat(receive.getPayload()).isEqualTo(testPayload); assertThat(receive.getHeaders()) .containsEntry("foo", "bar") - .containsEntry(MessageHeaders.CONTENT_TYPE, "text/plain"); + .containsEntry(MessageHeaders.CONTENT_TYPE, "text/plain") + .containsKey(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK); + + receive.getHeaders() + .get(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, SimpleAcknowledgment.class) + .acknowledge(); assertThat(this.config.events) .isNotEmpty() @@ -160,6 +167,7 @@ public IntegrationFlow mqttInFlow() { new Mqttv5PahoMessageDrivenChannelAdapter("tcp://localhost:1883", "mqttv5SIin", "siTest"); messageProducer.setPayloadType(String.class); messageProducer.setMessageConverter(mqttStringToBytesConverter()); + messageProducer.setManualAcks(true); return IntegrationFlows.from( messageProducer) From ee95a9cb81ca23622f0cd6919e876ee694b94b61 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 6 Oct 2021 10:49:15 -0400 Subject: [PATCH 3/4] * Add documentation * Add `MosquittoContainerTest` for TestContainers support with Mosquitto image --- .github/workflows/pr-build-workflow.yml | 6 +- build.gradle | 1 + ...stractMqttMessageDrivenChannelAdapter.java | 2 +- ...Mqttv5PahoMessageDrivenChannelAdapter.java | 2 +- .../mqtt/MosquittoContainerTest.java | 39 ++++++++++ .../mqtt/Mqttv5BackToBackTests.java | 19 ++--- src/checkstyle/checkstyle-suppressions.xml | 1 + src/reference/asciidoc/mqtt.adoc | 75 ++++++++++++++++++- 8 files changed, 126 insertions(+), 19 deletions(-) create mode 100644 spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MosquittoContainerTest.java diff --git a/.github/workflows/pr-build-workflow.yml b/.github/workflows/pr-build-workflow.yml index 769312b3aba..6ca162f7abe 100644 --- a/.github/workflows/pr-build-workflow.yml +++ b/.github/workflows/pr-build-workflow.yml @@ -12,14 +12,10 @@ jobs: services: rabbitmq: - image: rabbitmq:management + image: cyrilix/rabbitmq-mqtt ports: - 5672:5672 - 15672:15672 - - mosquitto: - image: eclipse-mosquitto - ports: - 1883:1883 mongodb: diff --git a/build.gradle b/build.gradle index ac7cf0c6a5d..e376e4fa177 100644 --- a/build.gradle +++ b/build.gradle @@ -251,6 +251,7 @@ configure(javaProjects) { subproject -> testImplementation 'org.jetbrains.kotlin:kotlin-reflect' testImplementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' testImplementation 'io.projectreactor:reactor-test' + testImplementation 'org.testcontainers:junit-jupiter:1.16.0' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java index c8f072980a9..e40c7e0578d 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java @@ -188,7 +188,7 @@ protected boolean isManualAcks() { * @param completionTimeout The timeout. * @since 4.1 */ - public synchronized void setCompletionTimeout(long completionTimeout) { + public void setCompletionTimeout(long completionTimeout) { this.completionTimeout = completionTimeout; } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index a178bea90f9..be233487be5 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -54,7 +54,7 @@ /** * The {@link AbstractMqttMessageDrivenChannelAdapter} implementation for MQTT v5. * - * The {@link MqttProperties} are mapped via provided {@link HeaderMapper}; + * The {@link MqttProperties} are mapped via the provided {@link HeaderMapper}; * meanwhile the regular {@link MqttMessage} properties are always mapped into headers. * * It is recommended to have the {@link MqttConnectionOptions#setAutomaticReconnect(boolean)} diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MosquittoContainerTest.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MosquittoContainerTest.java new file mode 100644 index 00000000000..3c189bc3ec4 --- /dev/null +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MosquittoContainerTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * + * @author Artem Bilan + * + * @since 5.5.5 + */ +@Testcontainers(disabledWithoutDocker = true) +public interface MosquittoContainerTest { + + @Container + GenericContainer MOSQUITTO_CONTAINER = + new GenericContainer<>("eclipse-mosquitto:2.0.12") + .withCommand("mosquitto -c /mosquitto-no-auth.conf") + .withReuse(true) + .withExposedPorts(1883); + +} diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java index 78fbec431b5..71983291b6d 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -22,9 +22,7 @@ import java.util.ArrayList; import java.util.List; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -52,7 +50,7 @@ import org.springframework.messaging.converter.AbstractMessageConverter; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** * @author Gary Russell @@ -61,12 +59,9 @@ * @since 4.0 * */ -@RunWith(SpringRunner.class) +@SpringJUnitConfig @DirtiesContext -public class Mqttv5BackToBackTests { - - @ClassRule - public static final BrokerRunning brokerRunning = BrokerRunning.isRunning(1883); +public class Mqttv5BackToBackTests implements MosquittoContainerTest { @Autowired @Qualifier("mqttOutFlow.input") @@ -114,6 +109,8 @@ public void testSimpleMqttv5Interaction() { @EnableIntegration public static class Config { + private static final String MQTT_URL = "tcp://localhost:" + MOSQUITTO_CONTAINER.getFirstMappedPort(); + List events = new ArrayList<>(); @EventListener @@ -150,7 +147,7 @@ protected Object convertToInternal(Object payload, MessageHeaders headers, @Bean public IntegrationFlow mqttOutFlow() { Mqttv5PahoMessageHandler messageHandler = - new Mqttv5PahoMessageHandler("tcp://localhost:1883", "mqttv5SIout"); + new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout"); MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper(); mqttHeaderMapper.setOutboundHeaderNames("foo", MessageHeaders.CONTENT_TYPE); messageHandler.setHeaderMapper(mqttHeaderMapper); @@ -164,7 +161,7 @@ public IntegrationFlow mqttOutFlow() { @Bean public IntegrationFlow mqttInFlow() { Mqttv5PahoMessageDrivenChannelAdapter messageProducer = - new Mqttv5PahoMessageDrivenChannelAdapter("tcp://localhost:1883", "mqttv5SIin", "siTest"); + new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest"); messageProducer.setPayloadType(String.class); messageProducer.setMessageConverter(mqttStringToBytesConverter()); messageProducer.setManualAcks(true); diff --git a/src/checkstyle/checkstyle-suppressions.xml b/src/checkstyle/checkstyle-suppressions.xml index 51b0e524944..d51d566bde4 100644 --- a/src/checkstyle/checkstyle-suppressions.xml +++ b/src/checkstyle/checkstyle-suppressions.xml @@ -12,4 +12,5 @@ + diff --git a/src/reference/asciidoc/mqtt.adoc b/src/reference/asciidoc/mqtt.adoc index b7c461f7fa1..e38aeaf37cb 100644 --- a/src/reference/asciidoc/mqtt.adoc +++ b/src/reference/asciidoc/mqtt.adoc @@ -25,6 +25,9 @@ compile "org.springframework.integration:spring-integration-mqtt:{project-versio The current implementation uses the https://www.eclipse.org/paho/[Eclipse Paho MQTT Client] library. +IMPORTANT: The XML configuration and most of this chapter are about MQTT v3.1 protocol support and respective Paho Client. +See <> paragraph for respective protocol support. + Configuration of both adapters is achieved using the `DefaultMqttPahoClientFactory`. Refer to the Paho documentation for more information about configuration options. @@ -421,4 +424,74 @@ MqttConnectOptions options = source.getConnectionInfo(); [[mqtt-v5]] === MQTT v5 Support -TODO \ No newline at end of file +Starting with version 5.5.5, `spring-integration-mqtt` module provides channel adapter implementations for MQTT v5 protocol. +The `org.eclipse.paho:org.eclipse.paho.mqttv5.client` is `optional` dependency, so has to be included explicitly in the target project. + +Since MQTT v5 protocol supports extra arbitrary properties in MQTT message, the `MqttHeaderMapper` implementation has been introduced to map to/from headers on publish and receive operations. +By default (`*` pattern) it maps all the received `PUBLISH` frame properties (including user properties). +On the outbound side it maps these subset of headers for `PUBLISH` frame: `contentType`, `mqtt_messageExpiryInterval`, `mqtt_responseTopic`, `mqtt_correlationData`. + +The outbound channel adapter for MQTT v5 protocol is present as an `Mqttv5PahoMessageHandler`. +It requires a `clientId` and MQTT broker URL or `MqttConnectionOptions` reference. +It does support a `MqttClientPersistence` option, can be `async` and can emit `MqttIntegrationEvent` objects in that case (see `asyncEvents` option). +If request message payload is an `org.eclipse.paho.mqttv5.common.MqttMessage`, it is published as is via internal `IMqttAsyncClient`. +If the payload is `byte[]` it is used as is for target `MqttMessage` payload to publish. +If the payload is a `String` it is converted to `byte[]` to publish. +The rest use-cases are delegated to the provided `MessageConverter` which is a `IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME` `ConfigurableCompositeMessageConverter` bean from the application context. +Note: the provided `HeaderMapper` is not used when the requested message payload is already an `MqttMessage`. +The following Java DSL configuration sample demonstrates how to use this channel adapter in the integration flow: + +==== +[source, java] +---- +@Bean +public IntegrationFlow mqttOutFlow() { + Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout"); + MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper(); + mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE); + messageHandler.setHeaderMapper(mqttHeaderMapper); + messageHandler.setAsync(true); + messageHandler.setAsyncEvents(true); + messageHandler.setConverter(mqttStringToBytesConverter()); + + return f -> f.handle(messageHandler); +} +---- +==== + +IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageHandler` since its contract is aimed only for MQTT v3 protocol. + +See more information in the `Mqttv5PahoMessageHandler` javadocs and its superclass. + +The inbound channel adapter for MQTT v5 protocol is present as an `Mqttv5PahoMessageDrivenChannelAdapter`. +It requires a `clientId` and MQTT broker URL or `MqttConnectionOptions` reference, plus topics to subscribe and consume. +It does support a `MqttClientPersistence` option, which is in-memory by default. +The expected `payloadType` (`byte[]` by default) can be configured and it is propagated to the provided `SmartMessageConverter` for conversion from `byte[]` of the received `MqttMessage`. +If the `manualAck` option is set, then an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` header is added to the message to produce as an instance of `SimpleAcknowledgment`. +The `HeaderMapper` is used to map `PUBLISH` frame properties (including user properties) into the target message headers. +Standard `MqttMessage` properties, such as `qos`, `id`, `dup`, `retained`, plus received topic are always mapped to headers. +See `MqttHeaders` for more information. + +The following Java DSL configuration sample demonstrates how to use this channel adapter in the integration flow: + +==== +[source, java] +---- +@Bean +public IntegrationFlow mqttInFlow() { + Mqttv5PahoMessageDrivenChannelAdapter messageProducer = + new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest"); + messageProducer.setPayloadType(String.class); + messageProducer.setMessageConverter(mqttStringToBytesConverter()); + messageProducer.setManualAcks(true); + + return IntegrationFlows.from(messageProducer) + .channel(c -> c.queue("fromMqttChannel")) + .get(); +} +---- +==== + +IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageDrivenChannelAdapter` since its contract is aimed only for MQTT v3 protocol. + +See more information in the `Mqttv5PahoMessageDrivenChannelAdapter` javadocs and its superclass. \ No newline at end of file From 357a22f733931e671ab1820d8930a16c0cab0572 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 6 Oct 2021 11:27:05 -0400 Subject: [PATCH 4/4] Fix language in the docs after review Co-authored-by: Gary Russell --- src/reference/asciidoc/mqtt.adoc | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/reference/asciidoc/mqtt.adoc b/src/reference/asciidoc/mqtt.adoc index e38aeaf37cb..efc8babddac 100644 --- a/src/reference/asciidoc/mqtt.adoc +++ b/src/reference/asciidoc/mqtt.adoc @@ -424,20 +424,20 @@ MqttConnectOptions options = source.getConnectionInfo(); [[mqtt-v5]] === MQTT v5 Support -Starting with version 5.5.5, `spring-integration-mqtt` module provides channel adapter implementations for MQTT v5 protocol. -The `org.eclipse.paho:org.eclipse.paho.mqttv5.client` is `optional` dependency, so has to be included explicitly in the target project. +Starting with version 5.5.5, the `spring-integration-mqtt` module provides channel adapter implementations for the MQTT v5 protocol. +The `org.eclipse.paho:org.eclipse.paho.mqttv5.client` is an `optional` dependency, so has to be included explicitly in the target project. -Since MQTT v5 protocol supports extra arbitrary properties in MQTT message, the `MqttHeaderMapper` implementation has been introduced to map to/from headers on publish and receive operations. -By default (`*` pattern) it maps all the received `PUBLISH` frame properties (including user properties). -On the outbound side it maps these subset of headers for `PUBLISH` frame: `contentType`, `mqtt_messageExpiryInterval`, `mqtt_responseTopic`, `mqtt_correlationData`. +Since the MQTT v5 protocol supports extra arbitrary properties in an MQTT message, the `MqttHeaderMapper` implementation has been introduced to map to/from headers on publish and receive operations. +By default (via the `*` pattern) it maps all the received `PUBLISH` frame properties (including user properties). +On the outbound side it maps this subset of headers for `PUBLISH` frame: `contentType`, `mqtt_messageExpiryInterval`, `mqtt_responseTopic`, `mqtt_correlationData`. -The outbound channel adapter for MQTT v5 protocol is present as an `Mqttv5PahoMessageHandler`. +The outbound channel adapter for the MQTT v5 protocol is present as an `Mqttv5PahoMessageHandler`. It requires a `clientId` and MQTT broker URL or `MqttConnectionOptions` reference. -It does support a `MqttClientPersistence` option, can be `async` and can emit `MqttIntegrationEvent` objects in that case (see `asyncEvents` option). -If request message payload is an `org.eclipse.paho.mqttv5.common.MqttMessage`, it is published as is via internal `IMqttAsyncClient`. -If the payload is `byte[]` it is used as is for target `MqttMessage` payload to publish. +It supports a `MqttClientPersistence` option, can be `async` and can emit `MqttIntegrationEvent` objects in that case (see `asyncEvents` option). +If a request message payload is an `org.eclipse.paho.mqttv5.common.MqttMessage`, it is published as is via the internal `IMqttAsyncClient`. +If the payload is `byte[]` it is used as is for the target `MqttMessage` payload to publish. If the payload is a `String` it is converted to `byte[]` to publish. -The rest use-cases are delegated to the provided `MessageConverter` which is a `IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME` `ConfigurableCompositeMessageConverter` bean from the application context. +The remaining use-cases are delegated to the provided `MessageConverter` which is a `IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME` `ConfigurableCompositeMessageConverter` bean from the application context. Note: the provided `HeaderMapper` is not used when the requested message payload is already an `MqttMessage`. The following Java DSL configuration sample demonstrates how to use this channel adapter in the integration flow: @@ -459,13 +459,13 @@ public IntegrationFlow mqttOutFlow() { ---- ==== -IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageHandler` since its contract is aimed only for MQTT v3 protocol. +IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageHandler` since its contract is aimed only for the MQTT v3 protocol. See more information in the `Mqttv5PahoMessageHandler` javadocs and its superclass. -The inbound channel adapter for MQTT v5 protocol is present as an `Mqttv5PahoMessageDrivenChannelAdapter`. -It requires a `clientId` and MQTT broker URL or `MqttConnectionOptions` reference, plus topics to subscribe and consume. -It does support a `MqttClientPersistence` option, which is in-memory by default. +The inbound channel adapter for the MQTT v5 protocol is present as an `Mqttv5PahoMessageDrivenChannelAdapter`. +It requires a `clientId` and MQTT broker URL or `MqttConnectionOptions` reference, plus topics to which to subscribe and consume from. +It supports a `MqttClientPersistence` option, which is in-memory by default. The expected `payloadType` (`byte[]` by default) can be configured and it is propagated to the provided `SmartMessageConverter` for conversion from `byte[]` of the received `MqttMessage`. If the `manualAck` option is set, then an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` header is added to the message to produce as an instance of `SimpleAcknowledgment`. The `HeaderMapper` is used to map `PUBLISH` frame properties (including user properties) into the target message headers. @@ -492,6 +492,6 @@ public IntegrationFlow mqttInFlow() { ---- ==== -IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageDrivenChannelAdapter` since its contract is aimed only for MQTT v3 protocol. +IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageDrivenChannelAdapter` since its contract is aimed only for the MQTT v3 protocol. See more information in the `Mqttv5PahoMessageDrivenChannelAdapter` javadocs and its superclass. \ No newline at end of file