From 6e96ec80d3d7101576f9432d88bb528f571a5b26 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 18 Oct 2018 14:31:03 +0100 Subject: [PATCH 01/25] Reconfigure cluster as its membership changes As master-eligible nodes join or leave the cluster we should give them votes or take them away, in order to maintain the optimal level of fault-tolerance in the system. #33924 introduced the `Reconfigurator` to calculate the optimal configuration of the cluster, and in this change we add the plumbing needed to actually perform the reconfigurations needed as the cluster grows or shrinks. --- .../cluster/coordination/Coordinator.java | 90 +++++- .../coordination/CoordinatorTests.java | 274 +++++++++++++----- .../cluster/coordination/NodeJoinTests.java | 5 +- .../test/discovery/TestZenDiscovery.java | 4 +- 4 files changed, 289 insertions(+), 84 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 0dbd56b2dce57..fbc9d4724ae12 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState.Builder; import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; @@ -42,6 +43,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -64,9 +66,13 @@ import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -104,6 +110,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery @Nullable private Releasable leaderCheckScheduler; private long maxTermSeen; + private final Reconfigurator reconfigurator; private Mode mode; private Optional lastKnownLeader; @@ -111,9 +118,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); - public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService, - MasterService masterService, Supplier persistedStateSupplier, - UnicastHostsProvider unicastHostsProvider, ClusterApplier clusterApplier, Random random) { + public Coordinator(Settings settings, ClusterSettings clusterSettings, TransportService transportService, + AllocationService allocationService, MasterService masterService, + Supplier persistedStateSupplier, UnicastHostsProvider unicastHostsProvider, + ClusterApplier clusterApplier, Random random) { super(settings); this.transportService = transportService; this.masterService = masterService; @@ -136,6 +144,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); + this.reconfigurator = new Reconfigurator(settings, clusterSettings); } private Runnable getOnLeaderFailure() { @@ -582,6 +591,8 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio MetaData.Builder metaDataBuilder = MetaData.builder(); // automatically generate a UID for the metadata if we need to metaDataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool? + metaDataBuilder.persistentSettings(Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), + (votingConfiguration.getNodeIds().size() - 1) / 2).build()); // TODO set this in bootstrapping tool? builder.metaData(metaDataBuilder); coordinationState.get().setInitialState(builder.build()); preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version @@ -589,6 +600,50 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio } } + // Package-private for testing + ClusterState reconfigureIfPossible(ClusterState clusterState) { + synchronized (mutex) { + if (mode == Mode.LEADER) { + final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) + .filter(coordinationState.get()::containsJoinVoteFor).collect(Collectors.toSet()); + final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure( + liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration()); + if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { + assert coordinationState.get().joinVotesHaveQuorumFor(newConfig); + return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build(); + } + } + } + + return clusterState; + } + + private AtomicBoolean reconfigurationTaskScheduled = new AtomicBoolean(); + + private void scheduleReconfigurationIfNeeded() { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + assert mode == Mode.LEADER : mode; + assert currentPublication.isPresent() == false : "Expected no publication in progress"; + + final ClusterState state = getLastAcceptedState(); + if (reconfigureIfPossible(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) { + logger.trace("scheduling reconfiguration"); + masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + reconfigurationTaskScheduled.set(false); + return reconfigureIfPossible(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + reconfigurationTaskScheduled.set(false); + logger.debug("reconfiguration failed", e); + } + }); + } + } + // for tests boolean hasJoinVoteFrom(DiscoveryNode localNode) { return coordinationState.get().containsJoinVoteFor(localNode); @@ -599,12 +654,10 @@ private void handleJoin(Join join) { ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin); if (coordinationState.get().electionWon()) { - // if we have already won the election then the actual join does not matter for election purposes, - // so swallow any exception - try { - coordinationState.get().handleJoin(join); - } catch (CoordinationStateRejectedException e) { - logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e); + // if we have already won the election then the actual join does not matter for election purposes, so swallow any exception + final boolean isNewJoin = handleJoinIgnoringExceptions(join); + if (isNewJoin && mode == Mode.LEADER && publicationInProgress() == false) { + scheduleReconfigurationIfNeeded(); } } else { coordinationState.get().handleJoin(join); // this might fail and bubble up the exception @@ -612,6 +665,18 @@ private void handleJoin(Join join) { } } + /** + * @return true iff the join was from a new node and was successfully added + */ + private boolean handleJoinIgnoringExceptions(Join join) { + try { + return coordinationState.get().handleJoin(join); + } catch (CoordinationStateRejectedException e) { + logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e); + return false; + } + } + public ClusterState getLastAcceptedState() { synchronized (mutex) { return coordinationState.get().getLastAcceptedState(); @@ -904,6 +969,10 @@ public void onSuccess(String source) { logger.debug("publication ended successfully: {}", CoordinatorPublication.this); // trigger term bump if new term was found during publication updateMaxTermSeen(getCurrentTerm()); + + if (mode == Mode.LEADER) { + scheduleReconfigurationIfNeeded(); + } } ackListener.onNodeAck(getLocalNode(), null); publishListener.onResponse(null); @@ -916,8 +985,7 @@ public void onFailure(Exception e) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); - FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException( - "publication failed", e); + final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e); ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. publishListener.onFailure(exception); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 9dad2390032e3..441299259e819 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -28,7 +28,9 @@ import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; +import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.common.Randomness; @@ -91,10 +93,12 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") @@ -130,20 +134,93 @@ public void testNodesJoinAfterStableCluster() { cluster.stabilise(); final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); - final int newNodesCount = randomIntBetween(1, 2); - cluster.addNodes(newNodesCount); - cluster.stabilise( - // The first pinging discovers the master - defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) - // One message delay to send a join - + DEFAULT_DELAY_VARIABILITY - // Commit a new cluster state with the new node(s). Might be split into multiple commits - + newNodesCount * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + cluster.addNodesAndStabilise(randomIntBetween(1, 2)); final long newTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); assertEquals(currentTerm, newTerm); } + public void testExpandsConfigurationWhenNodesJoinAndContractsWhenTheyLeave() { + final Cluster cluster = new Cluster(1); + cluster.runRandomly(); + cluster.stabilise(); + + cluster.addNodesAndStabilise(2); + cluster.addNodesAndStabilise(2); + + final ClusterNode disconnect1 = cluster.getAnyNode(); + final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); + + logger.info("--> disconnecting {} and {}", disconnect1, disconnect2); + disconnect1.disconnect(); + disconnect2.disconnect(); + cluster.stabilise(); + + final ClusterNode disconnect3 = cluster.getAnyNodeExcept(disconnect1, disconnect2); + logger.info("--> disconnecting {}", disconnect3); + disconnect3.disconnect(); + cluster.stabilise(); + + VotingConfiguration lastCommittedConfiguration = cluster.getAnyLeader().getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be a single node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1)); + } + + public void testDoesNotShrinkConfigurationDueToLossToleranceConfigurationWithThreeNodes() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_ELECTION_DELAY); + + final ClusterNode disconnect1 = cluster.getAnyNode(); + + logger.info("--> disconnecting {}", disconnect1); + disconnect1.disconnect(); + cluster.stabilise(); + + final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); + logger.info("--> disconnecting {}", disconnect2); + disconnect2.disconnect(); + cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE)); + } + + disconnect1.heal(); + cluster.stabilise(); // would not work if disconnect1 were removed from the configuration + } + + public void testDoesNotShrinkConfigurationDueToLossToleranceConfigurationWithFiveNodes() { + final Cluster cluster = new Cluster(5); + cluster.runRandomly(); + cluster.stabilise(); + + cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(2); + cluster.stabilise(DEFAULT_ELECTION_DELAY); + + final ClusterNode disconnect1 = cluster.getAnyNode(); + final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); + + logger.info("--> disconnecting {} and {}", disconnect1, disconnect2); + disconnect1.disconnect(); + disconnect2.disconnect(); + cluster.stabilise(); + + final ClusterNode disconnect3 = cluster.getAnyNodeExcept(disconnect1, disconnect2); + logger.info("--> disconnecting {}", disconnect3); + disconnect3.disconnect(); + cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE)); + } + + disconnect1.heal(); + cluster.stabilise(); // would not work if disconnect1 were removed from the configuration + } + public void testLeaderDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); @@ -162,9 +239,7 @@ public void testLeaderDisconnectionDetectedQuickly() { // then wait for the exception response + DEFAULT_DELAY_VARIABILITY // then wait for a new election - + DEFAULT_ELECTION_DELAY - // then wait for the old leader's removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, + + DEFAULT_ELECTION_DELAY, // ALSO the leader may have just sent a follower check, which receives no response // TODO unnecessary if notified of disconnection @@ -172,10 +247,14 @@ public void testLeaderDisconnectionDetectedQuickly() { // wait for the leader to check its followers + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) // then wait for the exception response - + DEFAULT_DELAY_VARIABILITY - // then wait for the removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY - )); + + DEFAULT_DELAY_VARIABILITY) + + // FINALLY: + + // wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } @@ -210,6 +289,8 @@ public void testUnresponsiveLeaderDetectedEventually() { * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)) // then wait for the new leader to commit a state without the old leader + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, // ALSO wait for the leader to notice that its followers are unresponsive @@ -241,6 +322,8 @@ public void testFollowerDisconnectionDetectedQuickly() { // then wait for the exception response + DEFAULT_DELAY_VARIABILITY // then wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, // ALSO the follower may have just sent a leader check, which receives no response @@ -269,6 +352,8 @@ public void testUnresponsiveFollowerDetectedEventually() { (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING) // then wait for the leader to commit a state without the follower + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, // ALSO wait for the follower to notice the leader is unresponsive @@ -411,7 +496,8 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { } public void testSettingInitialConfigurationTriggersElection() { - final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + final int nodeCount = randomIntBetween(1, 5); + final Cluster cluster = new Cluster(nodeCount); cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); for (final ClusterNode clusterNode : cluster.clusterNodes) { final String nodeId = clusterNode.getId(); @@ -426,14 +512,17 @@ public void testSettingInitialConfigurationTriggersElection() { } cluster.getAnyNode().applyInitialConfiguration(); - cluster.stabilise(defaultMillis( + cluster.stabilise( // the first election should succeed, because only one node knows of the initial configuration and therefore can win a // pre-voting round and proceed to an election, so there cannot be any collisions - ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately - // Allow two round-trip for pre-voting and voting - + 4 * DEFAULT_DELAY_VARIABILITY - // Then a commit of the new leader's first cluster state - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately + // Allow two round-trip for pre-voting and voting + + 4 * DEFAULT_DELAY_VARIABILITY + // Then a commit of the new leader's first cluster state + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // Then allow time for all the other nodes to join, each of which might cause a reconfiguration + + (nodeCount - 1) * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY + ); } public void testCannotSetInitialConfigurationTwice() { @@ -533,13 +622,18 @@ class Cluster { Cluster(int initialNodeCount) { deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - logger.info("--> creating cluster of {} nodes", initialNodeCount); + assertThat(initialNodeCount, greaterThan(0)); - Set initialNodeIds = new HashSet<>(initialNodeCount); - for (int i = 0; i < initialNodeCount; i++) { - initialNodeIds.add(nodeIdFromIndex(i)); + final Set initialConfigurationNodeIds = new HashSet<>(initialNodeCount); + while (initialConfigurationNodeIds.isEmpty()) { + for (int i = 0; i < initialNodeCount; i++) { + if (randomBoolean()) { + initialConfigurationNodeIds.add(nodeIdFromIndex(i)); + } + } } - initialConfiguration = new VotingConfiguration(initialNodeIds); + initialConfiguration = new VotingConfiguration(initialConfigurationNodeIds); + logger.info("--> creating cluster of {} nodes with initial configuration {}", initialNodeCount, initialConfiguration); clusterNodes = new ArrayList<>(initialNodeCount); for (int i = 0; i < initialNodeCount; i++) { @@ -548,6 +642,17 @@ class Cluster { } } + void addNodesAndStabilise(int newNodesCount) { + addNodes(newNodesCount); + stabilise( + // The first pinging discovers the master + defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) + // One message delay to send a join + + DEFAULT_DELAY_VARIABILITY + // Commit a new cluster state with the new node(s). Might be split into multiple commits + + newNodesCount * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + } + void addNodes(int newNodesCount) { logger.info("--> adding {} nodes", newNodesCount); @@ -591,6 +696,15 @@ void runRandomly() { thisStep, newValue, clusterNode.getId()); clusterNode.submitValue(newValue); }).run(); + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNodePreferringLeaders(); + final int masterNodeFailureTolerance = randomIntBetween(0, 2); + onNode(clusterNode.getLocalNode(), + () -> { + logger.debug("----> [runRandomly {}] setting master-node fault tolerance to {} on {}", + thisStep, masterNodeFailureTolerance, clusterNode.getId()); + clusterNode.submitSetMasterNodesFailureTolerance(masterNodeFailureTolerance); + }).run(); } else if (rarely()) { final ClusterNode clusterNode = getAnyNode(); onNode(clusterNode.getLocalNode(), () -> { @@ -692,7 +806,51 @@ void stabilise(long stabilisationDurationMillis) { runFor(stabilisationDurationMillis, "stabilising"); fixLag(); - assertUniqueLeaderAndExpectedModes(); + + final ClusterNode leader = getAnyLeader(); + final long leaderTerm = leader.coordinator.getCurrentTerm(); + final Matcher isPresentAndEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); + + assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leader.getId())); + assertThat(leader.getLastAppliedClusterState().getVersion(), isPresentAndEqualToLeaderVersion); + + for (final ClusterNode clusterNode : clusterNodes) { + final String nodeId = clusterNode.getId(); + assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress()); + + if (clusterNode == leader) { + continue; + } + + if (isConnectedPair(leader, clusterNode)) { + assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); + assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); + assertTrue(nodeId + " has voted for the leader", leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); + // TODO assert that this node's accepted and committed states are the same as the leader's + + assertTrue(nodeId + " is in the leader's applied state", + leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + } else { + assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertFalse(nodeId + " is not in the leader's applied state", + leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + } + } + + final Set connectedNodeIds + = clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).map(ClusterNode::getId).collect(Collectors.toSet()); + + assertThat(leader.getLastAppliedClusterState().getNodes().getSize(), equalTo(connectedNodeIds.size())); + + final ClusterState lastAcceptedState = leader.coordinator.getLastAcceptedState(); + final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration(); + assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration, + lastCommittedConfiguration.hasQuorum(connectedNodeIds)); + + assertThat("no reconfiguration is in progress", + lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration())); + assertThat("current configuration is already optimal", + leader.coordinator.reconfigureIfPossible(lastAcceptedState), sameInstance(lastAcceptedState)); } // TODO remove this when lag detection is implemented @@ -711,8 +869,8 @@ void fixLag() { } }).run(); runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY - // may need to bump terms too - + DEFAULT_ELECTION_DELAY, + // may need to bump terms too + + DEFAULT_ELECTION_DELAY, "re-stabilising after lag-fixing publication"); } else { logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion); @@ -755,42 +913,6 @@ private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) { && getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED); } - private void assertUniqueLeaderAndExpectedModes() { - final ClusterNode leader = getAnyLeader(); - final long leaderTerm = leader.coordinator.getCurrentTerm(); - Matcher isPresentAndEqualToLeaderVersion - = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); - - assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leader.getId())); - assertThat(leader.getLastAppliedClusterState().getVersion(), isPresentAndEqualToLeaderVersion); - - for (final ClusterNode clusterNode : clusterNodes) { - final String nodeId = clusterNode.getId(); - assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress()); - - if (clusterNode == leader) { - continue; - } - - if (isConnectedPair(leader, clusterNode)) { - assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); - assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - assertTrue(nodeId + " has voted for the leader", leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); - // TODO assert that this node's accepted and committed states are the same as the leader's - - assertTrue(nodeId + " is in the leader's applied state", - leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); - } else { - assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); - assertFalse(nodeId + " is not in the leader's applied state", - leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); - } - } - - int connectedNodeCount = Math.toIntExact(clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).count()); - assertThat(leader.getLastAppliedClusterState().getNodes().getSize(), equalTo(connectedNodeCount)); - } - ClusterNode getAnyLeader() { List allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList()); assertThat("leaders", allLeaders, not(empty())); @@ -938,8 +1060,9 @@ protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNo transportService = mockTransport.createTransportService( settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); - coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), - masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get()); + coordinator = new Coordinator(settings, clusterSettings, transportService, + ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, + Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get()); masterService.setClusterStatePublisher(coordinator); transportService.start(); @@ -969,6 +1092,19 @@ void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyRes this.clusterStateApplyResponse = clusterStateApplyResponse; } + void submitSetMasterNodesFailureTolerance(final int masterNodesFaultTolerance) { + submitUpdateTask("set master nodes failure tolerance [" + masterNodesFaultTolerance + "]", cs -> + cs.getLastAcceptedConfiguration().getNodeIds().size() < 2 * masterNodesFaultTolerance + 1 ? cs : + ClusterState.builder(cs).metaData( + MetaData.builder(cs.metaData()) + .persistentSettings(Settings.builder() + .put(cs.metaData().persistentSettings()) + .put(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), masterNodesFaultTolerance) + .build()) + .build()) + .build()); + } + AckCollector submitValue(final long value) { return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, value)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index dd66ebc9e5a2c..cbb45bd6a21f8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -156,11 +156,12 @@ protected void onSendRequest(long requestId, String action, TransportRequest req } } }; + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> initialState.nodes().getLocalNode(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), Collections.emptySet()); - coordinator = new Coordinator(Settings.EMPTY, + clusterSettings, Collections.emptySet()); + coordinator = new Coordinator(Settings.EMPTY, clusterSettings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 67a6f0421f96c..2d296890d930b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -86,8 +86,8 @@ public Map> getDiscoveryTypes(ThreadPool threadPool, () -> new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)) .nodes(DiscoveryNodes.builder().add(transportService.getLocalNode()) .localNodeId(transportService.getLocalNode().getId()).build()).build()); - return new Coordinator(fixedSettings, transportService, allocationService, masterService, persistedStateSupplier, - hostsProvider, clusterApplier, new Random(Randomness.get().nextLong())); + return new Coordinator(fixedSettings, clusterSettings, transportService, allocationService, masterService, + persistedStateSupplier, hostsProvider, clusterApplier, new Random(Randomness.get().nextLong())); } else { return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, hostsProvider, allocationService); From 54eca68255f2a08c80f25743b68cecc808659f61 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 18 Oct 2018 14:42:37 +0100 Subject: [PATCH 02/25] Adding each node might take two publications due to reconfiguration --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 441299259e819..4801a46c36fdd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -649,8 +649,9 @@ void addNodesAndStabilise(int newNodesCount) { defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) // One message delay to send a join + DEFAULT_DELAY_VARIABILITY - // Commit a new cluster state with the new node(s). Might be split into multiple commits - + newNodesCount * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + // Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a + // followup reconfiguration + + newNodesCount * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); } void addNodes(int newNodesCount) { From 96fba05d20d199af157c54ff6f3c41f35957c800 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 18 Oct 2018 14:43:33 +0100 Subject: [PATCH 03/25] TODO --- .../org/elasticsearch/cluster/coordination/CoordinatorTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 4801a46c36fdd..83f5d8073db02 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1096,6 +1096,7 @@ void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyRes void submitSetMasterNodesFailureTolerance(final int masterNodesFaultTolerance) { submitUpdateTask("set master nodes failure tolerance [" + masterNodesFaultTolerance + "]", cs -> cs.getLastAcceptedConfiguration().getNodeIds().size() < 2 * masterNodesFaultTolerance + 1 ? cs : + // TODO this rejects invalid updates, but in fact this should be validated elsewhere. Where? ClusterState.builder(cs).metaData( MetaData.builder(cs.metaData()) .persistentSettings(Settings.builder() From 8d50dc7b33d2406efc078fd06ec88a1de67d9384 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 18 Oct 2018 14:54:32 +0100 Subject: [PATCH 04/25] Assert specifics about the configuration --- .../coordination/CoordinatorTests.java | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 83f5d8073db02..2528bfd55d77b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -145,8 +145,23 @@ public void testExpandsConfigurationWhenNodesJoinAndContractsWhenTheyLeave() { cluster.runRandomly(); cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + cluster.addNodesAndStabilise(2); + { + assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } + cluster.addNodesAndStabilise(2); + { + assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } final ClusterNode disconnect1 = cluster.getAnyNode(); final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); @@ -156,13 +171,27 @@ public void testExpandsConfigurationWhenNodesJoinAndContractsWhenTheyLeave() { disconnect2.disconnect(); cluster.stabilise(); + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); + } + final ClusterNode disconnect3 = cluster.getAnyNodeExcept(disconnect1, disconnect2); logger.info("--> disconnecting {}", disconnect3); disconnect3.disconnect(); cluster.stabilise(); - VotingConfiguration lastCommittedConfiguration = cluster.getAnyLeader().getLastAppliedClusterState().getLastCommittedConfiguration(); - assertThat(lastCommittedConfiguration + " should be a single node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1)); + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect3.getId())); + } } public void testDoesNotShrinkConfigurationDueToLossToleranceConfigurationWithThreeNodes() { From 81c80944851a777a4e19219810de88126e811dc5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 08:51:24 +0100 Subject: [PATCH 05/25] Use local method --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index fbc9d4724ae12..bba353574ddd9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -605,7 +605,7 @@ ClusterState reconfigureIfPossible(ClusterState clusterState) { synchronized (mutex) { if (mode == Mode.LEADER) { final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) - .filter(coordinationState.get()::containsJoinVoteFor).collect(Collectors.toSet()); + .filter(this::hasJoinVoteFrom).collect(Collectors.toSet()); final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure( liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration()); if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { From 447f26fbb7f4678a72769b924062025b6c643dcd Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 09:02:44 +0100 Subject: [PATCH 06/25] Rename to improveConfiguration, always calculates better configuration --- .../cluster/coordination/Coordinator.java | 28 +++++++++---------- .../coordination/CoordinatorTests.java | 8 +++++- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index bba353574ddd9..691140fbda5d8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -601,18 +601,16 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio } // Package-private for testing - ClusterState reconfigureIfPossible(ClusterState clusterState) { - synchronized (mutex) { - if (mode == Mode.LEADER) { - final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) - .filter(this::hasJoinVoteFrom).collect(Collectors.toSet()); - final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure( - liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration()); - if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { - assert coordinationState.get().joinVotesHaveQuorumFor(newConfig); - return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build(); - } - } + ClusterState improveConfiguration(ClusterState clusterState) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + + final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) + .filter(this::hasJoinVoteFrom).collect(Collectors.toSet()); + final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure( + liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration()); + if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { + assert coordinationState.get().joinVotesHaveQuorumFor(newConfig); + return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build(); } return clusterState; @@ -626,13 +624,15 @@ private void scheduleReconfigurationIfNeeded() { assert currentPublication.isPresent() == false : "Expected no publication in progress"; final ClusterState state = getLastAcceptedState(); - if (reconfigureIfPossible(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) { + if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) { logger.trace("scheduling reconfiguration"); masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { reconfigurationTaskScheduled.set(false); - return reconfigureIfPossible(currentState); + synchronized (mutex) { + return improveConfiguration(currentState); + } } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 2528bfd55d77b..d3a8654798621 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -880,7 +880,7 @@ void stabilise(long stabilisationDurationMillis) { assertThat("no reconfiguration is in progress", lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration())); assertThat("current configuration is already optimal", - leader.coordinator.reconfigureIfPossible(lastAcceptedState), sameInstance(lastAcceptedState)); + leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState)); } // TODO remove this when lag detection is implemented @@ -1118,6 +1118,12 @@ boolean isLeader() { return coordinator.getMode() == LEADER; } + ClusterState improveConfiguration(ClusterState currentState) { + synchronized (coordinator.mutex) { + return coordinator.improveConfiguration(currentState); + } + } + void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyResponse) { this.clusterStateApplyResponse = clusterStateApplyResponse; } From a6f50e3f4a00ed87b600ef325af463c3fbafa3c6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 09:09:47 +0100 Subject: [PATCH 07/25] Cluster#size() --- .../cluster/coordination/CoordinatorTests.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index d3a8654798621..990be1e3664e4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -525,8 +525,7 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { } public void testSettingInitialConfigurationTriggersElection() { - final int nodeCount = randomIntBetween(1, 5); - final Cluster cluster = new Cluster(nodeCount); + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); for (final ClusterNode clusterNode : cluster.clusterNodes) { final String nodeId = clusterNode.getId(); @@ -550,7 +549,7 @@ public void testSettingInitialConfigurationTriggersElection() { // Then a commit of the new leader's first cluster state + DEFAULT_CLUSTER_STATE_UPDATE_DELAY // Then allow time for all the other nodes to join, each of which might cause a reconfiguration - + (nodeCount - 1) * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY + + (cluster.size() - 1) * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY ); } @@ -693,6 +692,10 @@ void addNodes(int newNodesCount) { } } + int size() { + return clusterNodes.size(); + } + void runRandomly() { // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it From 4b0986f857257b18db46de6f760460a58b7cdd04 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 09:14:59 +0100 Subject: [PATCH 08/25] Add assertions, better messages --- .../coordination/CoordinatorTests.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 990be1e3664e4..5396e476c0601 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -842,10 +842,11 @@ void stabilise(long stabilisationDurationMillis) { final ClusterNode leader = getAnyLeader(); final long leaderTerm = leader.coordinator.getCurrentTerm(); - final Matcher isPresentAndEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); + final Matcher isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); + final String leaderId = leader.getId(); - assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leader.getId())); - assertThat(leader.getLastAppliedClusterState().getVersion(), isPresentAndEqualToLeaderVersion); + assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId)); + assertThat(leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); for (final ClusterNode clusterNode : clusterNodes) { final String nodeId = clusterNode.getId(); @@ -856,16 +857,20 @@ void stabilise(long stabilisationDurationMillis) { } if (isConnectedPair(leader, clusterNode)) { - assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); - assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - assertTrue(nodeId + " has voted for the leader", leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); - // TODO assert that this node's accepted and committed states are the same as the leader's - - assertTrue(nodeId + " is in the leader's applied state", + assertThat(nodeId + " is a follower of " + leaderId, clusterNode.coordinator.getMode(), is(FOLLOWER)); + assertThat(nodeId + " has the same term as " + leaderId, clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); + assertTrue(nodeId + " has voted for " + leaderId, leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); + assertThat(nodeId + " has the same accepted state as " + leaderId, + clusterNode.coordinator.getLastAcceptedState().getVersion(), isEqualToLeaderVersion); + assertThat(nodeId + " has the same applied state as " + leaderId, + clusterNode.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); + assertTrue(nodeId + " is in its own latest applied state", + clusterNode.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + assertTrue(nodeId + " is in the latest applied state on " + leaderId, leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); } else { - assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); - assertFalse(nodeId + " is not in the leader's applied state", + assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertFalse(nodeId + " is not in the applied state on " + leaderId, leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); } } From 50abb94f5507fe3474b7121beea139ca55e91240 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 10:03:36 +0100 Subject: [PATCH 09/25] Assert that last-committed and last-accepted states are equal --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 691140fbda5d8..cc200b10fc9ce 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -533,6 +533,12 @@ public void invariant() { assert lastPublishedNodes.equals(followersChecker.getKnownFollowers()) : lastPublishedNodes + " != " + followersChecker.getKnownFollowers(); } + + assert becomingMaster || activePublication || + coordinationState.get().getLastAcceptedConfiguration().equals(coordinationState.get().getLastCommittedConfiguration()) + : coordinationState.get().getLastAcceptedConfiguration() + " != " + + coordinationState.get().getLastCommittedConfiguration(); + } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); From 4fd70fd9d2c61b5d1a092d57017e6bb09e51f3bb Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 10:52:21 +0100 Subject: [PATCH 10/25] Better log message --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 5396e476c0601..a7ffda338e24c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -717,7 +717,7 @@ void runRandomly() { if (randomSteps <= step && finishTime == -1) { finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime(); deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}]", step, finishTime); + logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}ms]", step, finishTime); } try { From 37d95bb08650428829f60fa9fb215fd647fda556 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 11:05:20 +0100 Subject: [PATCH 11/25] log when IO exceptions are simulated --- .../org/elasticsearch/cluster/coordination/CoordinatorTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index a7ffda338e24c..77bb8e79c0daa 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -999,6 +999,7 @@ class MockPersistedState extends InMemoryPersistedState { private void possiblyFail(String description) { if (disruptStorage && rarely()) { // TODO revisit this when we've decided how PersistedState should throw exceptions + logger.trace("simulating IO exception [{}]", description); if (randomBoolean()) { throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']')); } else { From b53cf0c42bc663ae3f737240f59409097998afac Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 11:28:10 +0100 Subject: [PATCH 12/25] Handle failure when bumping term by standing down --- .../elasticsearch/cluster/coordination/Coordinator.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index cc200b10fc9ce..3c5eb6ec752b2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -278,8 +278,13 @@ private void updateMaxTermSeen(final long term) { logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm); } else { - ensureTermAtLeast(getLocalNode(), maxTermSeen); - startElection(); + try { + ensureTermAtLeast(getLocalNode(), maxTermSeen); + startElection(); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e); + becomeCandidate("updateMaxTermSeen"); + } } } } From aea29ed4a6df56a55ec8479c67626c786659b157 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 12:13:17 +0100 Subject: [PATCH 13/25] TODO --- .../org/elasticsearch/cluster/coordination/Reconfigurator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java index 64459eadda471..b08d81ac8e457 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java @@ -52,6 +52,7 @@ public class Reconfigurator extends AbstractComponent { Setting.intSetting("cluster.master_nodes_failure_tolerance", 0, 0, Property.NodeScope, Property.Dynamic); // the default is not supposed to be important since we expect to set this setting explicitly at bootstrapping time // TODO contemplate setting the default to something larger than 0 (1? 1<<30?) + // TODO prevent this being set as a transient or a per-node setting? private volatile int masterNodesFailureTolerance; From 9cfc15e6aa4fe3024879dd746e8111dade67738a Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 12:14:57 +0100 Subject: [PATCH 14/25] TODO --- .../org/elasticsearch/cluster/coordination/CoordinatorTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 77bb8e79c0daa..d18fedb839eb6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -680,6 +680,7 @@ void addNodesAndStabilise(int newNodesCount) { // Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a // followup reconfiguration + newNodesCount * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great. } void addNodes(int newNodesCount) { From dbdaf0b26e23f2ebc11fa31438be6a16dfe1167c Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 12:15:25 +0100 Subject: [PATCH 15/25] TODO --- .../org/elasticsearch/cluster/coordination/CoordinatorTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index d18fedb839eb6..1d3163cfa30a1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -550,6 +550,7 @@ public void testSettingInitialConfigurationTriggersElection() { + DEFAULT_CLUSTER_STATE_UPDATE_DELAY // Then allow time for all the other nodes to join, each of which might cause a reconfiguration + (cluster.size() - 1) * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great. ); } From 5692417e2e6965df65dad6fbb694b5398c01d620 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 12:44:40 +0100 Subject: [PATCH 16/25] Finer-grained tests --- .../coordination/CoordinatorTests.java | 145 +++++++++++++++++- 1 file changed, 140 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 1d3163cfa30a1..c024078e7dcec 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -85,6 +85,7 @@ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; +import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE; import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; @@ -140,14 +141,17 @@ public void testNodesJoinAfterStableCluster() { assertEquals(currentTerm, newTerm); } - public void testExpandsConfigurationWhenNodesJoinAndContractsWhenTheyLeave() { + public void testExpandsConfigurationWhenGrowingFromOneToThreeNodesAndShrinksOnFailure() { final Cluster cluster = new Cluster(1); cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(0)); + cluster.addNodesAndStabilise(2); + { assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); @@ -155,7 +159,33 @@ public void testExpandsConfigurationWhenNodesJoinAndContractsWhenTheyLeave() { equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); } + final ClusterNode disconnect1 = cluster.getAnyNode(); + logger.info("--> disconnecting {}", disconnect1); + disconnect1.disconnect(); + cluster.stabilise(); + + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + } + } + + public void testExpandsConfigurationWhenGrowingFromThreeToFiveNodesAndShrinksOnFailure() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + + logger.info("setting fault tolerance to 1"); + leader.submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); + cluster.addNodesAndStabilise(2); + { assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); @@ -165,7 +195,6 @@ public void testExpandsConfigurationWhenNodesJoinAndContractsWhenTheyLeave() { final ClusterNode disconnect1 = cluster.getAnyNode(); final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); - logger.info("--> disconnecting {} and {}", disconnect1, disconnect2); disconnect1.disconnect(); disconnect2.disconnect(); @@ -179,6 +208,8 @@ public void testExpandsConfigurationWhenNodesJoinAndContractsWhenTheyLeave() { assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); } + // we still tolerate the loss of one more node here + final ClusterNode disconnect3 = cluster.getAnyNodeExcept(disconnect1, disconnect2); logger.info("--> disconnecting {}", disconnect3); disconnect3.disconnect(); @@ -187,10 +218,114 @@ public void testExpandsConfigurationWhenNodesJoinAndContractsWhenTheyLeave() { { final ClusterNode newLeader = cluster.getAnyLeader(); final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); - assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1)); + assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); + assertTrue(lastCommittedConfiguration.getNodeIds().contains(disconnect3.getId())); + } + + // however we do not tolerate the loss of yet another one + + final ClusterNode disconnect4 = cluster.getAnyNodeExcept(disconnect1, disconnect2, disconnect3); + logger.info("--> disconnecting {}", disconnect4); + disconnect4.disconnect(); + cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE)); + } + + // moreover we are still stuck even if two other nodes heal + logger.info("--> healing {} and {}", disconnect1, disconnect2); + disconnect1.heal(); + disconnect2.heal(); + cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE)); + } + + // we require another node to heal to recover + final ClusterNode toHeal = randomBoolean() ? disconnect3 : disconnect4; + logger.info("--> healing {}", toHeal); + toHeal.heal(); + cluster.stabilise(); + } + + public void testCanShrinkFromFiveNodesToThree() { + final Cluster cluster = new Cluster(5); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + + logger.info("setting fault tolerance to 2"); + leader.submitSetMasterNodesFailureTolerance(2); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(2)); + + final ClusterNode disconnect1 = cluster.getAnyNode(); + final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); + + logger.info("--> disconnecting {} and {}", disconnect1, disconnect2); + disconnect1.disconnect(); + disconnect2.disconnect(); + cluster.stabilise(); + + { + assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } + + cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY * 2); // allow for a reconfiguration + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); + + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3)); assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); - assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect3.getId())); + } + } + + public void testCanShrinkFromThreeNodesToTwo() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + + logger.info("setting fault tolerance to 1"); + leader.submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); + + final ClusterNode disconnect1 = cluster.getAnyNode(); + + logger.info("--> disconnecting {}", disconnect1); + disconnect1.disconnect(); + cluster.stabilise(); + + { + assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } + + cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(0); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY * 2); // allow for a reconfiguration + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(0)); + + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); } } @@ -1147,7 +1282,7 @@ void submitSetMasterNodesFailureTolerance(final int masterNodesFaultTolerance) { MetaData.builder(cs.metaData()) .persistentSettings(Settings.builder() .put(cs.metaData().persistentSettings()) - .put(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), masterNodesFaultTolerance) + .put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), masterNodesFaultTolerance) .build()) .build()) .build()); From 17ff4095ffb96360b189c63b036a929149ab1066 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 12:56:46 +0100 Subject: [PATCH 17/25] URGENT --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3c5eb6ec752b2..957751746a58e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -637,7 +637,7 @@ private void scheduleReconfigurationIfNeeded() { final ClusterState state = getLastAcceptedState(); if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) { logger.trace("scheduling reconfiguration"); - masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { reconfigurationTaskScheduled.set(false); From c9b976ecd1f520c3f8210bd1d91c354aba86f10f Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 13:00:15 +0100 Subject: [PATCH 18/25] Do not schedule reconfiguration if still becoming master --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 957751746a58e..8b76473e88252 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -667,7 +667,8 @@ private void handleJoin(Join join) { if (coordinationState.get().electionWon()) { // if we have already won the election then the actual join does not matter for election purposes, so swallow any exception final boolean isNewJoin = handleJoinIgnoringExceptions(join); - if (isNewJoin && mode == Mode.LEADER && publicationInProgress() == false) { + final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm(); + if (isNewJoin && establishedAsMaster && publicationInProgress() == false) { scheduleReconfigurationIfNeeded(); } } else { From 1a8a3ae667fd2efd08fbd68e42292988624708f6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 13:06:05 +0100 Subject: [PATCH 19/25] Comments --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 8b76473e88252..c1dd4a1d375db 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -665,8 +665,12 @@ private void handleJoin(Join join) { ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin); if (coordinationState.get().electionWon()) { - // if we have already won the election then the actual join does not matter for election purposes, so swallow any exception + // If we have already won the election then the actual join does not matter for election purposes, so swallow any exception final boolean isNewJoin = handleJoinIgnoringExceptions(join); + + // If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn, + // schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the + // race against the election-winning publication and log a big error message, which we can prevent by checking this here: final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm(); if (isNewJoin && establishedAsMaster && publicationInProgress() == false) { scheduleReconfigurationIfNeeded(); From a33840eaa751c00e04d89bcd51a67e965db8f4d5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 13:13:54 +0100 Subject: [PATCH 20/25] Only expect correct state if the cluster applier is working --- .../cluster/coordination/CoordinatorTests.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c024078e7dcec..ad4b595ccdf3c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -999,10 +999,12 @@ void stabilise(long stabilisationDurationMillis) { assertTrue(nodeId + " has voted for " + leaderId, leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); assertThat(nodeId + " has the same accepted state as " + leaderId, clusterNode.coordinator.getLastAcceptedState().getVersion(), isEqualToLeaderVersion); - assertThat(nodeId + " has the same applied state as " + leaderId, - clusterNode.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); - assertTrue(nodeId + " is in its own latest applied state", - clusterNode.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + if (clusterNode.getClusterStateApplyResponse() == ClusterStateApplyResponse.SUCCEED) { + assertThat(nodeId + " has the same applied state as " + leaderId, + clusterNode.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); + assertTrue(nodeId + " is in its own latest applied state", + clusterNode.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + } assertTrue(nodeId + " is in the latest applied state on " + leaderId, leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); } else { @@ -1274,6 +1276,10 @@ void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyRes this.clusterStateApplyResponse = clusterStateApplyResponse; } + ClusterStateApplyResponse getClusterStateApplyResponse() { + return clusterStateApplyResponse; + } + void submitSetMasterNodesFailureTolerance(final int masterNodesFaultTolerance) { submitUpdateTask("set master nodes failure tolerance [" + masterNodesFaultTolerance + "]", cs -> cs.getLastAcceptedConfiguration().getNodeIds().size() < 2 * masterNodesFaultTolerance + 1 ? cs : From 1eb28a22d7a0e42d3d2357d575bfde975d6de524 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 13:21:51 +0100 Subject: [PATCH 21/25] It's ok to leave the failure tolerance setting behind --- .../java/org/elasticsearch/test/ESSingleNodeTestCase.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 200ec07d7a8eb..c3a5dad035190 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.coordination.Reconfigurator; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -123,8 +124,11 @@ public void tearDown() throws Exception { super.tearDown(); assertAcked(client().admin().indices().prepareDelete("*").get()); MetaData metaData = client().admin().cluster().prepareState().get().getState().getMetaData(); - assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().keySet(), - metaData.persistentSettings().size(), equalTo(0)); + Settings.Builder unexpectedPersistentSettingsBuilder = Settings.builder().put(metaData.persistentSettings()); + unexpectedPersistentSettingsBuilder.remove(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey()); + Settings unexpectedPersistentSettings = unexpectedPersistentSettingsBuilder.build(); + assertThat("test leaves persistent cluster metadata behind: " + unexpectedPersistentSettings.keySet(), + unexpectedPersistentSettings.size(), equalTo(0)); assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().keySet(), metaData.transientSettings().size(), equalTo(0)); if (resetNodeAfterTest()) { From 015785c05685d99259d11d173f8caf62362bb01d Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 13:44:34 +0100 Subject: [PATCH 22/25] The leader might be disconnected --- .../coordination/CoordinatorTests.java | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index ad4b595ccdf3c..9167aeb470451 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -257,12 +257,13 @@ public void testCanShrinkFromFiveNodesToThree() { cluster.runRandomly(); cluster.stabilise(); - final ClusterNode leader = cluster.getAnyLeader(); - - logger.info("setting fault tolerance to 2"); - leader.submitSetMasterNodesFailureTolerance(2); - cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); - assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(2)); + { + final ClusterNode leader = cluster.getAnyLeader(); + logger.info("setting fault tolerance to 2"); + leader.submitSetMasterNodesFailureTolerance(2); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(2)); + } final ClusterNode disconnect1 = cluster.getAnyNode(); final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1); @@ -272,20 +273,20 @@ public void testCanShrinkFromFiveNodesToThree() { disconnect2.disconnect(); cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + { - assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); } - cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(1); + leader.submitSetMasterNodesFailureTolerance(1); cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY * 2); // allow for a reconfiguration assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); { - final ClusterNode newLeader = cluster.getAnyLeader(); - final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3)); assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId())); @@ -297,12 +298,13 @@ public void testCanShrinkFromThreeNodesToTwo() { cluster.runRandomly(); cluster.stabilise(); - final ClusterNode leader = cluster.getAnyLeader(); - - logger.info("setting fault tolerance to 1"); - leader.submitSetMasterNodesFailureTolerance(1); - cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); - assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); + { + final ClusterNode leader = cluster.getAnyLeader(); + logger.info("setting fault tolerance to 1"); + leader.submitSetMasterNodesFailureTolerance(1); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1)); + } final ClusterNode disconnect1 = cluster.getAnyNode(); @@ -310,20 +312,20 @@ public void testCanShrinkFromThreeNodesToTwo() { disconnect1.disconnect(); cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + { - assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); } - cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(0); + leader.submitSetMasterNodesFailureTolerance(0); cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY * 2); // allow for a reconfiguration assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(0)); { - final ClusterNode newLeader = cluster.getAnyLeader(); - final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1)); assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId())); } From 3c95c5f7b88592061f5bba405e5b4e1cf07ed966 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 14:25:48 +0100 Subject: [PATCH 23/25] Lag detection doesn't fix the situation where the applier failed --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 9167aeb470451..547381680e4b7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1035,10 +1035,10 @@ void stabilise(long stabilisationDurationMillis) { // TODO remove this when lag detection is implemented void fixLag() { final ClusterNode leader = getAnyLeader(); - final long leaderVersion = leader.coordinator.getApplierState().version(); + final long leaderVersion = leader.getLastAppliedClusterState().version(); final long minVersion = clusterNodes.stream() .filter(n -> isConnectedPair(n, leader)) - .map(n -> n.coordinator.getApplierState().version()).min(Long::compare).orElse(Long.MIN_VALUE); + .map(n -> n.getLastAppliedClusterState().version()).min(Long::compare).orElse(Long.MIN_VALUE); assert minVersion >= 0; if (minVersion < leaderVersion) { logger.info("--> fixLag publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion); From d3975a96dfda0a5a33647506d18b94309e679b1c Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 14:59:54 +0100 Subject: [PATCH 24/25] Allow lag-fixing to take longer if any cluster-appliers are hanging --- .../cluster/coordination/CoordinatorTests.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 547381680e4b7..4cf258dbc715e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -589,9 +589,13 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + logger.info("--> blocking cluster state application on {}", follower0); follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG); + + logger.info("--> publishing another value"); AckCollector ackCollector = leader.submitValue(randomLong()); cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value"); + assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); cluster.stabilise(); @@ -984,8 +988,8 @@ void stabilise(long stabilisationDurationMillis) { final Matcher isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); final String leaderId = leader.getId(); - assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId)); - assertThat(leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); + assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId)); + assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); for (final ClusterNode clusterNode : clusterNodes) { final String nodeId = clusterNode.getId(); @@ -1047,10 +1051,15 @@ void fixLag() { leader.submitValue(randomLong()); } }).run(); + runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY // may need to bump terms too + DEFAULT_ELECTION_DELAY, "re-stabilising after lag-fixing publication"); + + if (clusterNodes.stream().anyMatch(n -> n.getClusterStateApplyResponse().equals(ClusterStateApplyResponse.HANG))) { + runFor(defaultMillis(PUBLISH_TIMEOUT_SETTING), "allowing lag-fixing publication to time out"); + } } else { logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion); } From 56fac1772734750ad7d6c2bf1c9c09790ca80527 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 19 Oct 2018 15:23:42 +0100 Subject: [PATCH 25/25] Ignore leftover fault-tolerance setting in ITs --- .../org/elasticsearch/test/ESIntegTestCase.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 22f89edaa0376..76369e42281df 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -67,6 +67,7 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.coordination.Reconfigurator; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -551,15 +552,17 @@ protected final void afterInternal(boolean afterClass) throws Exception { if (cluster() != null) { if (currentClusterScope != Scope.TEST) { MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData(); - final Set persistent = metaData.persistentSettings().keySet(); - assertThat("test leaves persistent cluster metadata behind: " + persistent, persistent.size(), equalTo(0)); - final Set transientSettings = new HashSet<>(metaData.transientSettings().keySet()); + + final Set persistentKeys = new HashSet<>(metaData.persistentSettings().keySet()); + persistentKeys.remove(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey()); + assertThat("test leaves persistent cluster metadata behind", persistentKeys, empty()); + + final Set transientKeys = new HashSet<>(metaData.transientSettings().keySet()); if (isInternalCluster() && internalCluster().getAutoManageMinMasterNode()) { // this is set by the test infra - transientSettings.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()); + transientKeys.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()); } - assertThat("test leaves transient cluster metadata behind: " + transientSettings, - transientSettings, empty()); + assertThat("test leaves transient cluster metadata behind", transientKeys, empty()); } ensureClusterSizeConsistency(); ensureClusterStateConsistency();