diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 72e626399878e..529eb32c4a537 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; @@ -184,6 +185,13 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { synchronized (mutex) { final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode(); logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); + + if (sourceNode.equals(getLocalNode()) && mode != Mode.LEADER) { + // Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication + // is already failed so there is no point in proceeding. + throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); + } + ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); @@ -438,8 +446,10 @@ public void invariant() { = currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState()); lastPublishedState.nodes().forEach(lastPublishedNodes::add); assert lastPublishedNodes.remove(getLocalNode()); + assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers + + " [becomingMaster=" + becomingMaster + ", publicationInProgress=" + publicationInProgress() + "]"; + // TODO instead assert that knownFollowers is updated appropriately at the end of each publication } - assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers; } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); @@ -604,11 +614,6 @@ protected void onCompletion(boolean committed) { @Override public void onResponse(Void ignore) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - assert coordinationState.get().getLastAcceptedTerm() == publishRequest.getAcceptedState().term() - && coordinationState.get().getLastAcceptedVersion() == publishRequest.getAcceptedState().version() - : "onPossibleCompletion: term or version mismatch when publishing [" + this - + "]: current version is now [" + coordinationState.get().getLastAcceptedVersion() - + "] in term [" + coordinationState.get().getLastAcceptedTerm() + "]"; assert committed; // TODO: send to applier @@ -628,7 +633,7 @@ public void onFailure(Exception e) { ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. publishListener.onFailure(exception); } - }, transportService.getThreadPool().generic()); + }, EsExecutors.newDirectExecutorService()); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 7bcfd7d9fb604..15b274c99f871 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -30,11 +30,14 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; @@ -56,9 +59,15 @@ public class JoinHelper extends AbstractComponent { public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join"; + // the timeout for each join attempt + public static final Setting JOIN_TIMEOUT_SETTING = + Setting.timeSetting("cluster.join.timeout", + TimeValue.timeValueMillis(60000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + private final MasterService masterService; private final TransportService transportService; private final JoinTaskExecutor joinTaskExecutor; + private final TimeValue joinTimeout; final Set> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet(); @@ -68,6 +77,7 @@ public JoinHelper(Settings settings, AllocationService allocationService, Master super(settings); this.masterService = masterService; this.transportService = transportService; + this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) { @Override @@ -130,29 +140,31 @@ public void sendJoinRequest(DiscoveryNode destination, Optional optionalJo final Tuple dedupKey = Tuple.tuple(destination, joinRequest); if (pendingOutgoingJoins.add(dedupKey)) { logger.debug("attempting to join {} with {}", destination, joinRequest); - transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler() { - @Override - public Empty read(StreamInput in) { - return Empty.INSTANCE; - } + transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, + TransportRequestOptions.builder().withTimeout(joinTimeout).build(), + new TransportResponseHandler() { + @Override + public Empty read(StreamInput in) { + return Empty.INSTANCE; + } - @Override - public void handleResponse(Empty response) { - pendingOutgoingJoins.remove(dedupKey); - logger.debug("successfully joined {} with {}", destination, joinRequest); - } + @Override + public void handleResponse(Empty response) { + pendingOutgoingJoins.remove(dedupKey); + logger.debug("successfully joined {} with {}", destination, joinRequest); + } - @Override - public void handleException(TransportException exp) { - pendingOutgoingJoins.remove(dedupKey); - logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); - } + @Override + public void handleException(TransportException exp) { + pendingOutgoingJoins.remove(dedupKey); + logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); + } - @Override - public String executor() { - return Names.SAME; - } - }); + @Override + public String executor() { + return Names.SAME; + } + }); } else { logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index ae3f1f0fa0d39..438aec7944f92 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; +import org.elasticsearch.cluster.coordination.JoinHelper; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.OperationRouting; @@ -445,10 +446,12 @@ public void apply(Settings value, Settings current, Settings previous) { IndexGraveyard.SETTING_MAX_TOMBSTONES, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, + PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING, ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING, - Coordinator.PUBLISH_TIMEOUT_SETTING + Coordinator.PUBLISH_TIMEOUT_SETTING, + JoinHelper.JOIN_TIMEOUT_SETTING ))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index 5fb8e9517b26e..971fa1f40acde 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -82,7 +82,7 @@ protected synchronized void done() { private void notifyListener(ActionListener listener, ExecutorService executorService) { try { - executorService.submit(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { try { diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 028519cc1383f..3d32762d53281 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -59,7 +60,12 @@ public abstract class PeerFinder extends AbstractComponent { Setting.timeSetting("discovery.find_peers_interval", TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); - private final TimeValue findPeersDelay; + public static final Setting DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = + Setting.timeSetting("discovery.request_peers_timeout", + TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + + private final TimeValue findPeersInterval; + private final TimeValue requestPeersTimeout; private final Object mutex = new Object(); private final TransportService transportService; @@ -75,7 +81,8 @@ public abstract class PeerFinder extends AbstractComponent { public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { super(settings); - findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); + findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); + requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings); this.transportService = transportService; this.transportAddressConnector = transportAddressConnector; this.configuredHostsResolver = configuredHostsResolver; @@ -241,7 +248,7 @@ private boolean handleWakeUp() { } }); - transportService.getThreadPool().schedule(findPeersDelay, Names.GENERIC, new AbstractRunnable() { + transportService.getThreadPool().schedule(findPeersInterval, Names.GENERIC, new AbstractRunnable() { @Override public boolean isForceExecution() { return true; @@ -360,9 +367,11 @@ public void onFailure(Exception e) { }); } - private void removePeer() { + void removePeer() { final Peer removed = peersByAddress.remove(transportAddress); - assert removed == Peer.this; + // assert removed == Peer.this : removed + " != " + Peer.this; + // ^ This assertion sometimes trips if we are deactivated and reactivated while a request is in flight. + // TODO be more careful about avoiding multiple active Peer objects for each address } private void requestPeers() { @@ -380,6 +389,7 @@ private void requestPeers() { transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(getLocalNode(), knownNodes), + TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(), new TransportResponseHandler() { @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 4b72b029dfc7d..a29f1531c2d4a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -47,13 +47,16 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; +import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; @@ -65,6 +68,7 @@ import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; +import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER; import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING; @@ -86,8 +90,14 @@ @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { + @Before + public void resetPortCounterBeforeEachTest() { + resetPortCounter(); + } + public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -106,6 +116,7 @@ public void testCanUpdateClusterStateAfterStabilisation() { public void testNodesJoinAfterStableCluster() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runRandomly(); cluster.stabilise(); final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); @@ -125,6 +136,7 @@ public void testNodesJoinAfterStableCluster() { public void testLeaderDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -153,18 +165,19 @@ public void testLeaderDisconnectionDetectedQuickly() { + DEFAULT_DELAY_VARIABILITY // then wait for the removal to be committed + DEFAULT_CLUSTER_STATE_UPDATE_DELAY - )); + )); assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } public void testUnresponsiveLeaderDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); - logger.info("--> partitioning leader {}", originalLeader); - originalLeader.partition(); + logger.info("--> blackholing leader {}", originalLeader); + originalLeader.blackhole(); cluster.stabilise(Math.max( // first wait for all the followers to notice the leader has gone @@ -189,6 +202,7 @@ public void testUnresponsiveLeaderDetectedEventually() { public void testFollowerDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -220,12 +234,13 @@ public void testFollowerDisconnectionDetectedQuickly() { public void testUnresponsiveFollowerDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower = cluster.getAnyNodeExcept(leader); - logger.info("--> partitioning follower {}", follower); - follower.partition(); + logger.info("--> blackholing follower {}", follower); + follower.blackhole(); cluster.stabilise(Math.max( // wait for the leader to notice that the follower is unresponsive @@ -289,6 +304,7 @@ private static String nodeIdFromIndex(int nodeIndex) { class Cluster { + static final long EXTREME_DELAY_VARIABILITY = 10000L; static final long DEFAULT_DELAY_VARIABILITY = 100L; final List clusterNodes; @@ -299,6 +315,7 @@ class Cluster { private final Set disconnectedNodes = new HashSet<>(); private final Set blackholedNodes = new HashSet<>(); + private final Map committedStatesByVersion = new HashMap<>(); Cluster(int initialNodeCount) { deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); @@ -328,6 +345,105 @@ void addNodes(int newNodesCount) { } } + void runRandomly() { + + // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it + assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty()); + assertThat("may reconnect blackholed nodes, probably unexpected", blackholedNodes, empty()); + + final int randomSteps = scaledRandomIntBetween(10, 10000); + logger.info("--> start of safety phase of at least [{}] steps", randomSteps); + + deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY); + int step = 0; + long finishTime = -1; + + while (finishTime == -1 || deterministicTaskQueue.getCurrentTimeMillis() <= finishTime) { + step++; + if (randomSteps <= step && finishTime == -1) { + finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime(); + deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); + logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}]", step, finishTime); + } + + try { + if (rarely()) { + final ClusterNode clusterNode = getAnyNodePreferringLeaders(); + final int newValue = randomInt(); + logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]", step, newValue, clusterNode.getId()); + clusterNode.submitValue(newValue); + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNode(); + logger.debug("----> [runRandomly {}] forcing {} to become candidate", step, clusterNode.getId()); + synchronized (clusterNode.coordinator.mutex) { + clusterNode.coordinator.becomeCandidate("runRandomly"); + } + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNode(); + + switch (randomInt(2)) { + case 0: + if (clusterNode.connect()) { + logger.debug("----> [runRandomly {}] connecting {}", step, clusterNode.getId()); + } + break; + case 1: + if (clusterNode.disconnect()) { + logger.debug("----> [runRandomly {}] disconnecting {}", step, clusterNode.getId()); + } + break; + case 2: + if (clusterNode.blackhole()) { + logger.debug("----> [runRandomly {}] blackholing {}", step, clusterNode.getId()); + } + break; + } + } else { + if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { + deterministicTaskQueue.advanceTime(); + } else if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } + } + + // TODO other random steps: + // - reboot a node + // - abdicate leadership + // - bootstrap + + } catch (CoordinationStateRejectedException ignored) { + // This is ok: it just means a message couldn't currently be handled. + } + + assertConsistentStates(); + } + + disconnectedNodes.clear(); + blackholedNodes.clear(); + } + + private void assertConsistentStates() { + for (final ClusterNode clusterNode : clusterNodes) { + clusterNode.coordinator.invariant(); + } + updateCommittedStates(); + } + + private void updateCommittedStates() { + for (final ClusterNode clusterNode : clusterNodes) { + Optional committedState = clusterNode.coordinator.getLastCommittedState(); + if (committedState.isPresent()) { + ClusterState storedState = committedStatesByVersion.get(committedState.get().getVersion()); + if (storedState == null) { + committedStatesByVersion.put(committedState.get().getVersion(), committedState.get()); + } else { + assertEquals("expected " + committedState.get() + " but got " + storedState, + value(committedState.get()), value(storedState)); + } + } + } + } + void stabilise() { stabilise(DEFAULT_STABILISATION_TIME); } @@ -389,19 +505,20 @@ private void assertUniqueLeaderAndExpectedModes() { final String nodeId = clusterNode.getId(); - if (disconnectedNodes.contains(nodeId) || blackholedNodes.contains(nodeId)) { - assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); - } else { + if (isConnectedPair(leader, clusterNode)) { + assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - // TODO assert that all nodes have actually voted for the leader in this term + // TODO assert that this node has actually voted for the leader in this term + // TODO assert that this node's accepted and committed states are the same as the leader's - assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); - assertThat(nodeId + " is at the same accepted version as the leader", - Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion); - assertThat(nodeId + " is at the same committed version as the leader", - clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion); - assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), + assertThat(nodeId + " is in the leader's committed state", + leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), equalTo(Optional.of(true))); + } else { + assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertThat(nodeId + " is not in the leader's committed state", + leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), + equalTo(Optional.of(false))); } } @@ -428,6 +545,10 @@ private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode return connectionStatus; } + ClusterNode getAnyNode() { + return getAnyNodeExcept(); + } + ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { Set forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet()); List acceptableNodes @@ -436,6 +557,16 @@ ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { return randomFrom(acceptableNodes); } + ClusterNode getAnyNodePreferringLeaders() { + for (int i = 0; i < 3; i++) { + ClusterNode clusterNode = getAnyNode(); + if (clusterNode.coordinator.getMode() == LEADER) { + return clusterNode; + } + } + return getAnyNode(); + } + class ClusterNode extends AbstractComponent { private final int nodeIndex; private Coordinator coordinator; @@ -538,7 +669,7 @@ DiscoveryNode getLocalNode() { } boolean isLeader() { - return coordinator.getMode() == Coordinator.Mode.LEADER; + return coordinator.getMode() == LEADER; } void submitValue(final long value) { @@ -560,12 +691,25 @@ public String toString() { return localNode.toString(); } - void disconnect() { - disconnectedNodes.add(localNode.getId()); + boolean connect() { + boolean unBlackholed = blackholedNodes.remove(localNode.getId()); + boolean unDisconnected = disconnectedNodes.remove(localNode.getId()); + assert unBlackholed == false || unDisconnected == false; + return unBlackholed || unDisconnected; + } + + boolean disconnect() { + boolean unBlackholed = blackholedNodes.remove(localNode.getId()); + boolean disconnected = disconnectedNodes.add(localNode.getId()); + assert disconnected || unBlackholed == false; + return disconnected; } - void partition() { - blackholedNodes.add(localNode.getId()); + boolean blackhole() { + boolean unDisconnected = disconnectedNodes.remove(localNode.getId()); + boolean blackholed = blackholedNodes.add(localNode.getId()); + assert blackholed || unDisconnected == false; + return blackholed; } } diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 0a74300beac58..2e0eb46d7deae 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -663,6 +663,51 @@ public void testDoesNotMakeMultipleConcurrentConnectionAttemptsToOneAddress() { assertFoundPeers(otherNode); } + public void testTimesOutAndRetriesConnectionsToBlackholedNodes() { + final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); + final DiscoveryNode nodeToFind = newDiscoveryNode("node-to-find"); + + providedAddresses.add(otherNode.getAddress()); + transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.reachableNodes.add(nodeToFind); + + peerFinder.activate(lastAcceptedNodes); + + while (true) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); // MockTransportAddressConnector verifies no multiple connection attempts + if (capturingTransport.getCapturedRequestsAndClear().length > 0) { + break; + } + } + + final long timeoutAtMillis = deterministicTaskQueue.getCurrentTimeMillis() + + PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(Settings.EMPTY).millis(); + while (deterministicTaskQueue.getCurrentTimeMillis() < timeoutAtMillis) { + assertFoundPeers(otherNode); + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + } + + // need to wait for the connection to timeout, then for another wakeup, before discovering the peer + final long expectedTime = timeoutAtMillis + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis(); + + while (deterministicTaskQueue.getCurrentTimeMillis() < expectedTime) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + } + + respondToRequests(node -> { + assertThat(node, is(otherNode)); + return new PeersResponse(Optional.empty(), singletonList(nodeToFind), randomNonNegativeLong()); + }); + + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + + assertFoundPeers(nodeToFind, otherNode); + } + public void testReconnectsToDisconnectedNodes() { final DiscoveryNode otherNode = newDiscoveryNode("original-node"); providedAddresses.add(otherNode.getAddress()); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 2bebc1ab244c0..4e46137c2de55 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -50,6 +50,7 @@ public class DeterministicTaskQueue extends AbstractComponent { private long currentTimeMillis; private long nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; private long executionDelayVariabilityMillis; + private long latestDeferredExecutionTime; public DeterministicTaskQueue(Settings settings, Random random) { super(settings); @@ -149,6 +150,7 @@ public void scheduleAt(final long executionTimeMillis, final Runnable task) { private void scheduleDeferredTask(DeferredTask deferredTask) { nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, deferredTask.getExecutionTimeMillis()); + latestDeferredExecutionTime = Math.max(latestDeferredExecutionTime, deferredTask.getExecutionTimeMillis()); deferredTasks.add(deferredTask); } @@ -161,6 +163,7 @@ public void advanceTime() { logger.trace("advanceTime: from [{}ms] to [{}ms]", currentTimeMillis, nextDeferredTaskExecutionTimeMillis); currentTimeMillis = nextDeferredTaskExecutionTimeMillis; + assert currentTimeMillis <= latestDeferredExecutionTime : latestDeferredExecutionTime + " < " + currentTimeMillis; nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; List remainingDeferredTasks = new ArrayList<>(); @@ -418,6 +421,10 @@ public ScheduledExecutorService scheduler() { }; } + public long getLatestDeferredExecutionTime() { + return latestDeferredExecutionTime; + } + private static class DeferredTask { private final long executionTimeMillis; private final Runnable task; diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index b12596c8b388d..ae584901bf0bf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -115,7 +115,7 @@ public String toString() { protected String getRequestDescription(long requestId, String action, DiscoveryNode destination) { return new ParameterizedMessage("[{}][{}] from {} to {}", - action, requestId, getLocalNode(), destination).getFormattedMessage(); + requestId, action, getLocalNode(), destination).getFormattedMessage(); } protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNode destination) { diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java index 8c341a7710ef9..f3636460e55b6 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -366,10 +366,12 @@ public void testDelayVariabilityAppliesToFutureTasks() { for (int i = 0; i < 100; i++) { deterministicTaskQueue.scheduleAt(delayMillis, () -> {}); } + final long expectedEndTime = deterministicTaskQueue.getLatestDeferredExecutionTime(); final long startTime = deterministicTaskQueue.getCurrentTimeMillis(); deterministicTaskQueue.runAllTasks(); final long elapsedTime = deterministicTaskQueue.getCurrentTimeMillis() - startTime; + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), is(expectedEndTime)); assertThat(elapsedTime, greaterThan(delayMillis)); // fails with negligible probability assertThat(elapsedTime, lessThanOrEqualTo(delayMillis + variabilityMillis)); }