|
21 | 21 | import java.util.Collections; |
22 | 22 | import java.util.List; |
23 | 23 | import java.util.Map; |
24 | | -import java.util.concurrent.atomic.AtomicInteger; |
25 | 24 | import java.util.concurrent.locks.Lock; |
26 | 25 | import java.util.concurrent.locks.ReentrantLock; |
27 | 26 | import java.util.stream.IntStream; |
28 | 27 |
|
29 | 28 | import org.eclipse.paho.mqttv5.client.IMqttAsyncClient; |
30 | | -import org.eclipse.paho.mqttv5.client.IMqttMessageListener; |
31 | 29 | import org.eclipse.paho.mqttv5.client.IMqttToken; |
32 | 30 | import org.eclipse.paho.mqttv5.client.MqttAsyncClient; |
33 | 31 | import org.eclipse.paho.mqttv5.client.MqttCallback; |
@@ -109,8 +107,6 @@ public class Mqttv5PahoMessageDrivenChannelAdapter |
109 | 107 |
|
110 | 108 | private volatile boolean readyToSubscribeOnStart; |
111 | 109 |
|
112 | | - private final AtomicInteger subscriptionIdentifierCounter = new AtomicInteger(0); |
113 | | - |
114 | 110 | /** |
115 | 111 | * Create an instance based on the MQTT url, client id and subscriptions. |
116 | 112 | * @param url the MQTT url to connect. |
@@ -344,9 +340,10 @@ public void addTopic(String topic, int qos) { |
344 | 340 | } |
345 | 341 | if (this.mqttClient != null && this.mqttClient.isConnected()) { |
346 | 342 | MqttProperties subscriptionProperties = new MqttProperties(); |
347 | | - subscriptionProperties.setSubscriptionIdentifier(this.subscriptionIdentifierCounter.incrementAndGet()); |
| 343 | + // Make use of mqttSession.getNextSubscriptionIdentifier() if available in connection |
| 344 | + subscriptionProperties.setSubscriptionIdentifiers(List.of(0)); |
348 | 345 | this.mqttClient.subscribe(new MqttSubscription[] {subscription}, |
349 | | - null, null, new IMqttMessageListener[] {this::messageArrived}, subscriptionProperties) |
| 346 | + null, null, this::messageArrived, subscriptionProperties) |
350 | 347 | .waitForCompletion(getCompletionTimeout()); |
351 | 348 | } |
352 | 349 | } |
@@ -472,15 +469,10 @@ private void subscribe() { |
472 | 469 | ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); |
473 | 470 | this.topicLock.lock(); |
474 | 471 | try { |
475 | | - IMqttMessageListener listener = this::messageArrived; |
476 | | - IMqttMessageListener[] listeners = IntStream.range(0, mqttSubscriptions.length) |
477 | | - .mapToObj(t -> listener) |
478 | | - .toArray(IMqttMessageListener[]::new); |
479 | 472 | MqttProperties subscriptionProperties = new MqttProperties(); |
480 | | - subscriptionProperties.setSubscriptionIdentifiers(IntStream.range(0, mqttSubscriptions.length) |
481 | | - .mapToObj(i -> this.subscriptionIdentifierCounter.incrementAndGet()) |
482 | | - .toList()); |
483 | | - this.mqttClient.subscribe(mqttSubscriptions, null, null, listeners, subscriptionProperties) |
| 473 | + // Make use of mqttSession.getNextSubscriptionIdentifier() if available in connection |
| 474 | + subscriptionProperties.setSubscriptionIdentifiers(List.of(0)); |
| 475 | + this.mqttClient.subscribe(mqttSubscriptions, null, null, this::messageArrived, subscriptionProperties) |
484 | 476 | .waitForCompletion(getCompletionTimeout()); |
485 | 477 | String message = "Connected and subscribed to " + Arrays.toString(mqttSubscriptions); |
486 | 478 | logger.debug(message); |
|
0 commit comments