Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.discovery;

import java.nio.file.Path;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -33,7 +34,8 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
Expand All @@ -52,17 +54,17 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {

static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.

private ClusterDiscoveryConfiguration discoveryConfig;
private NodeConfigurationSource discoveryConfig;

@Override
protected Settings nodeSettings(int nodeOrdinal) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to see if it's possible to pass in all the settings by implementing this method directly, rather than using the NodeConfigurationSource etc.

For instance it looks like these settings could be here:

            .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
            .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
            .put("discovery.zen.join_timeout", "10s")  // still long to induce failures but not too long so test won't time out
            .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
            .put(TransportService.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
//and
                .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();

I also think these aren't needed and we can just make use of the machinery in ESIntegTestCase to set them appropriately:

                .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
                .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
                .putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file")

We use non-default settings in very few places; moving the affected tests into their own fixtures so they can override nodeSettings themselves seems worth investigating.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaveCTurner Let me try :) What makes this a little tricky (requiring more changes maybe?) is that we have this code in org.elasticsearch.discovery.ClusterDisruptionIT#testSearchWithRelocationAndSlowClusterStateProcessing

    /**
     * This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
     * node but already deleted on the source node. Search request should still work.
     */
    public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
        // don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
        configureCluster(Settings.EMPTY, 3, 1);

to override these defaults (though this could just be me not being so fluent in this code yet :)). Maybe we could do this in a follow-up PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, that's one of the tests that I think could be in its own fixture since it needs different cluster settings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaveCTurner wanna handle this here or can we move that to the next PR? (this one is already doing quite a few things)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, a follow-up is ok.

Expand Down Expand Up @@ -116,18 +118,14 @@ protected void beforeIndexDeletion() throws Exception {
}
}

List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException {
List<String> startCluster(int numberOfNodes) {
return startCluster(numberOfNodes, -1);
}

List<String> startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
return startCluster(numberOfNodes, minimumMasterNode, null);
}

List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
ExecutionException, InterruptedException {
configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
List<String> nodes = internalCluster().startNodes(numberOfNodes);
List<String> startCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(numberOfNodes, minimumMasterNode);
InternalTestCluster internalCluster = internalCluster();
List<String> nodes = internalCluster.startNodes(numberOfNodes);
ensureStableCluster(numberOfNodes);

// TODO: this is a temporary solution so that nodes will not base their reaction to a partition on previous successful results
Expand All @@ -154,20 +152,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

void configureCluster(
int numberOfNodes,
@Nullable int[] unicastHostsOrdinals,
int minimumMasterNode
) throws ExecutionException, InterruptedException {
configureCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
void configureCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(DEFAULT_SETTINGS, numberOfNodes, minimumMasterNode);
}

void configureCluster(
Settings settings,
int numberOfNodes,
@Nullable int[] unicastHostsOrdinals,
int minimumMasterNode
) throws ExecutionException, InterruptedException {
void configureCluster(Settings settings, int numberOfNodes, int minimumMasterNode) {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, we allow tests to set minimumMasterNode to the "wrong" value only to allow dynamically adding nodes after the cluster has started. If so, it'd be better to allow ESIntegTestCase to manage this number for us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaveCTurner Aren't we only ever passing -1 here from org.elasticsearch.discovery.AbstractDisruptionTestCase#startCluster(int, int) when we start the cluster? It seems like this is just a convenience method for not having to set a value by hand here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, let me clarify. I do not think this method should receive an explicit minimumMasterNode value at all, whether -1 or otherwise. In almost all cases, ESIntegTestCase now does a better job of automatically managing this setting than is done here. It was, however, possible that some of the tests were using this feature to set this value to something other than numberOfNodes / 2 + 1, for instance:

In the first two cases this is actually correct because numberOfNodes also includes some data-only nodes. The last case is kinda strange - I think it really can form two clusters, and I'm not sure what it's actually testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaveCTurner given the short exchange in Slack, wanna keep this around for now maybe?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's do this in a follow-up. NB the action to take here is to make this a cluster with 1 master node and 1 data node, which'd then mean that we can fall back on ESIntegTestCase's management of the cluster configuration.

}
Expand All @@ -177,14 +166,21 @@ void configureCluster(
.put(settings)
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file")
.build();

if (discoveryConfig == null) {
if (unicastHostsOrdinals == null) {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings);
} else {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
}
discoveryConfig = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(final int nodeOrdinal) {
return nodeSettings;
}

@Override
public Path nodeConfigPath(final int nodeOrdinal) {
return null;
}
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void onFailure(Exception e) {
*/
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
configureCluster(Settings.EMPTY, 3, null, 1);
configureCluster(Settings.EMPTY, 3, 1);
internalCluster().startMasterOnlyNode();
final String node_1 = internalCluster().startDataOnlyNode();

Expand All @@ -390,7 +390,7 @@ public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Excep

public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
// test for https://github.com/elastic/elasticsearch/issues/8823
configureCluster(2, null, 1);
configureCluster(2, 1);
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNode(Settings.EMPTY);

Expand Down Expand Up @@ -421,7 +421,7 @@ public void testIndicesDeleted() throws Exception {
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
.build();
final String idxName = "test";
configureCluster(settings, 3, null, 2);
configureCluster(settings, 3, 2);
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {

public void testIsolatedUnicastNodes() throws Exception {
List<String> nodes = startCluster(4, -1, new int[]{0});
internalCluster().setHostsListContainsOnlyFirstNode(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this has to happen before you start the cluster, or else the cluster will start with full knowledge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right fixing

List<String> nodes = startCluster(4, -1);
// Figure out what is the elected master node
final String unicastTarget = nodes.get(0);

Expand Down Expand Up @@ -98,7 +99,8 @@ public void testIsolatedUnicastNodes() throws Exception {
* The rejoining node should take this master node and connect.
*/
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
List<String> nodes = startCluster(4, -1, new int[]{0});
internalCluster().setHostsListContainsOnlyFirstNode(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this has to happen before you start the cluster, or else the cluster will start with full knowledge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right fixing

List<String> nodes = startCluster(4, -1);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node={}", masterNode);
Expand Down Expand Up @@ -194,7 +196,7 @@ public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
}

public void testClusterFormingWithASlowNode() throws Exception {
configureCluster(3, null, 2);
configureCluster(3, 2);

SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);

Expand All @@ -210,7 +212,7 @@ public void testClusterFormingWithASlowNode() throws Exception {
}

public void testElectMasterWithLatestVersion() throws Exception {
configureCluster(3, null, 2);
configureCluster(3, 2);
final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
ensureStableCluster(3);
ServiceDisruptionScheme isolateAllNodes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testDisruptionOnSnapshotInitialization() throws Exception {
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
.build();
final String idxName = "test";
configureCluster(settings, 4, null, 2);
configureCluster(settings, 4, 2);
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ public final class InternalTestCluster extends TestCluster {
private ServiceDisruptionScheme activeDisruptionScheme;
private Function<Client, Client> clientWrapper;

// If true, only the first node in the cluster is listed in the unicast hosts file
private boolean hostsListContainsOnlyFirstNode;

public InternalTestCluster(
final long clusterSeed,
final Path baseDir,
Expand Down Expand Up @@ -1493,12 +1496,17 @@ private synchronized void startAndPublishNodesAndClients(List<NodeAndClient> nod

private final Object discoveryFileMutex = new Object();

private void rebuildUnicastHostFiles(Collection<NodeAndClient> newNodes) {
private void rebuildUnicastHostFiles(List<NodeAndClient> newNodes) {
// cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients()
synchronized (discoveryFileMutex) {
try {
List<String> discoveryFileContents = Stream.concat(nodes.values().stream(), newNodes.stream())
.map(nac -> nac.node.injector().getInstance(TransportService.class)).filter(Objects::nonNull)
Stream<NodeAndClient> unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream());
if (hostsListContainsOnlyFirstNode) {
unicastHosts = unicastHosts.limit(1L);
}
List<String> discoveryFileContents = unicastHosts.map(
nac -> nac.node.injector().getInstance(TransportService.class)
).filter(Objects::nonNull)
.map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode)
.map(n -> n.getAddress().toString())
.distinct().collect(Collectors.toList());
Expand Down Expand Up @@ -1926,6 +1934,9 @@ public synchronized int numMasterNodes() {
return filterNodes(nodes, NodeAndClient::isMasterEligible).size();
}

/**
 * Sets whether the unicast hosts list should be restricted to the first node only.
 * The flag is read when the unicast host files are rebuilt.
 * NOTE(review): presumably this has to be called before the nodes are started,
 * otherwise they start with the full hosts list -- confirm.
 *
 * @param hostsListContainsOnlyFirstNode {@code true} to limit the hosts list to the first node
 */
public void setHostsListContainsOnlyFirstNode(boolean hostsListContainsOnlyFirstNode) {
this.hostsListContainsOnlyFirstNode = hostsListContainsOnlyFirstNode;
}

public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
assert activeDisruptionScheme == null :
Expand Down
Loading