Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.node.DiscoveryNode;

import java.util.function.Predicate;

public final class MasterNodeChangePredicate {
Expand All @@ -33,13 +35,14 @@ private MasterNodeChangePredicate() {
*/
public static Predicate<ClusterState> build(ClusterState currentState) {
final long currentVersion = currentState.version();
final String currentMaster = currentState.nodes().getMasterNodeId();
final DiscoveryNode masterNode = currentState.nodes().getMasterNode();
final String currentMasterId = masterNode == null ? null : masterNode.getEphemeralId();
return newState -> {
final String newMaster = newState.nodes().getMasterNodeId();
final DiscoveryNode newMaster = newState.nodes().getMasterNode();
final boolean accept;
if (newMaster == null) {
accept = false;
} else if (newMaster.equals(currentMaster) == false){
} else if (newMaster.getEphemeralId().equals(currentMasterId) == false) {
accept = true;
} else {
accept = newState.version() > currentVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -301,7 +302,9 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted
boolean failsWithConnectTransportException = randomBoolean();
boolean rejoinSameMaster = failsWithConnectTransportException && randomBoolean();
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(failsWithConnectTransportException ? 60 : 0));
setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
DiscoveryNode masterNode = this.remoteNode;
setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, allNodes))
.version(randomIntBetween(0, 10))); // use a random base version so it can go down when simulating a restart.

PlainActionFuture<Response> listener = new PlainActionFuture<>();
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener);
Expand All @@ -314,10 +317,30 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted
assertThat(capturedRequest.action, equalTo("testAction"));

if (rejoinSameMaster) {
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(masterNode, "Fake error"));
assertFalse(listener.isDone());
// reset the same state to increment a version simulating a join of an existing node
setState(clusterService, clusterService.state());
if (randomBoolean()) {
// simulate master node removal
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
nodesBuilder.masterNodeId(null);
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder));
}
if (randomBoolean()) {
// reset the same state to increment a version simulating a join of an existing node
// simulating use being disconnected
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
nodesBuilder.masterNodeId(masterNode.getId());
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder));
} else {
// simulate master restart followed by a state recovery - this will reset the cluster state version
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
nodesBuilder.remove(masterNode);
masterNode = new DiscoveryNode(masterNode.getId(), masterNode.getAddress(), masterNode.getVersion());
nodesBuilder.add(masterNode);
nodesBuilder.masterNodeId(masterNode.getId());
final ClusterState.Builder builder = ClusterState.builder(clusterService.state()).nodes(nodesBuilder);
setState(clusterService, builder.version(0));
}
assertFalse(listener.isDone());
capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
Expand All @@ -326,7 +349,7 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted
assertThat(capturedRequest.request, equalTo(request));
assertThat(capturedRequest.action, equalTo("testAction"));
} else if (failsWithConnectTransportException) {
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(masterNode, "Fake error"));
assertFalse(listener.isDone());
setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes));
assertTrue(listener.isDone());
Expand Down