Skip to content

Commit 34f4ca7

Browse files
committed
[Cluster] Refactored ClusterStateUpdateTask protection against execution on a non master
Previous implementation used a marker interface and had no explicit failure call back for the case update task was run on a non master (i.e., the master stepped down after it was submitted). That lead to a couple of instance of checks. This approach moves ClusterStateUpdateTask from an interface to an abstract class, which allows adding a flag to indicate whether it should only run on master nodes (defaults to true). It also adds an explicit onNoLongerMaster call back to allow different error handling for that case. This also removed the need for the NoLongerMaster. Closes #7511
1 parent 596a4a0 commit 34f4ca7

13 files changed

+124
-81
lines changed

src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,16 +137,17 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
137137
return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build());
138138
}
139139

140+
@Override
141+
public void onNoLongerMaster(String source) {
142+
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
143+
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
144+
}
145+
140146
@Override
141147
public void onFailure(String source, Throwable t) {
142148
//if the reroute fails we only log
143-
if (t instanceof ClusterService.NoLongerMasterException) {
144-
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
145-
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
146-
} else {
147-
logger.debug("failed to perform [{}]", t, source);
148-
listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
149-
}
149+
logger.debug("failed to perform [{}]", t, source);
150+
listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
150151
}
151152

152153
@Override

src/main/java/org/elasticsearch/action/bench/BenchmarkService.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
6666
/**
6767
* Constructs a service component for running benchmarks
6868
*
69-
* @param settings Settings
70-
* @param clusterService Cluster service
71-
* @param threadPool Thread pool
72-
* @param client Client
73-
* @param transportService Transport service
69+
* @param settings Settings
70+
* @param clusterService Cluster service
71+
* @param threadPool Thread pool
72+
* @param client Client
73+
* @param transportService Transport service
7474
*/
7575
@Inject
7676
public BenchmarkService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
@@ -86,19 +86,22 @@ public BenchmarkService(Settings settings, ClusterService clusterService, Thread
8686
}
8787

8888
@Override
89-
protected void doStart() throws ElasticsearchException { }
89+
protected void doStart() throws ElasticsearchException {
90+
}
9091

9192
@Override
92-
protected void doStop() throws ElasticsearchException { }
93+
protected void doStop() throws ElasticsearchException {
94+
}
9395

9496
@Override
95-
protected void doClose() throws ElasticsearchException { }
97+
protected void doClose() throws ElasticsearchException {
98+
}
9699

97100
/**
98101
* Lists actively running benchmarks on the cluster
99102
*
100-
* @param request Status request
101-
* @param listener Response listener
103+
* @param request Status request
104+
* @param listener Response listener
102105
*/
103106
public void listBenchmarks(final BenchmarkStatusRequest request, final ActionListener<BenchmarkStatusResponse> listener) {
104107

@@ -171,8 +174,8 @@ public void onFailure(Throwable t) {
171174
/**
172175
* Executes benchmarks on the cluster
173176
*
174-
* @param request Benchmark request
175-
* @param listener Response listener
177+
* @param request Benchmark request
178+
* @param listener Response listener
176179
*/
177180
public void startBenchmark(final BenchmarkRequest request, final ActionListener<BenchmarkResponse> listener) {
178181

@@ -228,7 +231,7 @@ public void onFailure(Throwable t) {
228231
listener.onFailure(t);
229232
}
230233
}, (benchmarkResponse.state() != BenchmarkResponse.State.ABORTED) &&
231-
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
234+
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
232235
}
233236

234237
private final boolean isBenchmarkNode(DiscoveryNode node) {
@@ -403,6 +406,7 @@ protected CountDownAsyncHandler(int size) {
403406
}
404407

405408
public abstract T newInstance();
409+
406410
protected abstract void sendResponse();
407411

408412
@Override
@@ -593,7 +597,7 @@ public ClusterState execute(ClusterState currentState) {
593597

594598
if (bmd != null) {
595599
for (BenchmarkMetaData.Entry entry : bmd.entries()) {
596-
if (request.benchmarkName().equals(entry.benchmarkId())){
600+
if (request.benchmarkName().equals(entry.benchmarkId())) {
597601
if (entry.state() != BenchmarkMetaData.State.SUCCESS && entry.state() != BenchmarkMetaData.State.FAILED) {
598602
throw new ElasticsearchException("A benchmark with ID [" + request.benchmarkName() + "] is already running in state [" + entry.state() + "]");
599603
}
@@ -648,7 +652,7 @@ public FinishBenchmarkTask(String reason, String benchmarkId, BenchmarkStateList
648652
@Override
649653
protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
650654
BenchmarkMetaData.State state = entry.state();
651-
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
655+
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
652656
if (success) {
653657
return new BenchmarkMetaData.Entry(entry, BenchmarkMetaData.State.SUCCESS);
654658
} else {
@@ -661,7 +665,7 @@ public final class AbortBenchmarkTask extends UpdateBenchmarkStateTask {
661665
private final String[] patterns;
662666

663667
public AbortBenchmarkTask(String[] patterns, BenchmarkStateListener listener) {
664-
super("abort_benchmark", null , listener);
668+
super("abort_benchmark", null, listener);
665669
this.patterns = patterns;
666670
}
667671

@@ -675,7 +679,7 @@ protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
675679
}
676680
}
677681

678-
public abstract class UpdateBenchmarkStateTask implements ProcessedClusterStateUpdateTask {
682+
public abstract class UpdateBenchmarkStateTask extends ProcessedClusterStateUpdateTask {
679683

680684
private final String reason;
681685
protected final String benchmarkId;
@@ -702,7 +706,7 @@ public ClusterState execute(ClusterState currentState) {
702706
ImmutableList.Builder<BenchmarkMetaData.Entry> builder = new ImmutableList.Builder<BenchmarkMetaData.Entry>();
703707
for (BenchmarkMetaData.Entry e : bmd.entries()) {
704708
if (benchmarkId == null || match(e)) {
705-
e = process(e) ;
709+
e = process(e);
706710
instances.add(e);
707711
}
708712
// Don't keep finished benchmarks around in cluster state
@@ -741,7 +745,7 @@ public String reason() {
741745
}
742746
}
743747

744-
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> implements TimeoutClusterStateUpdateTask {
748+
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> extends TimeoutClusterStateUpdateTask {
745749
protected final R request;
746750

747751
public BenchmarkStateChangeAction(R request) {

src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
2929
* all the nodes have acknowledged a cluster state update request
3030
*/
31-
public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutClusterStateUpdateTask {
31+
public abstract class AckedClusterStateUpdateTask<Response> extends TimeoutClusterStateUpdateTask {
3232

3333
private final ActionListener<Response> listener;
3434
private final AckedRequest request;
@@ -40,6 +40,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Respo
4040

4141
/**
4242
* Called to determine which nodes the acknowledgement is expected from
43+
*
4344
* @param discoveryNode a node
4445
* @return true if the node is expected to send ack back, false otherwise
4546
*/
@@ -50,6 +51,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
5051
/**
5152
* Called once all the nodes have acknowledged the cluster state update request. Must be
5253
* very lightweight execution, since it gets executed on the cluster service thread.
54+
*
5355
* @param t optional error that might have been thrown
5456
*/
5557
public void onAllNodesAcked(@Nullable Throwable t) {

src/main/java/org/elasticsearch/cluster/ClusterService.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,4 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
111111
*/
112112
List<PendingClusterTask> pendingTasks();
113113

114-
/**
115-
* an exception to indicate a {@link org.elasticsearch.cluster.ClusterStateUpdateTask} was not executed as
116-
* the current node is no longer master
117-
*/
118-
public static class NoLongerMasterException extends ElasticsearchIllegalStateException {
119-
120-
public NoLongerMasterException(String msg) {
121-
super(msg);
122-
}
123-
124-
}
125114
}

src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,10 @@
2323
* This is a marker interface to indicate that the task should be executed
2424
* even if the current node is not a master.
2525
*/
26-
public interface ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
26+
public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
27+
28+
@Override
29+
public boolean runOnlyOnMaster() {
30+
return false;
31+
}
2732
}

src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,37 @@
1919

2020
package org.elasticsearch.cluster;
2121

22+
import org.elasticsearch.common.Nullable;
23+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
24+
2225
/**
2326
* A task that can update the cluster state.
2427
*/
25-
public interface ClusterStateUpdateTask {
28+
abstract public class ClusterStateUpdateTask {
2629

2730
/**
2831
* Update the cluster state based on the current state. Return the *same instance* if no state
2932
* should be changed.
3033
*/
31-
ClusterState execute(ClusterState currentState) throws Exception;
34+
abstract public ClusterState execute(ClusterState currentState) throws Exception;
3235

3336
/**
3437
* A callback called when execute fails.
3538
*/
36-
void onFailure(String source, Throwable t);
39+
abstract public void onFailure(String source, @Nullable Throwable t);
40+
41+
42+
/**
43+
* indicates whether this task should only run if current node is master
44+
*/
45+
public boolean runOnlyOnMaster() {
46+
return true;
47+
}
48+
49+
/**
50+
* called when the task was rejected because the local node is no longer master
51+
*/
52+
public void onNoLongerMaster(String source) {
53+
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
54+
}
3755
}

src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@
1919
package org.elasticsearch.cluster;
2020

2121
/**
22-
* A combination interface between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
22+
* A combination between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
2323
* {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes
2424
*/
25-
public interface ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask, ClusterStateNonMasterUpdateTask {
25+
abstract public class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask {
26+
27+
@Override
28+
public boolean runOnlyOnMaster() {
29+
return false;
30+
}
2631
}

src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
2424
* the cluster state update has been processed.
2525
*/
26-
public interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
26+
public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
2727

2828
/**
2929
* Called when the result of the {@link #execute(ClusterState)} have been processed
3030
* properly by all listeners.
3131
*/
32-
void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
32+
public abstract void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
3333
}

src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
2626
* a timeout.
2727
*/
28-
public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
28+
abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
2929

3030
/**
3131
* If the cluster state update task wasn't processed by the provided timeout, call
3232
* {@link #onFailure(String, Throwable)}
3333
*/
34-
TimeValue timeout();
34+
abstract public TimeValue timeout();
3535
}

src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,15 @@ public ClusterState execute(ClusterState currentState) {
149149
return ClusterState.builder(currentState).routingResult(routingResult).build();
150150
}
151151

152+
@Override
153+
public void onNoLongerMaster(String source) {
154+
// no biggie
155+
}
156+
152157
@Override
153158
public void onFailure(String source, Throwable t) {
154-
if (!(t instanceof ClusterService.NoLongerMasterException)) {
155159
ClusterState state = clusterService.state();
156160
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
157-
}
158161
}
159162
});
160163
routingTableDirty = false;

0 commit comments

Comments
 (0)