Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ protected synchronized void doStop() {
&& this.clientFactory.getConnectionOptions().isCleanSession())) {

this.client.unsubscribe(getTopic());
// Have to re-subscribe on next start if connection is not lost.
this.readyToSubscribeOnStart = true;

}
}
catch (MqttException ex1) {
Expand Down Expand Up @@ -341,6 +344,10 @@ public synchronized void connectionLost(Throwable cause) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
}
}
else {
// The 'connectComplete()' re-subscribes or sets this flag otherwise.
this.readyToSubscribeOnStart = false;
}
}

@Override
Expand Down Expand Up @@ -404,7 +411,9 @@ public void connectComplete(boolean isReconnect) {

@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (isRunning()) {
// The 'running' flag is set after 'doStart()', so possible a race condition
// when start is not finished yet, but server answers with successful connection.
if (isActive()) {
subscribe();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,17 @@ private synchronized void connect() throws MqttException {

@Override
protected void doStop() {
this.readyToSubscribeOnStart = false;
this.topicLock.lock();
this.readyToSubscribeOnStart = false;
String[] topics = getTopic();
try {
if (this.mqttClient != null && this.mqttClient.isConnected()) {
if (this.connectionOptions.isCleanStart()) {
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
}
// Have to re-subscribe on next start if connection is not lost.
this.readyToSubscribeOnStart = true;

}
if (getClientManager() == null) {
this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
}
Expand Down Expand Up @@ -331,10 +333,16 @@ public void messageArrived(String topic, MqttMessage mqttMessage) {

@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
MqttException cause = disconnectResponse.getException();
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
if (isRunning()) {
MqttException cause = disconnectResponse.getException();
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
}
}
else {
// The 'connectComplete()' re-subscribes or sets this flag otherwise.
this.readyToSubscribeOnStart = false;
}
}

Expand All @@ -358,7 +366,9 @@ public void connectComplete(boolean isReconnect) {

@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (isRunning()) {
// The 'running' flag is set after 'doStart()', so possible a race condition
// when start is not finished yet, but server answers with successful connection.
if (isActive()) {
subscribe();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,35 @@ public class ResubscribeAfterAutomaticReconnectTests implements MosquittoContain
@Autowired
private MqttConnectionOptions connectionOptions;

@Autowired
Mqttv5PahoMessageDrivenChannelAdapter pahoMessageDrivenChannelAdapter;

@Autowired
Config config;

@Test
void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedException {
GenericMessage<String> testMessage = new GenericMessage<>("test");

assertThat(this.config.subscribeFirstLatch.await(10, TimeUnit.SECONDS)).isTrue();

this.mqttOutFlowInput.send(testMessage);
assertThat(this.fromMqttChannel.receive(10_000)).isNotNull();

MOSQUITTO_CONTAINER.stop();
MOSQUITTO_CONTAINER.start();
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});

assertThat(this.config.subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.subscribeSecondLatch.await(10, TimeUnit.SECONDS)).isTrue();

this.mqttOutFlowInput.send(testMessage);
assertThat(this.fromMqttChannel.receive(10_000)).isNotNull();

// Re-subscription on channel adapter restart with cleanStart
this.pahoMessageDrivenChannelAdapter.stop();
this.pahoMessageDrivenChannelAdapter.start();

assertThat(this.config.subscribeThirdLatch.await(10, TimeUnit.SECONDS)).isTrue();

this.mqttOutFlowInput.send(testMessage);
assertThat(this.fromMqttChannel.receive(10_000)).isNotNull();
Expand All @@ -83,13 +98,18 @@ void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedExce
@EnableIntegration
public static class Config {

CountDownLatch subscribeLatch = new CountDownLatch(2);
CountDownLatch subscribeFirstLatch = new CountDownLatch(1);

CountDownLatch subscribeSecondLatch = new CountDownLatch(2);

CountDownLatch subscribeThirdLatch = new CountDownLatch(3);

@Bean
public MqttConnectionOptions mqttConnectOptions() {
return new MqttConnectionOptionsBuilder()
.serverURI(MosquittoContainerTest.mqttUrl())
.automaticReconnect(true)
.cleanStart(true)
.build();
}

Expand All @@ -105,15 +125,16 @@ public IntegrationFlow mqttOutFlow(MqttConnectionOptions mqttConnectOptions) {
public IntegrationFlow mqttInFlow(MqttConnectionOptions mqttConnectOptions) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions, "mqttInClient", "siTest");

return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}

@EventListener(MqttSubscribedEvent.class)
public void mqttEvents() {
this.subscribeLatch.countDown();
this.subscribeFirstLatch.countDown();
this.subscribeSecondLatch.countDown();
this.subscribeThirdLatch.countDown();
}

}
Expand Down