Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public VotingConfiguration(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(nodeIds.toArray(new String[nodeIds.size()]));
out.writeStringArray(nodeIds.toArray(new String[0]));
}

public boolean hasQuorum(Collection<String> votes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ public static class VoteCollection {
private final Set<Join> joins;

public boolean addVote(DiscoveryNode sourceNode) {
return nodes.put(sourceNode.getId(), sourceNode) == null;
return sourceNode.isMasterNode() && nodes.put(sourceNode.getId(), sourceNode) == null;
}

public boolean addJoinVote(Join join) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,11 +851,23 @@ assert localNodeMayWinElection(getLastAcceptedState()) :
ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";

// exclude any nodes whose ID is in the voting config exclusions list ...
final Stream<String> excludedNodeIds = clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId);
// ... and also automatically exclude the node IDs of master-ineligible nodes that were previously master-eligible and are still in
// the voting config. We could exclude all the master-ineligible nodes here, but there could be quite a few of them and that makes
// the logging much harder to follow.
final Stream<String> masterIneligibleNodeIdsInVotingConfig = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(n -> n.isMasterNode() == false
&& (clusterState.getLastAcceptedConfiguration().getNodeIds().contains(n.getId())
|| clusterState.getLastCommittedConfiguration().getNodeIds().contains(n.getId())))
.map(DiscoveryNode::getId);

final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
.filter(DiscoveryNode::isMasterNode).filter(coordinationState.get()::containsJoinVoteFor).collect(Collectors.toSet());
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
Stream.concat(masterIneligibleNodeIdsInVotingConfig, excludedNodeIds).collect(Collectors.toSet()),
getLocalNode(), clusterState.getLastAcceptedConfiguration());

if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
Expand Down Expand Up @@ -893,9 +905,9 @@ public void onFailure(String source, Exception e) {
}
}

// for tests
boolean hasJoinVoteFrom(DiscoveryNode node) {
return coordinationState.get().containsJoinVoteFor(node);
// exposed for tests
boolean missingJoinVoteFrom(DiscoveryNode node) {
return node.isMasterNode() && coordinationState.get().containsJoinVoteFor(node) == false;
}

private void handleJoin(Join join) {
Expand All @@ -904,13 +916,13 @@ private void handleJoin(Join join) {

if (coordinationState.get().electionWon()) {
// If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
final boolean isNewJoin = handleJoinIgnoringExceptions(join);
final boolean isNewJoinFromMasterEligibleNode = handleJoinIgnoringExceptions(join);

// If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn,
// schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
// race against the election-winning publication and log a big error message, which we can prevent by checking this here:
final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
if (isNewJoin && establishedAsMaster && publicationInProgress() == false) {
if (isNewJoinFromMasterEligibleNode && establishedAsMaster && publicationInProgress() == false) {
scheduleReconfigurationIfNeeded();
}
} else {
Expand Down Expand Up @@ -1349,7 +1361,7 @@ public void onFailure(Exception e) {
}

private void handleAssociatedJoin(Join join) {
if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) {
if (join.getTerm() == getCurrentTerm() && missingJoinVoteFrom(join.getSourceNode())) {
logger.trace("handling {}", join);
handleJoin(join);
}
Expand Down Expand Up @@ -1387,7 +1399,7 @@ protected void onMissingJoin(DiscoveryNode discoveryNode) {
// The remote node did not include a join vote in its publish response. We do not persist joins, so it could be that the remote
// node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy
// of a join from this node then we assume the latter and bump our term to obtain a vote from this node.
if (hasJoinVoteFrom(discoveryNode) == false) {
if (missingJoinVoteFrom(discoveryNode)) {
final long term = publishRequest.getAcceptedState().term();
logger.debug("onMissingJoin: no join vote from {}, bumping term to exceed {}", discoveryNode, term);
updateMaxTermSeen(term + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,18 @@ void handlePublishResponse(PublishResponse publishResponse) {
if (applyCommitRequest.isPresent()) {
sendApplyCommit();
} else {
Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> {
assert applyCommitRequest.isPresent() == false;
applyCommitRequest = Optional.of(applyCommit);
ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime));
publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit);
});
try {
Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> {
assert applyCommitRequest.isPresent() == false;
applyCommitRequest = Optional.of(applyCommit);
ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime));
publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum)
.forEach(PublicationTarget::sendApplyCommit);
});
} catch (Exception e) {
setFailed(e);
onPossibleCommitFailure();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -734,6 +736,11 @@ public void testHandleCommitWithBadVersion() {
public void testVoteCollection() {
final CoordinationState.VoteCollection voteCollection = new CoordinationState.VoteCollection();
assertTrue(voteCollection.isEmpty());

assertFalse(voteCollection.addVote(
new DiscoveryNode("master-ineligible", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT)));
assertTrue(voteCollection.isEmpty());

voteCollection.addVote(node1);
assertFalse(voteCollection.isEmpty());
assertTrue(voteCollection.containsVoteFor(node1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.MockLogAppender;

import java.io.IOException;
Expand Down Expand Up @@ -1206,6 +1207,61 @@ public void assertMatched() {
}
}

/**
 * Checks that a node which is part of the voting configuration is dropped from it after being restarted with
 * {@code Node.NODE_MASTER_SETTING} set to {@code false}: once the cluster has stabilised again, neither the last-committed nor
 * the last-accepted configuration in the leader's applied cluster state may contain the restarted node's ID.
 */
public void testReconfiguresToExcludeMasterIneligibleNodesInVotingConfig() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();

// the node that will be restarted; it may or may not be the current leader
final ClusterNode chosenNode = cluster.getAnyNode();

// precondition: the chosen node starts out in both the committed and the accepted voting configuration
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastCommittedConfiguration().getNodeIds(),
hasItem(chosenNode.getId()));
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastAcceptedConfiguration().getNodeIds(),
hasItem(chosenNode.getId()));

// remember whether the leader is the node being restarted, and the term before the restart, for the term check below
// NOTE(review): the term is read from an arbitrary node; presumably all nodes agree on the term after stabilise() - confirm
final boolean chosenNodeIsLeader = chosenNode == cluster.getAnyLeader();
final long termBeforeRestart = cluster.getAnyNode().coordinator.getCurrentTerm();

logger.info("--> restarting [{}] as a master-ineligible node", chosenNode);

// close the node and swap it in the cluster's node list for a restarted copy whose settings mark it as master-ineligible
// NOTE(review): later assertions reuse chosenNode.getId(), so this presumably relies on restartedNode() keeping the ID - confirm
chosenNode.close();
cluster.clusterNodes.replaceAll(cn -> cn == chosenNode ? cn.restartedNode(Function.identity(), Function.identity(),
Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build()) : cn);
cluster.stabilise();

// only when a non-leader was restarted is the term expected to be the same as before the restart
if (chosenNodeIsLeader == false) {
assertThat("term did not change", cluster.getAnyNode().coordinator.getCurrentTerm(), is(termBeforeRestart));
}

// postcondition: the restarted node's ID is gone from both the committed and the accepted voting configuration
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastCommittedConfiguration().getNodeIds(),
not(hasItem(chosenNode.getId())));
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastAcceptedConfiguration().getNodeIds(),
not(hasItem(chosenNode.getId())));
}

/**
 * Checks that restarting nodes other than the leader, one at a time, leaves the term unchanged: after each restart and
 * stabilisation the current term must still equal the term the leader had before any restart.
 */
public void testDoesNotPerformElectionWhenRestartingFollower() {
// NOTE(review): the meaning of the 'false' constructor argument is not visible here; the master-eligibility filter below
// suggests the cluster may contain master-ineligible nodes - confirm against the Cluster constructor
final Cluster cluster = new Cluster(randomIntBetween(2, 5), false, Settings.EMPTY);
cluster.runRandomly();
cluster.stabilise();

// capture the leader and its term once; every later term check compares against this value
final ClusterNode leader = cluster.getAnyLeader();
final long expectedTerm = leader.coordinator.getCurrentTerm();

if (cluster.clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).count() == 2) {
// in the 2-node case, auto-shrinking the voting configuration is required to reduce the voting configuration down to just the
// leader, otherwise restarting the other master-eligible node triggers an election
leader.submitSetAutoShrinkVotingConfiguration(true);
cluster.stabilise(2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); // first delay for the setting update, second for the reconfiguration
}

// restart every node except the leader in turn, with empty extra settings, and check the term after each restart
for (final ClusterNode clusterNode : cluster.getAllNodesExcept(leader)) {
logger.info("--> restarting {}", clusterNode);
clusterNode.close();
cluster.clusterNodes.replaceAll(cn ->
cn == clusterNode ? cn.restartedNode(Function.identity(), Function.identity(), Settings.EMPTY) : cn);
cluster.stabilise();
assertThat("term should not change", cluster.getAnyNode().coordinator.getCurrentTerm(), is(expectedTerm));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,16 @@ public void testBecomeFollowerFailsPendingJoin() throws Exception {
}

public void testConcurrentJoining() {
List<DiscoveryNode> nodes = IntStream.rangeClosed(1, randomIntBetween(2, 5))
List<DiscoveryNode> masterNodes = IntStream.rangeClosed(1, randomIntBetween(2, 5))
.mapToObj(nodeId -> newNode(nodeId, true)).collect(Collectors.toList());
List<DiscoveryNode> otherNodes = IntStream.rangeClosed(masterNodes.size() + 1, masterNodes.size() + 1 + randomIntBetween(0, 5))
.mapToObj(nodeId -> newNode(nodeId, false)).collect(Collectors.toList());
List<DiscoveryNode> allNodes = Stream.concat(masterNodes.stream(), otherNodes.stream()).collect(Collectors.toList());

DiscoveryNode localNode = nodes.get(0);
DiscoveryNode localNode = masterNodes.get(0);
VotingConfiguration votingConfiguration = new VotingConfiguration(randomValueOtherThan(singletonList(localNode),
() -> randomSubsetOf(randomIntBetween(1, nodes.size()), nodes)).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()));
() -> randomSubsetOf(randomIntBetween(1, masterNodes.size()), masterNodes)).stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet()));

logger.info("Voting configuration: {}", votingConfiguration);

Expand All @@ -489,7 +493,7 @@ public void testConcurrentJoining() {
// we need at least a quorum of voting nodes with a correct term and worse state
List<DiscoveryNode> successfulNodes;
do {
successfulNodes = randomSubsetOf(nodes);
successfulNodes = randomSubsetOf(allNodes);
} while (votingConfiguration.hasQuorum(successfulNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList()))
== false);

Expand All @@ -499,7 +503,7 @@ public void testConcurrentJoining() {
node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion))))
.collect(Collectors.toList());

List<DiscoveryNode> possiblyUnsuccessfulNodes = new ArrayList<>(nodes);
List<DiscoveryNode> possiblyUnsuccessfulNodes = new ArrayList<>(allNodes);
possiblyUnsuccessfulNodes.removeAll(successfulNodes);

logger.info("Possibly unsuccessful voting nodes: {}", possiblyUnsuccessfulNodes);
Expand Down Expand Up @@ -572,8 +576,8 @@ public void testConcurrentJoining() {

assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster());
for (DiscoveryNode successfulNode : successfulNodes) {
assertTrue(successfulNode.toString(), clusterStateHasNode(successfulNode));
assertTrue(successfulNode.toString(), coordinator.hasJoinVoteFrom(successfulNode));
assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode));
assertFalse(successfulNode + " voted for master", coordinator.missingJoinVoteFrom(successfulNode));
}
}

Expand Down
Loading