Skip to content

Commit cc51a93

Browse files
authored
Avoid counting votes from master-ineligible nodes (#43688)
Today, if a master-eligible node is converted to a master-ineligible node, it may remain in the voting configuration, meaning that the master node may count its publish responses as an indication that it has properly persisted the cluster state. However, master-ineligible nodes do not properly persist the cluster state, so it is not safe to count these votes. This change adjusts `CoordinationState` to take account of this from a safety point of view, and also adjusts the `Coordinator` to prevent such nodes from joining the cluster while they remain in the voting configuration. Instead, before processing the join of a node that now appears to be master-ineligible, it triggers a reconfiguration to remove that node from the voting configuration.
1 parent 4af1649 commit cc51a93

File tree

9 files changed

+189
-36
lines changed

9 files changed

+189
-36
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ public VotingConfiguration(StreamInput in) throws IOException {
340340

341341
@Override
342342
public void writeTo(StreamOutput out) throws IOException {
343-
out.writeStringArray(nodeIds.toArray(new String[nodeIds.size()]));
343+
out.writeStringArray(nodeIds.toArray(new String[0]));
344344
}
345345

346346
public boolean hasQuorum(Collection<String> votes) {

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ public static class VoteCollection {
503503
private final Set<Join> joins;
504504

505505
public boolean addVote(DiscoveryNode sourceNode) {
506-
return nodes.put(sourceNode.getId(), sourceNode) == null;
506+
return sourceNode.isMasterNode() && nodes.put(sourceNode.getId(), sourceNode) == null;
507507
}
508508

509509
public boolean addJoinVote(Join join) {

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -851,11 +851,23 @@ assert localNodeMayWinElection(getLastAcceptedState()) :
851851
ClusterState improveConfiguration(ClusterState clusterState) {
852852
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
853853

854+
// exclude any nodes whose ID is in the voting config exclusions list ...
855+
final Stream<String> excludedNodeIds = clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId);
856+
// ... and also automatically exclude the node IDs of master-ineligible nodes that were previously master-eligible and are still in
857+
// the voting config. We could exclude all the master-ineligible nodes here, but there could be quite a few of them and that makes
858+
// the logging much harder to follow.
859+
final Stream<String> masterIneligibleNodeIdsInVotingConfig = StreamSupport.stream(clusterState.nodes().spliterator(), false)
860+
.filter(n -> n.isMasterNode() == false
861+
&& (clusterState.getLastAcceptedConfiguration().getNodeIds().contains(n.getId())
862+
|| clusterState.getLastCommittedConfiguration().getNodeIds().contains(n.getId())))
863+
.map(DiscoveryNode::getId);
864+
854865
final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
855-
.filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
866+
.filter(DiscoveryNode::isMasterNode).filter(coordinationState.get()::containsJoinVoteFor).collect(Collectors.toSet());
856867
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
857-
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
868+
Stream.concat(masterIneligibleNodeIdsInVotingConfig, excludedNodeIds).collect(Collectors.toSet()),
858869
getLocalNode(), clusterState.getLastAcceptedConfiguration());
870+
859871
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
860872
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
861873
return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
@@ -893,9 +905,9 @@ public void onFailure(String source, Exception e) {
893905
}
894906
}
895907

896-
// for tests
897-
boolean hasJoinVoteFrom(DiscoveryNode node) {
898-
return coordinationState.get().containsJoinVoteFor(node);
908+
// exposed for tests
909+
boolean missingJoinVoteFrom(DiscoveryNode node) {
910+
return node.isMasterNode() && coordinationState.get().containsJoinVoteFor(node) == false;
899911
}
900912

901913
private void handleJoin(Join join) {
@@ -904,13 +916,13 @@ private void handleJoin(Join join) {
904916

905917
if (coordinationState.get().electionWon()) {
906918
// If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
907-
final boolean isNewJoin = handleJoinIgnoringExceptions(join);
919+
final boolean isNewJoinFromMasterEligibleNode = handleJoinIgnoringExceptions(join);
908920

909921
// If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn,
910922
// schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
911923
// race against the election-winning publication and log a big error message, which we can prevent by checking this here:
912924
final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
913-
if (isNewJoin && establishedAsMaster && publicationInProgress() == false) {
925+
if (isNewJoinFromMasterEligibleNode && establishedAsMaster && publicationInProgress() == false) {
914926
scheduleReconfigurationIfNeeded();
915927
}
916928
} else {
@@ -1349,7 +1361,7 @@ public void onFailure(Exception e) {
13491361
}
13501362

13511363
private void handleAssociatedJoin(Join join) {
1352-
if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) {
1364+
if (join.getTerm() == getCurrentTerm() && missingJoinVoteFrom(join.getSourceNode())) {
13531365
logger.trace("handling {}", join);
13541366
handleJoin(join);
13551367
}
@@ -1387,7 +1399,7 @@ protected void onMissingJoin(DiscoveryNode discoveryNode) {
13871399
// The remote node did not include a join vote in its publish response. We do not persist joins, so it could be that the remote
13881400
// node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy
13891401
// of a join from this node then we assume the latter and bump our term to obtain a vote from this node.
1390-
if (hasJoinVoteFrom(discoveryNode) == false) {
1402+
if (missingJoinVoteFrom(discoveryNode)) {
13911403
final long term = publishRequest.getAcceptedState().term();
13921404
logger.debug("onMissingJoin: no join vote from {}, bumping term to exceed {}", discoveryNode, term);
13931405
updateMaxTermSeen(term + 1);

server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,18 @@ void handlePublishResponse(PublishResponse publishResponse) {
239239
if (applyCommitRequest.isPresent()) {
240240
sendApplyCommit();
241241
} else {
242-
Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> {
243-
assert applyCommitRequest.isPresent() == false;
244-
applyCommitRequest = Optional.of(applyCommit);
245-
ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime));
246-
publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit);
247-
});
242+
try {
243+
Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> {
244+
assert applyCommitRequest.isPresent() == false;
245+
applyCommitRequest = Optional.of(applyCommit);
246+
ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime));
247+
publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum)
248+
.forEach(PublicationTarget::sendApplyCommit);
249+
});
250+
} catch (Exception e) {
251+
setFailed(e);
252+
onPossibleCommitFailure();
253+
}
248254
}
249255
}
250256

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import java.util.stream.Collectors;
4242
import java.util.stream.IntStream;
4343

44+
import static java.util.Collections.emptyMap;
45+
import static java.util.Collections.emptySet;
4446
import static org.hamcrest.Matchers.containsString;
4547
import static org.hamcrest.Matchers.equalTo;
4648

@@ -734,6 +736,11 @@ public void testHandleCommitWithBadVersion() {
734736
public void testVoteCollection() {
735737
final CoordinationState.VoteCollection voteCollection = new CoordinationState.VoteCollection();
736738
assertTrue(voteCollection.isEmpty());
739+
740+
assertFalse(voteCollection.addVote(
741+
new DiscoveryNode("master-ineligible", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT)));
742+
assertTrue(voteCollection.isEmpty());
743+
737744
voteCollection.addVote(node1);
738745
assertFalse(voteCollection.isEmpty());
739746
assertTrue(voteCollection.containsVoteFor(node1));

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.common.xcontent.XContentBuilder;
4242
import org.elasticsearch.discovery.DiscoveryModule;
4343
import org.elasticsearch.gateway.GatewayService;
44+
import org.elasticsearch.node.Node;
4445
import org.elasticsearch.test.MockLogAppender;
4546

4647
import java.io.IOException;
@@ -1206,6 +1207,61 @@ public void assertMatched() {
12061207
}
12071208
}
12081209

1210+
public void testReconfiguresToExcludeMasterIneligibleNodesInVotingConfig() {
1211+
final Cluster cluster = new Cluster(3);
1212+
cluster.runRandomly();
1213+
cluster.stabilise();
1214+
1215+
final ClusterNode chosenNode = cluster.getAnyNode();
1216+
1217+
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastCommittedConfiguration().getNodeIds(),
1218+
hasItem(chosenNode.getId()));
1219+
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastAcceptedConfiguration().getNodeIds(),
1220+
hasItem(chosenNode.getId()));
1221+
1222+
final boolean chosenNodeIsLeader = chosenNode == cluster.getAnyLeader();
1223+
final long termBeforeRestart = cluster.getAnyNode().coordinator.getCurrentTerm();
12091224

1225+
logger.info("--> restarting [{}] as a master-ineligible node", chosenNode);
1226+
1227+
chosenNode.close();
1228+
cluster.clusterNodes.replaceAll(cn -> cn == chosenNode ? cn.restartedNode(Function.identity(), Function.identity(),
1229+
Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build()) : cn);
1230+
cluster.stabilise();
1231+
1232+
if (chosenNodeIsLeader == false) {
1233+
assertThat("term did not change", cluster.getAnyNode().coordinator.getCurrentTerm(), is(termBeforeRestart));
1234+
}
1235+
1236+
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastCommittedConfiguration().getNodeIds(),
1237+
not(hasItem(chosenNode.getId())));
1238+
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastAcceptedConfiguration().getNodeIds(),
1239+
not(hasItem(chosenNode.getId())));
1240+
}
1241+
1242+
public void testDoesNotPerformElectionWhenRestartingFollower() {
1243+
final Cluster cluster = new Cluster(randomIntBetween(2, 5), false, Settings.EMPTY);
1244+
cluster.runRandomly();
1245+
cluster.stabilise();
1246+
1247+
final ClusterNode leader = cluster.getAnyLeader();
1248+
final long expectedTerm = leader.coordinator.getCurrentTerm();
1249+
1250+
if (cluster.clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).count() == 2) {
1251+
// in the 2-node case, auto-shrinking the voting configuration is required to reduce the voting configuration down to just the
1252+
// leader, otherwise restarting the other master-eligible node triggers an election
1253+
leader.submitSetAutoShrinkVotingConfiguration(true);
1254+
cluster.stabilise(2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); // first delay for the setting update, second for the reconfiguration
1255+
}
1256+
1257+
for (final ClusterNode clusterNode : cluster.getAllNodesExcept(leader)) {
1258+
logger.info("--> restarting {}", clusterNode);
1259+
clusterNode.close();
1260+
cluster.clusterNodes.replaceAll(cn ->
1261+
cn == clusterNode ? cn.restartedNode(Function.identity(), Function.identity(), Settings.EMPTY) : cn);
1262+
cluster.stabilise();
1263+
assertThat("term should not change", cluster.getAnyNode().coordinator.getCurrentTerm(), is(expectedTerm));
1264+
}
1265+
}
12101266

12111267
}

server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -472,12 +472,16 @@ public void testBecomeFollowerFailsPendingJoin() throws Exception {
472472
}
473473

474474
public void testConcurrentJoining() {
475-
List<DiscoveryNode> nodes = IntStream.rangeClosed(1, randomIntBetween(2, 5))
475+
List<DiscoveryNode> masterNodes = IntStream.rangeClosed(1, randomIntBetween(2, 5))
476476
.mapToObj(nodeId -> newNode(nodeId, true)).collect(Collectors.toList());
477+
List<DiscoveryNode> otherNodes = IntStream.rangeClosed(masterNodes.size() + 1, masterNodes.size() + 1 + randomIntBetween(0, 5))
478+
.mapToObj(nodeId -> newNode(nodeId, false)).collect(Collectors.toList());
479+
List<DiscoveryNode> allNodes = Stream.concat(masterNodes.stream(), otherNodes.stream()).collect(Collectors.toList());
477480

478-
DiscoveryNode localNode = nodes.get(0);
481+
DiscoveryNode localNode = masterNodes.get(0);
479482
VotingConfiguration votingConfiguration = new VotingConfiguration(randomValueOtherThan(singletonList(localNode),
480-
() -> randomSubsetOf(randomIntBetween(1, nodes.size()), nodes)).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()));
483+
() -> randomSubsetOf(randomIntBetween(1, masterNodes.size()), masterNodes)).stream()
484+
.map(DiscoveryNode::getId).collect(Collectors.toSet()));
481485

482486
logger.info("Voting configuration: {}", votingConfiguration);
483487

@@ -489,7 +493,7 @@ public void testConcurrentJoining() {
489493
// we need at least a quorum of voting nodes with a correct term and worse state
490494
List<DiscoveryNode> successfulNodes;
491495
do {
492-
successfulNodes = randomSubsetOf(nodes);
496+
successfulNodes = randomSubsetOf(allNodes);
493497
} while (votingConfiguration.hasQuorum(successfulNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList()))
494498
== false);
495499

@@ -499,7 +503,7 @@ public void testConcurrentJoining() {
499503
node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion))))
500504
.collect(Collectors.toList());
501505

502-
List<DiscoveryNode> possiblyUnsuccessfulNodes = new ArrayList<>(nodes);
506+
List<DiscoveryNode> possiblyUnsuccessfulNodes = new ArrayList<>(allNodes);
503507
possiblyUnsuccessfulNodes.removeAll(successfulNodes);
504508

505509
logger.info("Possibly unsuccessful voting nodes: {}", possiblyUnsuccessfulNodes);
@@ -572,8 +576,8 @@ public void testConcurrentJoining() {
572576

573577
assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster());
574578
for (DiscoveryNode successfulNode : successfulNodes) {
575-
assertTrue(successfulNode.toString(), clusterStateHasNode(successfulNode));
576-
assertTrue(successfulNode.toString(), coordinator.hasJoinVoteFrom(successfulNode));
579+
assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode));
580+
assertFalse(successfulNode + " voted for master", coordinator.missingJoinVoteFrom(successfulNode));
577581
}
578582
}
579583

0 commit comments

Comments
 (0)