Skip to content

Commit de510dd

Browse files
committed
adding required chnages
* `ClientManager`, `Mqttv3ClientManager`, `Mqttv5ClientManager`, `AbstractMqttMessageDrivenChannelAdapter` - renamig `isConnection()` to `isConnected()` * fixing docs `mqtt.adoc` and `whats-new.adoc` * `ClientManagerBackToBackTests` adding factory interface `MessageDrivenChannelAdapterFactory` to create adapters
1 parent 0ae2d5b commit de510dd

File tree

7 files changed

+41
-33
lines changed

7 files changed

+41
-33
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
7474
* @return the managed clients isConnected.
7575
* @since 6.4
7676
*/
77-
boolean isConnection();
77+
boolean isConnected();
7878

7979
/**
8080
* A contract for a custom callback on {@code connectComplete} event from the client.

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void deliveryComplete(IMqttDeliveryToken token) {
200200
}
201201

202202
@Override
203-
public boolean isConnection() {
203+
public boolean isConnected() {
204204
this.lock.lock();
205205
try {
206206
IMqttAsyncClient client = getClient();

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public void mqttErrorOccurred(MqttException exception) {
208208
}
209209

210210
@Override
211-
public boolean isConnection() {
211+
public boolean isConnected() {
212212
this.lock.lock();
213213
try {
214214
IMqttAsyncClient client = getClient();

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ protected void onInit() {
204204
super.onInit();
205205
if (this.clientManager != null) {
206206
this.clientManager.addCallback(this);
207-
if (this.clientManager.isConnection()) {
207+
if (this.clientManager.isConnected()) {
208208
connectComplete(false);
209209
}
210210
}

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.integration.mqtt;
1818

19-
import java.lang.reflect.Constructor;
2019
import java.nio.charset.StandardCharsets;
2120
import java.util.concurrent.CountDownLatch;
2221
import java.util.concurrent.TimeUnit;
@@ -25,6 +24,7 @@
2524
import org.eclipse.paho.client.mqttv3.MqttException;
2625
import org.junit.jupiter.api.Test;
2726

27+
import org.springframework.context.ApplicationContext;
2828
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
2929
import org.springframework.context.annotation.Bean;
3030
import org.springframework.context.annotation.Configuration;
@@ -35,7 +35,6 @@
3535
import org.springframework.integration.dsl.IntegrationFlow;
3636
import org.springframework.integration.dsl.context.IntegrationFlowContext;
3737
import org.springframework.integration.endpoint.MessageProducerSupport;
38-
import org.springframework.integration.mqtt.core.ClientManager;
3938
import org.springframework.integration.mqtt.core.Mqttv3ClientManager;
4039
import org.springframework.integration.mqtt.core.Mqttv5ClientManager;
4140
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
@@ -84,7 +83,7 @@ void testV3ClientManagerStarted() throws Exception {
8483
@Test
8584
void testV3ClientManagerRuntime() throws Exception {
8685
testSubscribeAndPublishRuntime(Mqttv3ConfigRuntime.class, Mqttv3ConfigRuntime.TOPIC_NAME,
87-
Mqttv3ConfigRuntime.subscribedLatch, Mqttv3ConfigRuntime.adapter);
86+
Mqttv3ConfigRuntime.subscribedLatch);
8887
}
8988

9089
@Test
@@ -102,7 +101,7 @@ void testV5ClientManagerStarted() throws Exception {
102101
@Test
103102
void testV5ClientManagerRuntime() throws Exception {
104103
testSubscribeAndPublishRuntime(Mqttv5ConfigRuntime.class, Mqttv5ConfigRuntime.TOPIC_NAME,
105-
Mqttv5ConfigRuntime.subscribedLatch, Mqttv5ConfigRuntime.adapter);
104+
Mqttv5ConfigRuntime.subscribedLatch);
106105
}
107106

108107
private void testSubscribeAndPublish(Class<?> configClass, String topicName, CountDownLatch subscribedLatch)
@@ -131,19 +130,18 @@ private void testSubscribeAndPublish(Class<?> configClass, String topicName, Cou
131130
}
132131
}
133132

134-
private void testSubscribeAndPublishRuntime(Class<?> configClass, String topicName, CountDownLatch subscribedLatch, Class<?> adapter)
133+
private void testSubscribeAndPublishRuntime(Class<?> configClass, String topicName, CountDownLatch subscribedLatch)
135134
throws Exception {
136135

137136
try (var ctx = new AnnotationConfigApplicationContext(configClass)) {
138137
// given
139138
var input = ctx.getBean("mqttOutFlow.input", MessageChannel.class);
140139
var flowContext = ctx.getBean(IntegrationFlowContext.class);
141-
var clientManager = ctx.getBean(ClientManager.class);
140+
var factory = ctx.getBean(MessageDrivenChannelAdapterFactory.class);
142141
var output = new QueueChannel();
143-
Class<?>[] parameterTypes = {ClientManager.class, String[].class};
144-
Constructor<?> declaredConstructor = adapter.getConstructor(parameterTypes);
142+
145143
flowContext.registration(IntegrationFlow
146-
.from((MessageProducerSupport) declaredConstructor.newInstance(clientManager, new String[] {topicName}))
144+
.from(factory.createMessageDrivenAdapter(ctx))
147145
.channel(output)
148146
.get()).register();
149147
String testPayload = "foo";
@@ -279,14 +277,12 @@ public IntegrationFlow mqttInFlow(Mqttv3ClientManager mqttv3ClientManager) {
279277

280278
@Configuration
281279
@EnableIntegration
282-
public static class Mqttv3ConfigRuntime {
280+
public static class Mqttv3ConfigRuntime implements MessageDrivenChannelAdapterFactory {
283281

284282
static final String TOPIC_NAME = "test-topic-v3";
285283

286284
static final CountDownLatch subscribedLatch = new CountDownLatch(1);
287285

288-
static final Class<?> adapter = MqttPahoMessageDrivenChannelAdapter.class;
289-
290286
@EventListener
291287
public void onSubscribed(MqttSubscribedEvent e) {
292288
subscribedLatch.countDown();
@@ -305,6 +301,11 @@ public IntegrationFlow mqttOutFlow(Mqttv3ClientManager mqttv3ClientManager) {
305301
return f -> f.handle(new MqttPahoMessageHandler(mqttv3ClientManager));
306302
}
307303

304+
@Override
305+
public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx) {
306+
var clientManager = ctx.getBean(Mqttv3ClientManager.class);
307+
return new MqttPahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME);
308+
}
308309
}
309310

310311
@Configuration
@@ -414,14 +415,12 @@ public IntegrationFlow mqttInFlow(Mqttv5ClientManager mqttv5ClientManager) {
414415

415416
@Configuration
416417
@EnableIntegration
417-
public static class Mqttv5ConfigRuntime {
418+
public static class Mqttv5ConfigRuntime implements MessageDrivenChannelAdapterFactory {
418419

419420
static final String TOPIC_NAME = "test-topic-v5";
420421

421422
static final CountDownLatch subscribedLatch = new CountDownLatch(1);
422423

423-
static final Class<?> adapter = Mqttv5PahoMessageDrivenChannelAdapter.class;
424-
425424
@EventListener
426425
public void onSubscribed(MqttSubscribedEvent e) {
427426
subscribedLatch.countDown();
@@ -438,6 +437,15 @@ public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler(Mqttv5ClientManager mqt
438437
return new Mqttv5PahoMessageHandler(mqttv5ClientManager);
439438
}
440439

440+
@Override
441+
public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx) {
442+
var clientManager = ctx.getBean(Mqttv5ClientManager.class);
443+
return new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME);
444+
}
445+
}
446+
447+
interface MessageDrivenChannelAdapterFactory {
448+
MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx);
441449
}
442450

443451
record ClientV3Disconnector(Mqttv3ClientManager clientManager) {

src/reference/antora/modules/ROOT/pages/mqtt.adoc

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -382,9 +382,9 @@ public class MqttJavaApplication {
382382
.run(args);
383383
}
384384
385-
@Bean
386-
public IntegrationFlow mqttOutboundFlow() {
387-
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
385+
@Bean
386+
public IntegrationFlow mqttOutboundFlow() {
387+
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
388388
}
389389
390390
}
@@ -553,14 +553,14 @@ NOTE: Starting with version 6.4, multiple instances of `MqttPahoMessageDrivenCha
553553

554554
[source,java]
555555
----
556-
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext,Mqttv5ClientManager clientManager,
557-
String topic,MessageChannel channel){
558-
flowContext
559-
.registration(
560-
IntegrationFlow
561-
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
562-
.channel(channel)
563-
.get())
564-
.register();
556+
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
557+
String topic, MessageChannel channel) {
558+
flowContext
559+
.registration(
560+
IntegrationFlow
561+
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
562+
.channel(channel)
563+
.get())
564+
.register();
565565
}
566566
----

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ See xref:control-bus.adoc[Control Bus] for more information.
6868
The `DefaultSftpSessionFactory` now exposes a `Consumer<SshClient>` configurer property to further customize an internal `SshClient`.
6969
See xref:sftp/session-factory.adoc[SFTP Session Factory] for more information.
7070

71-
[[x6.4-mqtt-changes]]
72-
=== mqtt Changes
71+
[[x6.4-mqtt-support-changes]]
72+
=== MQTT Support Changes
7373

7474
Multiple instances of `MqttPahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageDrivenChannelAdapter` can now be added at runtime using corresponding `ClientManager` through `IntegrationFlowContext`
7575
See xref:mqtt.adoc[MQTT Support] for more information.

0 commit comments

Comments
 (0)