Skip to content

Commit ef4ea4a

Browse files
Simplify ClusterStateUpdateTask Timeout Handling (#64117)
It's confusing and slightly error-prone (see #64116) to handle the timeouts via overrides but the priority via a field. This simplifies the code to avoid future issues and saves over 100 LOC. Also, this fixes a bug in `TransportVotingConfigExclusionsAction` where trying to instantiate a time value with a negative time could throw an unexpected exception and, as a result, leak a listener.
1 parent b245348 commit ef4ea4a

File tree

15 files changed

+79
-202
lines changed

15 files changed

+79
-202
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java

Lines changed: 11 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
4747
public class ClusterServiceIT extends ESIntegTestCase {
4848

49+
private static final TimeValue TEN_SECONDS = TimeValue.timeValueSeconds(10L);
50+
4951
public void testAckedUpdateTask() throws Exception {
5052
internalCluster().startNode();
5153
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
@@ -56,7 +58,8 @@ public void testAckedUpdateTask() throws Exception {
5658
final AtomicBoolean executed = new AtomicBoolean(false);
5759
final CountDownLatch latch = new CountDownLatch(1);
5860
final CountDownLatch processedLatch = new CountDownLatch(1);
59-
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
61+
clusterService.submitStateUpdateTask("test",
62+
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
6063
@Override
6164
protected Void newResponse(boolean acknowledged) {
6265
return null;
@@ -79,16 +82,6 @@ public void onAckTimeout() {
7982
latch.countDown();
8083
}
8184

82-
@Override
83-
public TimeValue ackTimeout() {
84-
return TimeValue.timeValueSeconds(10);
85-
}
86-
87-
@Override
88-
public TimeValue timeout() {
89-
return TimeValue.timeValueSeconds(10);
90-
}
91-
9285
@Override
9386
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
9487
processedLatch.countDown();
@@ -129,7 +122,8 @@ public void testAckedUpdateTaskSameClusterState() throws Exception {
129122
final AtomicBoolean executed = new AtomicBoolean(false);
130123
final CountDownLatch latch = new CountDownLatch(1);
131124
final CountDownLatch processedLatch = new CountDownLatch(1);
132-
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
125+
clusterService.submitStateUpdateTask("test",
126+
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
133127
@Override
134128
protected Void newResponse(boolean acknowledged) {
135129
return null;
@@ -147,16 +141,6 @@ public void onAckTimeout() {
147141
latch.countDown();
148142
}
149143

150-
@Override
151-
public TimeValue ackTimeout() {
152-
return TimeValue.timeValueSeconds(10);
153-
}
154-
155-
@Override
156-
public TimeValue timeout() {
157-
return TimeValue.timeValueSeconds(10);
158-
}
159-
160144
@Override
161145
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
162146
processedLatch.countDown();
@@ -196,7 +180,9 @@ public void testAckedUpdateTaskNoAckExpected() throws Exception {
196180
final AtomicBoolean onFailure = new AtomicBoolean(false);
197181
final AtomicBoolean executed = new AtomicBoolean(false);
198182
final CountDownLatch latch = new CountDownLatch(1);
199-
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
183+
184+
clusterService.submitStateUpdateTask(
185+
"test", new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
200186
@Override
201187
protected Void newResponse(boolean acknowledged) {
202188
return null;
@@ -219,16 +205,6 @@ public void onAckTimeout() {
219205
latch.countDown();
220206
}
221207

222-
@Override
223-
public TimeValue ackTimeout() {
224-
return TimeValue.timeValueSeconds(10);
225-
}
226-
227-
@Override
228-
public TimeValue timeout() {
229-
return TimeValue.timeValueSeconds(10);
230-
}
231-
232208
@Override
233209
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
234210
}
@@ -266,7 +242,8 @@ public void testAckedUpdateTaskTimeoutZero() throws Exception {
266242
final AtomicBoolean executed = new AtomicBoolean(false);
267243
final CountDownLatch latch = new CountDownLatch(1);
268244
final CountDownLatch processedLatch = new CountDownLatch(1);
269-
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
245+
clusterService.submitStateUpdateTask("test",
246+
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TimeValue.ZERO, TEN_SECONDS), null) {
270247
@Override
271248
protected Void newResponse(boolean acknowledged) {
272249
return null;
@@ -289,16 +266,6 @@ public void onAckTimeout() {
289266
latch.countDown();
290267
}
291268

292-
@Override
293-
public TimeValue ackTimeout() {
294-
return TimeValue.timeValueSeconds(0);
295-
}
296-
297-
@Override
298-
public TimeValue timeout() {
299-
return TimeValue.timeValueSeconds(10);
300-
}
301-
302269
@Override
303270
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
304271
processedLatch.countDown();

server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ public void onTimeout(TimeValue timeout) {
105105

106106
private void submitClearVotingConfigExclusionsTask(ClearVotingConfigExclusionsRequest request, long startTimeMillis,
107107
ActionListener<ClearVotingConfigExclusionsResponse> listener) {
108-
clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) {
108+
clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT,
109+
TimeValue.timeValueMillis(
110+
Math.max(0, request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis()))) {
109111
@Override
110112
public ClusterState execute(ClusterState currentState) {
111113
final CoordinationMetadata newCoordinationMetadata =
@@ -120,11 +122,6 @@ public void onFailure(String source, Exception e) {
120122
listener.onFailure(e);
121123
}
122124

123-
@Override
124-
public TimeValue timeout() {
125-
return TimeValue.timeValueMillis(request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis());
126-
}
127-
128125
@Override
129126
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
130127
listener.onResponse(new ClearVotingConfigExclusionsResponse());

server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,17 +120,12 @@ public void onFailure(String source, Exception e) {
120120
} else {
121121
final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()));
122122
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
123-
new ClusterStateUpdateTask(request.waitForEvents()) {
123+
new ClusterStateUpdateTask(request.waitForEvents(), taskTimeout) {
124124
@Override
125125
public ClusterState execute(ClusterState currentState) {
126126
return currentState;
127127
}
128128

129-
@Override
130-
public TimeValue timeout() {
131-
return taskTimeout;
132-
}
133-
134129
@Override
135130
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
136131
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());

server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Respo
3939
}
4040

4141
protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, ActionListener<Response> listener) {
42-
super(priority);
42+
super(priority, request.masterNodeTimeout());
4343
this.listener = listener;
4444
this.request = request;
4545
}
@@ -82,12 +82,7 @@ public void onFailure(String source, Exception e) {
8282
/**
8383
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
8484
*/
85-
public TimeValue ackTimeout() {
85+
public final TimeValue ackTimeout() {
8686
return request.ackTimeout();
8787
}
88-
89-
@Override
90-
public TimeValue timeout() {
91-
return request.masterNodeTimeout();
92-
}
9388
}

server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,24 @@ public abstract class ClusterStateUpdateTask
3333

3434
private final Priority priority;
3535

36+
@Nullable
37+
private final TimeValue timeout;
38+
3639
public ClusterStateUpdateTask() {
3740
this(Priority.NORMAL);
3841
}
3942

4043
public ClusterStateUpdateTask(Priority priority) {
44+
this(priority, null);
45+
}
46+
47+
public ClusterStateUpdateTask(TimeValue timeout) {
48+
this(Priority.NORMAL, timeout);
49+
}
50+
51+
public ClusterStateUpdateTask(Priority priority, TimeValue timeout) {
4152
this.priority = priority;
53+
this.timeout = timeout;
4254
}
4355

4456
@Override
@@ -75,12 +87,12 @@ public final void clusterStatePublished(ClusterChangedEvent clusterChangedEvent)
7587
* {@link ClusterStateTaskListener#onFailure(String, Exception)}. May return null to indicate no timeout is needed (default).
7688
*/
7789
@Nullable
78-
public TimeValue timeout() {
79-
return null;
90+
public final TimeValue timeout() {
91+
return timeout;
8092
}
8193

8294
@Override
83-
public Priority priority() {
95+
public final Priority priority() {
8496
return priority;
8597
}
8698

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import org.elasticsearch.common.inject.Inject;
6565
import org.elasticsearch.common.settings.Setting;
6666
import org.elasticsearch.common.settings.Settings;
67-
import org.elasticsearch.common.unit.TimeValue;
6867
import org.elasticsearch.common.util.concurrent.AtomicArray;
6968
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
7069
import org.elasticsearch.common.util.concurrent.CountDown;
@@ -158,7 +157,7 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina
158157
}
159158

160159
clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
161-
new ClusterStateUpdateTask(Priority.URGENT) {
160+
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
162161

163162
private final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
164163

@@ -233,11 +232,6 @@ public void clusterStateProcessed(final String source,
233232
public void onFailure(final String source, final Exception e) {
234233
listener.onFailure(e);
235234
}
236-
237-
@Override
238-
public TimeValue timeout() {
239-
return request.masterNodeTimeout();
240-
}
241235
}
242236
);
243237
}
@@ -409,7 +403,7 @@ public void addIndexBlock(AddIndexBlockClusterStateUpdateRequest request,
409403
}
410404

411405
clusterService.submitStateUpdateTask("add-index-block-[" + request.getBlock().name + "]-" + Arrays.toString(concreteIndices),
412-
new ClusterStateUpdateTask(Priority.URGENT) {
406+
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
413407

414408
private Map<Index, ClusterBlock> blockedIndices;
415409

@@ -471,11 +465,6 @@ public void clusterStateProcessed(final String source,
471465
public void onFailure(final String source, final Exception e) {
472466
listener.onFailure(e);
473467
}
474-
475-
@Override
476-
public TimeValue timeout() {
477-
return request.masterNodeTimeout();
478-
}
479468
}
480469
);
481470
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,8 @@ public MetadataIndexTemplateService(ClusterService clusterService,
118118
}
119119

120120
public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
121-
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT) {
122-
123-
@Override
124-
public TimeValue timeout() {
125-
return request.masterTimeout;
126-
}
121+
clusterService.submitStateUpdateTask(
122+
"remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {
127123

128124
@Override
129125
public void onFailure(String source, Exception e) {
@@ -169,12 +165,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
169165
public void putComponentTemplate(final String cause, final boolean create, final String name, final TimeValue masterTimeout,
170166
final ComponentTemplate template, final ActionListener<AcknowledgedResponse> listener) {
171167
clusterService.submitStateUpdateTask("create-component-template [" + name + "], cause [" + cause + "]",
172-
new ClusterStateUpdateTask(Priority.URGENT) {
173-
174-
@Override
175-
public TimeValue timeout() {
176-
return masterTimeout;
177-
}
168+
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
178169

179170
@Override
180171
public void onFailure(String source, Exception e) {
@@ -321,12 +312,7 @@ public void removeComponentTemplate(final String name, final TimeValue masterTim
321312
final ActionListener<AcknowledgedResponse> listener) {
322313
validateNotInUse(clusterService.state().metadata(), name);
323314
clusterService.submitStateUpdateTask("remove-component-template [" + name + "]",
324-
new ClusterStateUpdateTask(Priority.URGENT) {
325-
326-
@Override
327-
public TimeValue timeout() {
328-
return masterTimeout;
329-
}
315+
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
330316

331317
@Override
332318
public void onFailure(String source, Exception e) {
@@ -400,12 +386,7 @@ public void putIndexTemplateV2(final String cause, final boolean create, final S
400386
final ComposableIndexTemplate template, final ActionListener<AcknowledgedResponse> listener) {
401387
validateV2TemplateRequest(clusterService.state().metadata(), name, template);
402388
clusterService.submitStateUpdateTask("create-index-template-v2 [" + name + "], cause [" + cause + "]",
403-
new ClusterStateUpdateTask(Priority.URGENT) {
404-
405-
@Override
406-
public TimeValue timeout() {
407-
return masterTimeout;
408-
}
389+
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
409390

410391
@Override
411392
public void onFailure(String source, Exception e) {
@@ -646,12 +627,7 @@ static Map<String, List<String>> findConflictingV2Templates(final ClusterState s
646627
public void removeIndexTemplateV2(final String name, final TimeValue masterTimeout,
647628
final ActionListener<AcknowledgedResponse> listener) {
648629
clusterService.submitStateUpdateTask("remove-index-template-v2 [" + name + "]",
649-
new ClusterStateUpdateTask(Priority.URGENT) {
650-
651-
@Override
652-
public TimeValue timeout() {
653-
return masterTimeout;
654-
}
630+
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
655631

656632
@Override
657633
public void onFailure(String source, Exception e) {
@@ -741,12 +717,7 @@ public void putTemplate(final PutRequest request, final PutListener listener) {
741717
final IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder(request.name);
742718

743719
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]",
744-
new ClusterStateUpdateTask(Priority.URGENT) {
745-
746-
@Override
747-
public TimeValue timeout() {
748-
return request.masterTimeout;
749-
}
720+
new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {
750721

751722
@Override
752723
public void onFailure(String source, Exception e) {

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
import org.elasticsearch.common.settings.Settings;
7878
import org.elasticsearch.common.unit.ByteSizeUnit;
7979
import org.elasticsearch.common.unit.ByteSizeValue;
80-
import org.elasticsearch.common.unit.TimeValue;
8180
import org.elasticsearch.common.util.BigArrays;
8281
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
8382
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -360,7 +359,7 @@ public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUp
360359
final RepositoryMetadata repositoryMetadataStart = metadata;
361360
getRepositoryData(ActionListener.wrap(repositoryData -> {
362361
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
363-
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
362+
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority(), updateTask.timeout()) {
364363

365364
private boolean executedTask = false;
366365

@@ -396,11 +395,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
396395
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
397396
}
398397
}
399-
400-
@Override
401-
public TimeValue timeout() {
402-
return updateTask.timeout();
403-
}
404398
});
405399
}, onFailure));
406400
}

0 commit comments

Comments
 (0)