diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 0dbd56b2dce57..c1dd4a1d375db 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState.Builder; import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; @@ -42,6 +43,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -64,9 +66,13 @@ import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -104,6 +110,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery @Nullable private Releasable leaderCheckScheduler; private long maxTermSeen; + private final Reconfigurator reconfigurator; private Mode mode; private Optional lastKnownLeader; @@ -111,9 +118,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); - public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService, - MasterService masterService, Supplier persistedStateSupplier, - UnicastHostsProvider unicastHostsProvider, ClusterApplier clusterApplier, Random random) { + public Coordinator(Settings settings, ClusterSettings clusterSettings, TransportService transportService, + AllocationService allocationService, MasterService masterService, + Supplier persistedStateSupplier, UnicastHostsProvider unicastHostsProvider, + ClusterApplier clusterApplier, Random random) { super(settings); this.transportService = transportService; this.masterService = masterService; @@ -136,6 +144,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); + this.reconfigurator = new Reconfigurator(settings, clusterSettings); } private Runnable getOnLeaderFailure() { @@ -269,8 +278,13 @@ private void updateMaxTermSeen(final long term) { logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", 
maxTermSeen, currentTerm); } else { - ensureTermAtLeast(getLocalNode(), maxTermSeen); - startElection(); + try { + ensureTermAtLeast(getLocalNode(), maxTermSeen); + startElection(); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e); + becomeCandidate("updateMaxTermSeen"); + } } } } @@ -524,6 +538,12 @@ public void invariant() { assert lastPublishedNodes.equals(followersChecker.getKnownFollowers()) : lastPublishedNodes + " != " + followersChecker.getKnownFollowers(); } + + assert becomingMaster || activePublication || + coordinationState.get().getLastAcceptedConfiguration().equals(coordinationState.get().getLastCommittedConfiguration()) + : coordinationState.get().getLastAcceptedConfiguration() + " != " + + coordinationState.get().getLastCommittedConfiguration(); + } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); @@ -582,6 +602,8 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio MetaData.Builder metaDataBuilder = MetaData.builder(); // automatically generate a UID for the metadata if we need to metaDataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool? + metaDataBuilder.persistentSettings(Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), + (votingConfiguration.getNodeIds().size() - 1) / 2).build()); // TODO set this in bootstrapping tool? builder.metaData(metaDataBuilder); coordinationState.get().setInitialState(builder.build()); preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version @@ -589,6 +611,50 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio } } + // Package-private for testing + ClusterState improveConfiguration(ClusterState clusterState) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + + final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) + .filter(this::hasJoinVoteFrom).collect(Collectors.toSet()); + final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure( + liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration()); + if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { + assert coordinationState.get().joinVotesHaveQuorumFor(newConfig); + return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build(); + } + + return clusterState; + } + + private AtomicBoolean reconfigurationTaskScheduled = new AtomicBoolean(); + + private void scheduleReconfigurationIfNeeded() { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + assert mode == Mode.LEADER : mode; + assert currentPublication.isPresent() == false : "Expected no publication in progress"; + + final ClusterState state = getLastAcceptedState(); + if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) { + logger.trace("scheduling reconfiguration"); + masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) { + reconfigurationTaskScheduled.set(false); + synchronized (mutex) { + return improveConfiguration(currentState); + } + } + + @Override + public void onFailure(String source, Exception e) { + 
reconfigurationTaskScheduled.set(false); + logger.debug("reconfiguration failed", e); + } + }); + } + } + // for tests boolean hasJoinVoteFrom(DiscoveryNode localNode) { return coordinationState.get().containsJoinVoteFor(localNode); @@ -599,12 +665,15 @@ private void handleJoin(Join join) { ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin); if (coordinationState.get().electionWon()) { - // if we have already won the election then the actual join does not matter for election purposes, - // so swallow any exception - try { - coordinationState.get().handleJoin(join); - } catch (CoordinationStateRejectedException e) { - logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e); + // If we have already won the election then the actual join does not matter for election purposes, so swallow any exception + final boolean isNewJoin = handleJoinIgnoringExceptions(join); + + // If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn, + // schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the + // race against the election-winning publication and log a big error message, which we can prevent by checking this here: + final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm(); + if (isNewJoin && establishedAsMaster && publicationInProgress() == false) { + scheduleReconfigurationIfNeeded(); } } else { coordinationState.get().handleJoin(join); // this might fail and bubble up the exception @@ -612,6 +681,18 @@ private void handleJoin(Join join) { } } + /** + * @return true iff the join was from a new node and was successfully added + */ + private boolean handleJoinIgnoringExceptions(Join join) { + try { + return coordinationState.get().handleJoin(join); + } catch (CoordinationStateRejectedException e) { + logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e); + return false; + } + } + public ClusterState getLastAcceptedState() { synchronized (mutex) { return coordinationState.get().getLastAcceptedState(); @@ -904,6 +985,10 @@ public void onSuccess(String source) { logger.debug("publication ended successfully: {}", CoordinatorPublication.this); // trigger term bump if new term was found during publication updateMaxTermSeen(getCurrentTerm()); + + if (mode == Mode.LEADER) { + scheduleReconfigurationIfNeeded(); + } } ackListener.onNodeAck(getLocalNode(), null); publishListener.onResponse(null); @@ -916,8 +1001,7 @@ public void onFailure(Exception e) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); - FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException( - "publication failed", e); + final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e); ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. 
publishListener.onFailure(exception); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java index 64459eadda471..b08d81ac8e457 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java @@ -52,6 +52,7 @@ public class Reconfigurator extends AbstractComponent { Setting.intSetting("cluster.master_nodes_failure_tolerance", 0, 0, Property.NodeScope, Property.Dynamic); // the default is not supposed to be important since we expect to set this setting explicitly at bootstrapping time // TODO contemplate setting the default to something larger than 0 (1? 1<<30?) + // TODO prevent this being set as a transient or a per-node setting? private volatile int masterNodesFailureTolerance; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 9dad2390032e3..4cf258dbc715e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -28,7 +28,9 @@ import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; +import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.common.Randomness; @@ -83,6 +85,7 @@ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; +import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE; import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; @@ -91,10 +94,12 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") @@ -130,20 +135,258 @@ public void testNodesJoinAfterStableCluster() { cluster.stabilise(); final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); - final int newNodesCount = randomIntBetween(1, 2); - cluster.addNodes(newNodesCount); - cluster.stabilise( - // The first pinging discovers the master - defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) - // One 
message delay to send a join - + DEFAULT_DELAY_VARIABILITY - // Commit a new cluster state with the new node(s). Might be split into multiple commits - + newNodesCount * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + cluster.addNodesAndStabilise(randomIntBetween(1, 2)); final long newTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); assertEquals(currentTerm, newTerm); } + public void testExpandsConfigurationWhenGrowingFromOneToThreeNodesAndShrinksOnFailure() { + final Cluster cluster = new Cluster(1); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(0)); + + cluster.addNodesAndStabilise(2); + + { + assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } + + final ClusterNode disconnect1 = cluster.getAnyNode(); + logger.info("--> disconnecting {}", disconnect1); + disconnect1.disconnect(); + cluster.stabilise(); + + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + } + } + + public void testExpandsConfigurationWhenGrowingFromThreeToFiveNodesAndShrinksOnFailure() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + + logger.info("setting fault tolerance to 1"); + leader.submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); + + cluster.addNodesAndStabilise(2); + + { + assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } + + final ClusterNode disconnect1 = cluster.getAnyNode(); + final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); + logger.info("--> disconnecting {} and {}", disconnect1, disconnect2); + disconnect1.disconnect(); + disconnect2.disconnect(); + cluster.stabilise(); + + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); + } + + // we still tolerate the loss of one more node here + + final 
ClusterNode disconnect3 = cluster.getAnyNodeExcept(disconnect1, disconnect2); + logger.info("--> disconnecting {}", disconnect3); + disconnect3.disconnect(); + cluster.stabilise(); + + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); + assertTrue(lastCommittedConfiguration.getNodeIds().contains(disconnect3.getId())); + } + + // however we do not tolerate the loss of yet another one + + final ClusterNode disconnect4 = cluster.getAnyNodeExcept(disconnect1, disconnect2, disconnect3); + logger.info("--> disconnecting {}", disconnect4); + disconnect4.disconnect(); + cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE)); + } + + // moreover we are still stuck even if two other nodes heal + logger.info("--> healing {} and {}", disconnect1, disconnect2); + disconnect1.heal(); + disconnect2.heal(); + cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE)); + } + + // we require another node to heal to recover + final ClusterNode toHeal = randomBoolean() ? 
disconnect3 : disconnect4; + logger.info("--> healing {}", toHeal); + toHeal.heal(); + cluster.stabilise(); + } + + public void testCanShrinkFromFiveNodesToThree() { + final Cluster cluster = new Cluster(5); + cluster.runRandomly(); + cluster.stabilise(); + + { + final ClusterNode leader = cluster.getAnyLeader(); + logger.info("setting fault tolerance to 2"); + leader.submitSetMasterNodesFailureTolerance(2); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(2)); + } + + final ClusterNode disconnect1 = cluster.getAnyNode(); + final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); + + logger.info("--> disconnecting {} and {}", disconnect1, disconnect2); + disconnect1.disconnect(); + disconnect2.disconnect(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + + { + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } + + leader.submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY * 2); // allow for a reconfiguration + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); + + { + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); + } + } + + public void testCanShrinkFromThreeNodesToTwo() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + { + final ClusterNode leader = cluster.getAnyLeader(); + logger.info("setting fault tolerance to 1"); + leader.submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); + } + + final ClusterNode disconnect1 = cluster.getAnyNode(); + + logger.info("--> disconnecting {}", disconnect1); + disconnect1.disconnect(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + + { + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } + + leader.submitSetMasterNodesFailureTolerance(0); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY * 2); // allow for a reconfiguration + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(0)); + + { + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), 
equalTo(1)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + } + } + + public void testDoesNotShrinkConfigurationDueToLossToleranceConfigurationWithThreeNodes() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_ELECTION_DELAY); + + final ClusterNode disconnect1 = cluster.getAnyNode(); + + logger.info("--> disconnecting {}", disconnect1); + disconnect1.disconnect(); + cluster.stabilise(); + + final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); + logger.info("--> disconnecting {}", disconnect2); + disconnect2.disconnect(); + cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE)); + } + + disconnect1.heal(); + cluster.stabilise(); // would not work if disconnect1 were removed from the configuration + } + + public void testDoesNotShrinkConfigurationDueToLossToleranceConfigurationWithFiveNodes() { + final Cluster cluster = new Cluster(5); + cluster.runRandomly(); + cluster.stabilise(); + + cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(2); + cluster.stabilise(DEFAULT_ELECTION_DELAY); + + final ClusterNode disconnect1 = cluster.getAnyNode(); + final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); + + logger.info("--> disconnecting {} and {}", disconnect1, disconnect2); + disconnect1.disconnect(); + disconnect2.disconnect(); + cluster.stabilise(); + + final ClusterNode disconnect3 = cluster.getAnyNodeExcept(disconnect1, disconnect2); + logger.info("--> disconnecting {}", disconnect3); + disconnect3.disconnect(); + cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE)); + } + + disconnect1.heal(); + cluster.stabilise(); // would not work if disconnect1 were removed from the configuration + } + public void testLeaderDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); @@ -162,9 +405,7 @@ public void testLeaderDisconnectionDetectedQuickly() { // then wait for the exception response + DEFAULT_DELAY_VARIABILITY // then wait for a new election - + DEFAULT_ELECTION_DELAY - // then wait for the old leader's removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, + + DEFAULT_ELECTION_DELAY, // ALSO the leader may have just sent a follower check, which receives no response // TODO unnecessary if notified of disconnection @@ -172,10 +413,14 @@ public void testLeaderDisconnectionDetectedQuickly() { // wait for the leader to check its followers + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) // then wait for the exception response - + DEFAULT_DELAY_VARIABILITY - // then wait for the removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY - )); + + DEFAULT_DELAY_VARIABILITY) + + // FINALLY: + + // wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } @@ -210,6 +455,8 @@ public void 
testUnresponsiveLeaderDetectedEventually() { * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)) // then wait for the new leader to commit a state without the old leader + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, // ALSO wait for the leader to notice that its followers are unresponsive @@ -241,6 +488,8 @@ public void testFollowerDisconnectionDetectedQuickly() { // then wait for the exception response + DEFAULT_DELAY_VARIABILITY // then wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, // ALSO the follower may have just sent a leader check, which receives no response @@ -269,6 +518,8 @@ public void testUnresponsiveFollowerDetectedEventually() { (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING) // then wait for the leader to commit a state without the follower + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, // ALSO wait for the follower to notice the leader is unresponsive @@ -338,9 +589,13 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + logger.info("--> blocking cluster state application on {}", follower0); follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG); + + logger.info("--> publishing another value"); AckCollector ackCollector = leader.submitValue(randomLong()); cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value"); + assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); cluster.stabilise(); @@ -426,14 +681,18 @@ public void testSettingInitialConfigurationTriggersElection() { } cluster.getAnyNode().applyInitialConfiguration(); - cluster.stabilise(defaultMillis( + cluster.stabilise( // the first election should succeed, because only one node knows of the initial configuration and therefore can win a // pre-voting round and proceed to an election, so there cannot be any collisions - ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately - // Allow two round-trip for pre-voting and voting - + 4 * DEFAULT_DELAY_VARIABILITY - // Then a commit of the new leader's first cluster state - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately + // Allow two round-trip for pre-voting and voting + + 4 * DEFAULT_DELAY_VARIABILITY + // Then a commit of the new leader's first cluster state + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // Then allow time for all the other nodes to join, each of which might cause a reconfiguration + + (cluster.size() - 1) * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great. 
+ ); } public void testCannotSetInitialConfigurationTwice() { @@ -533,13 +792,18 @@ class Cluster { Cluster(int initialNodeCount) { deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - logger.info("--> creating cluster of {} nodes", initialNodeCount); + assertThat(initialNodeCount, greaterThan(0)); - Set initialNodeIds = new HashSet<>(initialNodeCount); - for (int i = 0; i < initialNodeCount; i++) { - initialNodeIds.add(nodeIdFromIndex(i)); + final Set initialConfigurationNodeIds = new HashSet<>(initialNodeCount); + while (initialConfigurationNodeIds.isEmpty()) { + for (int i = 0; i < initialNodeCount; i++) { + if (randomBoolean()) { + initialConfigurationNodeIds.add(nodeIdFromIndex(i)); + } + } } - initialConfiguration = new VotingConfiguration(initialNodeIds); + initialConfiguration = new VotingConfiguration(initialConfigurationNodeIds); + logger.info("--> creating cluster of {} nodes with initial configuration {}", initialNodeCount, initialConfiguration); clusterNodes = new ArrayList<>(initialNodeCount); for (int i = 0; i < initialNodeCount; i++) { @@ -548,6 +812,19 @@ class Cluster { } } + void addNodesAndStabilise(int newNodesCount) { + addNodes(newNodesCount); + stabilise( + // The first pinging discovers the master + defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) + // One message delay to send a join + + DEFAULT_DELAY_VARIABILITY + // Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a + // followup reconfiguration + + newNodesCount * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great. + } + void addNodes(int newNodesCount) { logger.info("--> adding {} nodes", newNodesCount); @@ -558,6 +835,10 @@ void addNodes(int newNodesCount) { } } + int size() { + return clusterNodes.size(); + } + void runRandomly() { // TODO supporting (preserving?) 
existing disruptions needs implementing if needed, for now we just forbid it @@ -579,7 +860,7 @@ void runRandomly() { if (randomSteps <= step && finishTime == -1) { finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime(); deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}]", step, finishTime); + logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}ms]", step, finishTime); } try { @@ -591,6 +872,15 @@ void runRandomly() { thisStep, newValue, clusterNode.getId()); clusterNode.submitValue(newValue); }).run(); + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNodePreferringLeaders(); + final int masterNodeFailureTolerance = randomIntBetween(0, 2); + onNode(clusterNode.getLocalNode(), + () -> { + logger.debug("----> [runRandomly {}] setting master-node fault tolerance to {} on {}", + thisStep, masterNodeFailureTolerance, clusterNode.getId()); + clusterNode.submitSetMasterNodesFailureTolerance(masterNodeFailureTolerance); + }).run(); } else if (rarely()) { final ClusterNode clusterNode = getAnyNode(); onNode(clusterNode.getLocalNode(), () -> { @@ -692,16 +982,67 @@ void stabilise(long stabilisationDurationMillis) { runFor(stabilisationDurationMillis, "stabilising"); fixLag(); - assertUniqueLeaderAndExpectedModes(); + + final ClusterNode leader = getAnyLeader(); + final long leaderTerm = leader.coordinator.getCurrentTerm(); + final Matcher isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); + final String leaderId = leader.getId(); + + assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId)); + assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); + + for (final ClusterNode clusterNode : clusterNodes) { + final String nodeId = clusterNode.getId(); + assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress()); + + if (clusterNode == leader) { + continue; + } + + if (isConnectedPair(leader, clusterNode)) { + assertThat(nodeId + " is a follower of " + leaderId, clusterNode.coordinator.getMode(), is(FOLLOWER)); + assertThat(nodeId + " has the same term as " + leaderId, clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); + assertTrue(nodeId + " has voted for " + leaderId, leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); + assertThat(nodeId + " has the same accepted state as " + leaderId, + clusterNode.coordinator.getLastAcceptedState().getVersion(), isEqualToLeaderVersion); + if (clusterNode.getClusterStateApplyResponse() == ClusterStateApplyResponse.SUCCEED) { + assertThat(nodeId + " has the same applied state as " + leaderId, + clusterNode.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); + assertTrue(nodeId + " is in its own latest applied state", + clusterNode.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + } + assertTrue(nodeId + " is in the latest applied state on " + leaderId, + leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + } else { + assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertFalse(nodeId + " is not in the applied state on " + leaderId, + leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + } + } + + final Set 
connectedNodeIds + = clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).map(ClusterNode::getId).collect(Collectors.toSet()); + + assertThat(leader.getLastAppliedClusterState().getNodes().getSize(), equalTo(connectedNodeIds.size())); + + final ClusterState lastAcceptedState = leader.coordinator.getLastAcceptedState(); + final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration(); + assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration, + lastCommittedConfiguration.hasQuorum(connectedNodeIds)); + + assertThat("no reconfiguration is in progress", + lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration())); + assertThat("current configuration is already optimal", + leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState)); } // TODO remove this when lag detection is implemented void fixLag() { final ClusterNode leader = getAnyLeader(); - final long leaderVersion = leader.coordinator.getApplierState().version(); + final long leaderVersion = leader.getLastAppliedClusterState().version(); final long minVersion = clusterNodes.stream() .filter(n -> isConnectedPair(n, leader)) - .map(n -> n.coordinator.getApplierState().version()).min(Long::compare).orElse(Long.MIN_VALUE); + .map(n -> n.getLastAppliedClusterState().version()).min(Long::compare).orElse(Long.MIN_VALUE); assert minVersion >= 0; if (minVersion < leaderVersion) { logger.info("--> fixLag publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion); @@ -710,10 +1051,15 @@ void fixLag() { leader.submitValue(randomLong()); } }).run(); + runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY - // may need to bump terms too - + DEFAULT_ELECTION_DELAY, + // may need to bump terms too + + DEFAULT_ELECTION_DELAY, "re-stabilising after lag-fixing publication"); + + if (clusterNodes.stream().anyMatch(n -> n.getClusterStateApplyResponse().equals(ClusterStateApplyResponse.HANG))) { + runFor(defaultMillis(PUBLISH_TIMEOUT_SETTING), "allowing lag-fixing publication to time out"); + } } else { logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion); } @@ -755,42 +1101,6 @@ private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) { && getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED); } - private void assertUniqueLeaderAndExpectedModes() { - final ClusterNode leader = getAnyLeader(); - final long leaderTerm = leader.coordinator.getCurrentTerm(); - Matcher isPresentAndEqualToLeaderVersion - = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); - - assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leader.getId())); - assertThat(leader.getLastAppliedClusterState().getVersion(), isPresentAndEqualToLeaderVersion); - - for (final ClusterNode clusterNode : clusterNodes) { - final String nodeId = clusterNode.getId(); - assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress()); - - if (clusterNode == leader) { - continue; - } - - if (isConnectedPair(leader, clusterNode)) { - assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); - assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - assertTrue(nodeId + " has voted for the leader", leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); - // 
TODO assert that this node's accepted and committed states are the same as the leader's - - assertTrue(nodeId + " is in the leader's applied state", - leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); - } else { - assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); - assertFalse(nodeId + " is not in the leader's applied state", - leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); - } - } - - int connectedNodeCount = Math.toIntExact(clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).count()); - assertThat(leader.getLastAppliedClusterState().getNodes().getSize(), equalTo(connectedNodeCount)); - } - ClusterNode getAnyLeader() { List allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList()); assertThat("leaders", allLeaders, not(empty())); @@ -839,6 +1149,7 @@ class MockPersistedState extends InMemoryPersistedState { private void possiblyFail(String description) { if (disruptStorage && rarely()) { // TODO revisit this when we've decided how PersistedState should throw exceptions + logger.trace("simulating IO exception [{}]", description); if (randomBoolean()) { throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']')); } else { @@ -938,8 +1249,9 @@ protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNo transportService = mockTransport.createTransportService( settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); - coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), - masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get()); + coordinator = new Coordinator(settings, clusterSettings, transportService, + ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, + Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get()); masterService.setClusterStatePublisher(coordinator); transportService.start(); @@ -965,10 +1277,34 @@ boolean isLeader() { return coordinator.getMode() == LEADER; } + ClusterState improveConfiguration(ClusterState currentState) { + synchronized (coordinator.mutex) { + return coordinator.improveConfiguration(currentState); + } + } + void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyResponse) { this.clusterStateApplyResponse = clusterStateApplyResponse; } + ClusterStateApplyResponse getClusterStateApplyResponse() { + return clusterStateApplyResponse; + } + + void submitSetMasterNodesFailureTolerance(final int masterNodesFaultTolerance) { + submitUpdateTask("set master nodes failure tolerance [" + masterNodesFaultTolerance + "]", cs -> + cs.getLastAcceptedConfiguration().getNodeIds().size() < 2 * masterNodesFaultTolerance + 1 ? cs : + // TODO this rejects invalid updates, but in fact this should be validated elsewhere. Where? 
+ ClusterState.builder(cs).metaData( + MetaData.builder(cs.metaData()) + .persistentSettings(Settings.builder() + .put(cs.metaData().persistentSettings()) + .put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), masterNodesFaultTolerance) + .build()) + .build()) + .build()); + } + AckCollector submitValue(final long value) { return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, value)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index dd66ebc9e5a2c..cbb45bd6a21f8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -156,11 +156,12 @@ protected void onSendRequest(long requestId, String action, TransportRequest req } } }; + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> initialState.nodes().getLocalNode(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), Collections.emptySet()); - coordinator = new Coordinator(Settings.EMPTY, + clusterSettings, Collections.emptySet()); + coordinator = new Coordinator(Settings.EMPTY, clusterSettings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 22f89edaa0376..76369e42281df 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -67,6 +67,7 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.coordination.Reconfigurator; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -551,15 +552,17 @@ protected final void afterInternal(boolean afterClass) throws Exception { if (cluster() != null) { if (currentClusterScope != Scope.TEST) { MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData(); - final Set persistent = metaData.persistentSettings().keySet(); - assertThat("test leaves persistent cluster metadata behind: " + persistent, persistent.size(), equalTo(0)); - final Set transientSettings = new HashSet<>(metaData.transientSettings().keySet()); + + final Set persistentKeys = new HashSet<>(metaData.persistentSettings().keySet()); + persistentKeys.remove(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey()); + assertThat("test leaves persistent cluster metadata behind", persistentKeys, empty()); + + final Set transientKeys = new HashSet<>(metaData.transientSettings().keySet()); if (isInternalCluster() && internalCluster().getAutoManageMinMasterNode()) { // this is set by the test infra - transientSettings.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()); + transientKeys.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()); } - 
assertThat("test leaves transient cluster metadata behind: " + transientSettings, - transientSettings, empty()); + assertThat("test leaves transient cluster metadata behind", transientKeys, empty()); } ensureClusterSizeConsistency(); ensureClusterStateConsistency(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 200ec07d7a8eb..c3a5dad035190 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.coordination.Reconfigurator; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -123,8 +124,11 @@ public void tearDown() throws Exception { super.tearDown(); assertAcked(client().admin().indices().prepareDelete("*").get()); MetaData metaData = client().admin().cluster().prepareState().get().getState().getMetaData(); - assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().keySet(), - metaData.persistentSettings().size(), equalTo(0)); + Settings.Builder unexpectedPersistentSettingsBuilder = Settings.builder().put(metaData.persistentSettings()); + unexpectedPersistentSettingsBuilder.remove(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey()); + Settings unexpectedPersistentSettings = unexpectedPersistentSettingsBuilder.build(); + assertThat("test leaves persistent cluster metadata behind: " + unexpectedPersistentSettings.keySet(), + unexpectedPersistentSettings.size(), equalTo(0)); assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().keySet(), metaData.transientSettings().size(), equalTo(0)); if (resetNodeAfterTest()) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 67a6f0421f96c..2d296890d930b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -86,8 +86,8 @@ public Map> getDiscoveryTypes(ThreadPool threadPool, () -> new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)) .nodes(DiscoveryNodes.builder().add(transportService.getLocalNode()) .localNodeId(transportService.getLocalNode().getId()).build()).build()); - return new Coordinator(fixedSettings, transportService, allocationService, masterService, persistedStateSupplier, - hostsProvider, clusterApplier, new Random(Randomness.get().nextLong())); + return new Coordinator(fixedSettings, clusterSettings, transportService, allocationService, masterService, + persistedStateSupplier, hostsProvider, clusterApplier, new Random(Randomness.get().nextLong())); } else { return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, hostsProvider, allocationService);