Skip to content

Commit 283b511

Browse files
committed
MasterNodeChangePredicate should use the node instance to detect master change (#25877)
This predicate is used to deal with the intricacies of detecting when a master is reelected/nodes rejoins an existing master. The current implementation is based on nodeIds, which is fine if the master really change. If the nodeId is equal the code falls back to detecting an increment in the cluster state version which happens when a node is re-elected or when the node rejoins. Sadly this doesn't cover the case where the same node is elected after a full restart of all master nodes. In that case we recover the cluster state from disk but the version is reset back to 0. To fix this, the check should be done based on ephemeral IDs which are reset on restart. Fixes #25471
1 parent f0b24ba commit 283b511

File tree

2 files changed

+49
-9
lines changed

2 files changed

+49
-9
lines changed

core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.cluster;
2121

22+
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
2224
import java.util.function.Predicate;
2325

2426
public final class MasterNodeChangePredicate {
@@ -33,13 +35,14 @@ private MasterNodeChangePredicate() {
3335
*/
3436
public static Predicate<ClusterState> build(ClusterState currentState) {
3537
final long currentVersion = currentState.version();
36-
final String currentMaster = currentState.nodes().getMasterNodeId();
38+
final DiscoveryNode masterNode = currentState.nodes().getMasterNode();
39+
final String currentMasterId = masterNode == null ? null : masterNode.getEphemeralId();
3740
return newState -> {
38-
final String newMaster = newState.nodes().getMasterNodeId();
41+
final DiscoveryNode newMaster = newState.nodes().getMasterNode();
3942
final boolean accept;
4043
if (newMaster == null) {
4144
accept = false;
42-
} else if (newMaster.equals(currentMaster) == false){
45+
} else if (newMaster.getEphemeralId().equals(currentMasterId) == false) {
4346
accept = true;
4447
} else {
4548
accept = newState.version() > currentVersion;

core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.block.ClusterBlocks;
3737
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3838
import org.elasticsearch.cluster.node.DiscoveryNode;
39+
import org.elasticsearch.cluster.node.DiscoveryNodes;
3940
import org.elasticsearch.cluster.service.ClusterService;
4041
import org.elasticsearch.common.settings.Settings;
4142
import org.elasticsearch.common.transport.LocalTransportAddress;
@@ -299,21 +300,57 @@ public void testDelegateToMaster() throws ExecutionException, InterruptedExcepti
299300
}
300301

301302
public void testDelegateToFailingMaster() throws ExecutionException, InterruptedException {
302-
boolean failsWithConnectTransportException = randomBoolean();
303+
final boolean failsWithConnectTransportException = randomBoolean();
304+
final boolean rejoinSameMaster = failsWithConnectTransportException && randomBoolean();
303305
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(failsWithConnectTransportException ? 60 : 0));
304-
setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
306+
DiscoveryNode masterNode = this.remoteNode;
307+
setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, allNodes))
308+
.version(randomIntBetween(0, 10))); // use a random base version so it can go down when simulating a restart.
305309

306310
PlainActionFuture<Response> listener = new PlainActionFuture<>();
307311
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener);
308312

309-
assertThat(transport.capturedRequests().length, equalTo(1));
310-
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
313+
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
314+
assertThat(capturedRequests.length, equalTo(1));
315+
CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0];
311316
assertTrue(capturedRequest.node.isMasterNode());
312317
assertThat(capturedRequest.request, equalTo(request));
313318
assertThat(capturedRequest.action, equalTo("testAction"));
314319

315-
if (failsWithConnectTransportException) {
316-
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
320+
if (rejoinSameMaster) {
321+
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(masterNode, "Fake error"));
322+
assertFalse(listener.isDone());
323+
if (randomBoolean()) {
324+
// simulate master node removal
325+
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
326+
nodesBuilder.masterNodeId(null);
327+
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder));
328+
}
329+
if (randomBoolean()) {
330+
// reset the same state to increment a version simulating a join of an existing node
331+
// simulating use being disconnected
332+
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
333+
nodesBuilder.masterNodeId(masterNode.getId());
334+
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder));
335+
} else {
336+
// simulate master restart followed by a state recovery - this will reset the cluster state version
337+
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
338+
nodesBuilder.remove(masterNode);
339+
masterNode = new DiscoveryNode(masterNode.getId(), masterNode.getAddress(), masterNode.getVersion());
340+
nodesBuilder.add(masterNode);
341+
nodesBuilder.masterNodeId(masterNode.getId());
342+
final ClusterState.Builder builder = ClusterState.builder(clusterService.state()).nodes(nodesBuilder);
343+
setState(clusterService, builder.version(0));
344+
}
345+
assertFalse(listener.isDone());
346+
capturedRequests = transport.getCapturedRequestsAndClear();
347+
assertThat(capturedRequests.length, equalTo(1));
348+
capturedRequest = capturedRequests[0];
349+
assertTrue(capturedRequest.node.isMasterNode());
350+
assertThat(capturedRequest.request, equalTo(request));
351+
assertThat(capturedRequest.action, equalTo("testAction"));
352+
} else if (failsWithConnectTransportException) {
353+
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(masterNode, "Fake error"));
317354
assertFalse(listener.isDone());
318355
setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes));
319356
assertTrue(listener.isDone());

0 commit comments

Comments
 (0)