Skip to content

Commit 02b67a5

Browse files
committed
Use correct cluster state version for node fault detection (#30810)
Since its introduction in ES 1.4, node fault detection has been sending the wrong cluster state version as part of the ping request, because it always used the constant -1 (ClusterState.UNKNOWN_VERSION). In an unfortunate series of events, this can lead to a situation where a stale former master regains its authority and reverts the cluster to an older state. This commit makes NodesFaultDetection use the version of the current cluster state when sending ping requests, which avoids the situation where a stale master could force a newer master to step down and rejoin the stale one.
1 parent 9fac131 commit 02b67a5

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

core/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,16 @@ public void onPingReceived(PingRequest pingRequest) {}
6767

6868
private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
6969

70-
private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
70+
private final java.util.function.Supplier<ClusterState> clusterStateSupplier;
7171

7272
private volatile DiscoveryNode localNode;
7373

74-
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
74+
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
75+
java.util.function.Supplier<ClusterState> clusterStateSupplier, ClusterName clusterName) {
7576
super(settings, threadPool, transportService, clusterName);
7677

78+
this.clusterStateSupplier = clusterStateSupplier;
79+
7780
logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
7881
pingRetryCount);
7982

@@ -213,15 +216,18 @@ private boolean running() {
213216
return NodeFD.this.equals(nodesFD.get(node));
214217
}
215218

219+
private PingRequest newPingRequest() {
220+
return new PingRequest(node, clusterName, localNode, clusterStateSupplier.get().version());
221+
}
222+
216223
@Override
217224
public void run() {
218225
if (!running()) {
219226
return;
220227
}
221-
final PingRequest pingRequest = new PingRequest(node, clusterName, localNode, clusterStateVersion);
222228
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
223229
.withTimeout(pingRetryTimeout).build();
224-
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new TransportResponseHandler<PingResponse>() {
230+
transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler<PingResponse>() {
225231
@Override
226232
public PingResponse newInstance() {
227233
return new PingResponse();
@@ -264,7 +270,7 @@ public void handleException(TransportException exp) {
264270
}
265271
} else {
266272
// resend the request, not reschedule, rely on send timeout
267-
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this);
273+
transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, this);
268274
}
269275
}
270276

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
190190

191191
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterService);
192192
this.masterFD.addListener(new MasterNodeFailureListener());
193-
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterService.getClusterName());
193+
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterService.getClusterName());
194194
this.nodesFD.addListener(new NodeFaultDetectionListener());
195195

196196
this.publishClusterState =

core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,17 +184,19 @@ public void testNodesFaultDetectionConnectOnDisconnect() throws InterruptedExcep
184184
final Settings pingSettings = Settings.builder()
185185
.put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry)
186186
.put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m").build();
187-
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(buildNodesForA(true)).build();
187+
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong())
188+
.nodes(buildNodesForA(true)).build();
188189
NodesFaultDetection nodesFDA = new NodesFaultDetection(Settings.builder().put(settingsA).put(pingSettings).build(),
189-
threadPool, serviceA, clusterState.getClusterName());
190+
threadPool, serviceA, () -> clusterState, clusterState.getClusterName());
190191
nodesFDA.setLocalNode(nodeA);
191192
NodesFaultDetection nodesFDB = new NodesFaultDetection(Settings.builder().put(settingsB).put(pingSettings).build(),
192-
threadPool, serviceB, clusterState.getClusterName());
193+
threadPool, serviceB, () -> clusterState, clusterState.getClusterName());
193194
nodesFDB.setLocalNode(nodeB);
194195
final CountDownLatch pingSent = new CountDownLatch(1);
195196
nodesFDB.addListener(new NodesFaultDetection.Listener() {
196197
@Override
197198
public void onPingReceived(NodesFaultDetection.PingRequest pingRequest) {
199+
assertThat(pingRequest.clusterStateVersion(), equalTo(clusterState.version()));
198200
pingSent.countDown();
199201
}
200202
});

0 commit comments

Comments
 (0)