Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ClusterServiceIT extends ESIntegTestCase {

private static final TimeValue TEN_SECONDS = TimeValue.timeValueSeconds(10L);

public void testAckedUpdateTask() throws Exception {
internalCluster().startNode();
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
Expand All @@ -56,7 +58,8 @@ public void testAckedUpdateTask() throws Exception {
final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
clusterService.submitStateUpdateTask("test",
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
@Override
protected Void newResponse(boolean acknowledged) {
return null;
Expand All @@ -79,16 +82,6 @@ public void onAckTimeout() {
latch.countDown();
}

@Override
public TimeValue ackTimeout() {
return TimeValue.timeValueSeconds(10);
}

@Override
public TimeValue timeout() {
return TimeValue.timeValueSeconds(10);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
processedLatch.countDown();
Expand Down Expand Up @@ -129,7 +122,8 @@ public void testAckedUpdateTaskSameClusterState() throws Exception {
final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
clusterService.submitStateUpdateTask("test",
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
@Override
protected Void newResponse(boolean acknowledged) {
return null;
Expand All @@ -147,16 +141,6 @@ public void onAckTimeout() {
latch.countDown();
}

@Override
public TimeValue ackTimeout() {
return TimeValue.timeValueSeconds(10);
}

@Override
public TimeValue timeout() {
return TimeValue.timeValueSeconds(10);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
processedLatch.countDown();
Expand Down Expand Up @@ -196,7 +180,9 @@ public void testAckedUpdateTaskNoAckExpected() throws Exception {
final AtomicBoolean onFailure = new AtomicBoolean(false);
final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {

clusterService.submitStateUpdateTask(
"test", new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
@Override
protected Void newResponse(boolean acknowledged) {
return null;
Expand All @@ -219,16 +205,6 @@ public void onAckTimeout() {
latch.countDown();
}

@Override
public TimeValue ackTimeout() {
return TimeValue.timeValueSeconds(10);
}

@Override
public TimeValue timeout() {
return TimeValue.timeValueSeconds(10);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
Expand Down Expand Up @@ -266,7 +242,8 @@ public void testAckedUpdateTaskTimeoutZero() throws Exception {
final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
clusterService.submitStateUpdateTask("test",
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TimeValue.ZERO, TEN_SECONDS), null) {
@Override
protected Void newResponse(boolean acknowledged) {
return null;
Expand All @@ -289,16 +266,6 @@ public void onAckTimeout() {
latch.countDown();
}

@Override
public TimeValue ackTimeout() {
return TimeValue.timeValueSeconds(0);
}

@Override
public TimeValue timeout() {
return TimeValue.timeValueSeconds(10);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
processedLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public void onTimeout(TimeValue timeout) {

private void submitClearVotingConfigExclusionsTask(ClearVotingConfigExclusionsRequest request, long startTimeMillis,
ActionListener<ClearVotingConfigExclusionsResponse> listener) {
clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) {
clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT,
TimeValue.timeValueMillis(
Math.max(0, request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis()))) {
@Override
public ClusterState execute(ClusterState currentState) {
final CoordinationMetadata newCoordinationMetadata =
Expand All @@ -119,11 +121,6 @@ public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public TimeValue timeout() {
return TimeValue.timeValueMillis(request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis());
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new ClearVotingConfigExclusionsResponse());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,12 @@ public void onFailure(String source, Exception e) {
} else {
final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()));
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
new ClusterStateUpdateTask(request.waitForEvents()) {
new ClusterStateUpdateTask(request.waitForEvents(), taskTimeout) {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public TimeValue timeout() {
return taskTimeout;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Respo
}

protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, ActionListener<Response> listener) {
super(priority);
super(priority, request.masterNodeTimeout());
this.listener = listener;
this.request = request;
}
Expand Down Expand Up @@ -82,12 +82,7 @@ public void onFailure(String source, Exception e) {
/**
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
*/
public TimeValue ackTimeout() {
public final TimeValue ackTimeout() {
return request.ackTimeout();
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,24 @@ public abstract class ClusterStateUpdateTask

private final Priority priority;

@Nullable
private final TimeValue timeout;

public ClusterStateUpdateTask() {
this(Priority.NORMAL);
}

public ClusterStateUpdateTask(Priority priority) {
this(priority, null);
}

public ClusterStateUpdateTask(TimeValue timeout) {
this(Priority.NORMAL, timeout);
}

public ClusterStateUpdateTask(Priority priority, TimeValue timeout) {
this.priority = priority;
this.timeout = timeout;
}

@Override
Expand Down Expand Up @@ -75,12 +87,12 @@ public final void clusterStatePublished(ClusterChangedEvent clusterChangedEvent)
* {@link ClusterStateTaskListener#onFailure(String, Exception)}. May return null to indicate no timeout is needed (default).
*/
@Nullable
public TimeValue timeout() {
return null;
public final TimeValue timeout() {
return timeout;
}

@Override
public Priority priority() {
public final Priority priority() {
return priority;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
Expand Down Expand Up @@ -160,7 +159,7 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina
}

clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT) {
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {

private final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();

Expand Down Expand Up @@ -235,11 +234,6 @@ public void clusterStateProcessed(final String source,
public void onFailure(final String source, final Exception e) {
listener.onFailure(e);
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
}
);
}
Expand Down Expand Up @@ -411,7 +405,7 @@ public void addIndexBlock(AddIndexBlockClusterStateUpdateRequest request,
}

clusterService.submitStateUpdateTask("add-index-block-[" + request.getBlock().name + "]-" + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT) {
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {

private Map<Index, ClusterBlock> blockedIndices;

Expand Down Expand Up @@ -473,11 +467,6 @@ public void clusterStateProcessed(final String source,
public void onFailure(final String source, final Exception e) {
listener.onFailure(e);
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,8 @@ public MetadataIndexTemplateService(ClusterService clusterService,
}

public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT) {

@Override
public TimeValue timeout() {
return request.masterTimeout;
}
clusterService.submitStateUpdateTask(
"remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {

@Override
public void onFailure(String source, Exception e) {
Expand Down Expand Up @@ -171,12 +167,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
public void putComponentTemplate(final String cause, final boolean create, final String name, final TimeValue masterTimeout,
final ComponentTemplate template, final ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask("create-component-template [" + name + "], cause [" + cause + "]",
new ClusterStateUpdateTask(Priority.URGENT) {

@Override
public TimeValue timeout() {
return masterTimeout;
}
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {

@Override
public void onFailure(String source, Exception e) {
Expand Down Expand Up @@ -305,12 +296,7 @@ public void removeComponentTemplate(final String name, final TimeValue masterTim
final ActionListener<AcknowledgedResponse> listener) {
validateNotInUse(clusterService.state().metadata(), name);
clusterService.submitStateUpdateTask("remove-component-template [" + name + "]",
new ClusterStateUpdateTask(Priority.URGENT) {

@Override
public TimeValue timeout() {
return masterTimeout;
}
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {

@Override
public void onFailure(String source, Exception e) {
Expand Down Expand Up @@ -384,12 +370,7 @@ public void putIndexTemplateV2(final String cause, final boolean create, final S
final ComposableIndexTemplate template, final ActionListener<AcknowledgedResponse> listener) {
validateV2TemplateRequest(clusterService.state().metadata(), name, template);
clusterService.submitStateUpdateTask("create-index-template-v2 [" + name + "], cause [" + cause + "]",
new ClusterStateUpdateTask(Priority.URGENT) {

@Override
public TimeValue timeout() {
return masterTimeout;
}
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {

@Override
public void onFailure(String source, Exception e) {
Expand Down Expand Up @@ -639,12 +620,7 @@ static Map<String, List<String>> findConflictingV2Templates(final ClusterState s
public void removeIndexTemplateV2(final String name, final TimeValue masterTimeout,
final ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask("remove-index-template-v2 [" + name + "]",
new ClusterStateUpdateTask(Priority.URGENT) {

@Override
public TimeValue timeout() {
return masterTimeout;
}
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {

@Override
public void onFailure(String source, Exception e) {
Expand Down Expand Up @@ -734,12 +710,7 @@ public void putTemplate(final PutRequest request, final PutListener listener) {
final IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder(request.name);

clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]",
new ClusterStateUpdateTask(Priority.URGENT) {

@Override
public TimeValue timeout() {
return request.masterTimeout;
}
new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {

@Override
public void onFailure(String source, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -361,7 +360,7 @@ public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUp
final RepositoryMetadata repositoryMetadataStart = metadata;
getRepositoryData(ActionListener.wrap(repositoryData -> {
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority(), updateTask.timeout()) {

private boolean executedTask = false;

Expand Down Expand Up @@ -397,11 +396,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
}
}

@Override
public TimeValue timeout() {
return updateTask.timeout();
}
});
}, onFailure));
}
Expand Down
Loading