Skip to content
Closed

Gh 4014 #8554

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
*/
public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T, C>, ApplicationEventPublisherAware {

/**
* The default completion timeout in milliseconds.
* @since 6.0.3
*/
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;

protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

private static final int DEFAULT_MANAGER_PHASE = 0;
Expand All @@ -67,6 +73,8 @@ public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T

private T client;

private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;

protected AbstractMqttClientManager(String clientId) {
Assert.notNull(clientId, "'clientId' is required");
this.clientId = clientId;
Expand Down Expand Up @@ -154,6 +162,21 @@ public String getBeanName() {
return this.beanName;
}

/**
* Set the completion timeout for operations. Not settable using the namespace.
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 6.0.3
* @author Jiří Patera
*/
public void setCompletionTimeout(long completionTimeout) {
this.completionTimeout = completionTimeout;
}

protected long getCompletionTimeout() {
return this.completionTimeout;
}

/**
* The phase of component auto-start in {@link SmartLifecycle}.
* If the custom one is required, note that for the correct behavior it should be less than phase of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public synchronized void stop() {
return;
}
try {
client.disconnectForcibly(getDisconnectCompletionTimeout());
client.disconnectForcibly(getCompletionTimeout());
}
catch (MqttException e) {
logger.error("Could not disconnect from the client", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public synchronized void stop() {
}

try {
client.disconnectForcibly(getDisconnectCompletionTimeout());
client.disconnectForcibly(getCompletionTimeout());
}
catch (MqttException e) {
logger.error("Could not disconnect from the client", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public class MqttPahoMessageDrivenChannelAdapter
extends AbstractMqttMessageDrivenChannelAdapter<IMqttAsyncClient, MqttConnectOptions>
implements MqttCallbackExtended, MqttPahoComponent {

/**
* The default disconnect completion timeout in milliseconds.
* @deprecated As of release 6.0.3, replaced by {@link AbstractMqttMessageDrivenChannelAdapter#DEFAULT_COMPLETION_TIMEOUT}
*/
@Deprecated(since = "6.0.3")
public static final long DISCONNECT_COMPLETION_TIMEOUT = DEFAULT_COMPLETION_TIMEOUT;

private final MqttPahoClientFactory clientFactory;

private volatile IMqttAsyncClient client;
Expand Down Expand Up @@ -131,6 +138,18 @@ public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient, MqttC
this.clientFactory = factory;
}

/**
* Set the completion timeout when disconnecting. Not settable using the namespace.
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 5.1.10
* @deprecated As of release 6.0.3, replaced by {@link #setCompletionTimeout(long)}
*/
@Deprecated(since = "6.0.3")
public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
setCompletionTimeout(completionTimeout);
}

@Override
public MqttConnectOptions getConnectionInfo() {
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
Expand Down Expand Up @@ -226,7 +245,7 @@ protected synchronized void doStop() {
}

try {
this.client.disconnectForcibly(getDisconnectCompletionTimeout());
this.client.disconnectForcibly(getCompletionTimeout());
}
catch (MqttException ex) {
logger.error(ex, "Exception while disconnecting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public void testDifferentQos() throws Exception {

new DirectFieldAccessor(adapter).setPropertyValue("running", Boolean.TRUE);
adapter.stop();
verify(client).disconnectForcibly(5_000L);
verify(client).disconnectForcibly(30_000L);
}

private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, Boolean cleanSession)
Expand Down