From 57a24adfd75a1cb115ab99aac917dbf45179d0d0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 9 Sep 2018 13:44:56 +0200 Subject: [PATCH 01/10] Use file-based discovery not MockUncasedHostProvider Today we use a special unicast hosts provider, the `MockUncasedHostProvider`, in many integration tests, to deal with the dynamic nature of the allocation of ports to nodes. However #33241 allows us to use file-based discovery to achieve the same goal, so the special test-only `MockUncasedHostProvider` is no longer required. This change removes `MockUncasedHostProvider` and replaces it with file-based discovery in tests based on `ESIntegTestCase`. --- .../discovery/DiscoveryModule.java | 2 +- .../java/org/elasticsearch/node/Node.java | 6 + .../cluster/ClusterInfoServiceIT.java | 7 +- .../zen/SettingsBasedHostProviderIT.java | 94 ++++++++++++++ .../search/SearchCancellationIT.java | 5 +- .../java/org/elasticsearch/node/MockNode.java | 18 ++- .../elasticsearch/test/ESIntegTestCase.java | 11 +- .../test/ESSingleNodeTestCase.java | 2 + .../test/InternalTestCluster.java | 121 +++++++++++++----- .../discovery/MockUncasedHostProvider.java | 91 ------------- .../test/discovery/TestZenDiscovery.java | 34 ----- 11 files changed, 220 insertions(+), 171 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java delete mode 100644 test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index f34798605d784..91f4e6151594d 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -131,7 +131,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic if (discoverySupplier == null) { throw new 
IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); } - Loggers.getLogger(getClass(), settings).info("using discovery type [{}]", discoveryType); + Loggers.getLogger(getClass(), settings).info("using discovery type [{}] and host providers {}", discoveryType, hostsProviderNames); discovery = Objects.requireNonNull(discoverySupplier.get()); } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index c2ef6d12331fe..53b592d627b69 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -695,6 +695,8 @@ public Node start() throws NodeValidationException { assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; + onTransportServiceStarted(); + final MetaData onDiskMetadata; try { // we load the global state here (the persistent part of the cluster state stored on disk) to @@ -771,6 +773,10 @@ public void onTimeout(TimeValue timeout) { return this; } + // For notifying tests that discovery can be configured + protected void onTransportServiceStarted() { + } + private Node stop() { if (!lifecycle.moveToStopped()) { return this; diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index b17a0cc5418e9..bdc1731b784b7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -113,6 +113,7 @@ public void blockActions(String... actions) { @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) // manual collection or upon cluster forming. 
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2) .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.getKey(), "1s") @@ -121,8 +122,7 @@ protected Settings nodeSettings(int nodeOrdinal) { @Override protected Collection> nodePlugins() { - return Arrays.asList(TestPlugin.class, - MockTransportService.TestPlugin.class); + return Arrays.asList(TestPlugin.class, MockTransportService.TestPlugin.class); } public void testClusterInfoServiceCollectsInformation() throws Exception { @@ -172,7 +172,7 @@ public void testClusterInfoServiceCollectsInformation() throws Exception { } } - public void testClusterInfoServiceInformationClearOnError() throws InterruptedException, ExecutionException { + public void testClusterInfoServiceInformationClearOnError() { internalCluster().startNodes(2, // manually control publishing Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()); @@ -186,7 +186,6 @@ public void testClusterInfoServiceInformationClearOnError() throws InterruptedEx assertThat("some usages are populated", info.getNodeLeastAvailableDiskUsages().size(), Matchers.equalTo(2)); assertThat("some shard sizes are populated", info.shardSizes.size(), greaterThan(0)); - MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, internalTestCluster.getMasterName()); final AtomicBoolean timeout = new AtomicBoolean(false); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java new file mode 100644 index 0000000000000..0a2a06b932ca3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.Settings.Builder; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Before; + +import java.util.function.Consumer; + +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.LIMIT_LOCAL_PORTS_COUNT; +import static org.elasticsearch.transport.TcpTransport.PORT; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class SettingsBasedHostProviderIT extends ESIntegTestCase { + + private Consumer configureDiscovery; + + @Before + public void setDefaultDiscoveryConfiguration() { + configureDiscovery = b -> { + }; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder builder = 
Settings.builder().put(super.nodeSettings(nodeOrdinal)); + + // super.nodeSettings enables file-based discovery, but here we disable it again so we can test the static list: + if (randomBoolean()) { + builder.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()); + } else { + builder.remove(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()); + } + + builder.remove(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()); + + configureDiscovery.accept(builder); + return builder.build(); + } + + public void testClusterFormsWithSingleSeedHostInSettings() { + final String seedNodeName = internalCluster().startNode(); + final NodesInfoResponse nodesInfoResponse + = client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet(); + final String seedNodeAddress = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().toString(); + configureDiscovery = b -> b.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), seedNodeAddress); + logger.info("--> using seed node address {}", seedNodeAddress); + + int extraNodes = randomIntBetween(1, 5); + internalCluster().startNodes(extraNodes); + + ensureStableCluster(extraNodes + 1); + } + + public void testClusterFormsByScanningPorts() { + final String seedNodeName = internalCluster().startNode(); + final NodesInfoResponse nodesInfoResponse + = client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet(); + final int seedNodePort = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().getPort(); + final int minPort = randomIntBetween(seedNodePort - LIMIT_LOCAL_PORTS_COUNT + 1, seedNodePort - 1); + final String portSpec = minPort + "-" + seedNodePort; + + logger.info("--> using port specification [{}]", portSpec); + + configureDiscovery = b -> b.put(PORT.getKey(), portSpec); + + internalCluster().startNode(); + ensureStableCluster(2); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java 
b/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java index 0294f9f67f88c..2e28d16c71dcd 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java @@ -69,7 +69,10 @@ protected Collection> nodePlugins() { protected Settings nodeSettings(int nodeOrdinal) { boolean lowLevelCancellation = randomBoolean(); logger.info("Using lowLevelCancellation: {}", lowLevelCancellation); - return Settings.builder().put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation).build(); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation) + .build(); } private void indexTestData() { diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 67d91e97e1661..855065d084748 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -67,6 +67,7 @@ public class MockNode extends Node { private final Collection> classpathPlugins; + private final Runnable onTransportServiceStarted; public MockNode(final Settings settings, final Collection> classpathPlugins) { this(settings, classpathPlugins, true); @@ -76,26 +77,30 @@ public MockNode( final Settings settings, final Collection> classpathPlugins, final boolean forbidPrivateIndexSettings) { - this(settings, classpathPlugins, null, forbidPrivateIndexSettings); + this(settings, classpathPlugins, null, forbidPrivateIndexSettings, () -> {}); } public MockNode( final Settings settings, final Collection> classpathPlugins, final Path configPath, - final boolean forbidPrivateIndexSettings) { + final boolean forbidPrivateIndexSettings, + final Runnable onTransportServiceStarted) { this( 
InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath), classpathPlugins, - forbidPrivateIndexSettings); + forbidPrivateIndexSettings, + onTransportServiceStarted); } private MockNode( final Environment environment, final Collection> classpathPlugins, - final boolean forbidPrivateIndexSettings) { + final boolean forbidPrivateIndexSettings, + final Runnable onTransportServiceStarted) { super(environment, classpathPlugins, forbidPrivateIndexSettings); this.classpathPlugins = classpathPlugins; + this.onTransportServiceStarted = onTransportServiceStarted; } /** @@ -179,4 +184,9 @@ protected HttpServerTransport newHttpTransport(NetworkModule networkModule) { protected void registerDerivedNodeNameWithLogger(String nodeName) { // Nothing to do because test uses the thread name } + + @Override + protected void onTransportServiceStarted() { + onTransportServiceStarted.run(); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 68a862c109d98..ed49bafe6fda4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -204,6 +204,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.util.CollectionUtils.eagerPartition; +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static 
org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; @@ -1806,7 +1808,9 @@ protected Settings nodeSettings(int nodeOrdinal) { // wait short time for other active shards before actually deleting, default 30s not needed in tests .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)) // randomly enable low-level search cancellation to make sure it does not alter results - .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()); + .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()) + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes + .putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file"); if (rarely()) { // Sometimes adjust the minimum search thread pool size, causing // QueueResizingEsThreadPoolExecutor to be used instead of a regular @@ -1919,7 +1923,7 @@ protected NodeConfigurationSource getNodeConfigSource() { networkSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType()); } - NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { + return new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -1953,7 +1957,6 @@ public Collection> transportClientPlugins() { return Collections.unmodifiableCollection(plugins); } }; - return nodeConfigurationSource; } /** @@ -2027,7 +2030,7 @@ protected Collection> getMockPlugins() { public static final class TestSeedPlugin extends Plugin { @Override public List> getSettings() { - return Arrays.asList(INDEX_TEST_SEED_SETTING); + return Collections.singletonList(INDEX_TEST_SEED_SETTING); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index d73520f91b3e3..bcaa4e8303f53 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -62,6 +62,7 @@ import java.util.Collection; import java.util.Collections; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -197,6 +198,7 @@ private Node newNode() { // turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we // turn it off for these tests. .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .put(nodeSettings()) // allow test cases to provide their own settings or override these .build(); Collection> plugins = getPlugins(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 3c46acd0fbe73..a352a76e47b5c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -102,6 +102,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.InetSocketAddress; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -113,9 +114,11 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -127,10 +130,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyList; import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.rarely; import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; +import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.getTestTransportType; @@ -140,6 +145,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -485,11 +491,13 @@ private void ensureOpen() { private synchronized NodeAndClient getOrBuildRandomNode() { ensureOpen(); - NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); + final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient; } - NodeAndClient buildNode = buildNode(1); + final int ord = nextNodeId.getAndIncrement(); + final Runnable onTransportServiceStarted = () -> {}; // do not create unicast host file for this one node. 
+ final NodeAndClient buildNode = buildNode(ord, random.nextLong(), null, false, 1, onTransportServiceStarted); buildNode.startNode(); publishNode(buildNode); return buildNode; @@ -561,20 +569,11 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException { * * @param settings the settings to use * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + * @param onTransportServiceStarted callback to run when transport service is started */ - private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes) { - int ord = nextNodeId.getAndIncrement(); - return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes); - } - - /** - * builds a new node with default settings - * - * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed - */ - private NodeAndClient buildNode(int defaultMinMasterNodes) { + private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes, Runnable onTransportServiceStarted) { int ord = nextNodeId.getAndIncrement(); - return buildNode(ord, random.nextLong(), null, false, defaultMinMasterNodes); + return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes, onTransportServiceStarted); } /** @@ -586,15 +585,17 @@ private NodeAndClient buildNode(int defaultMinMasterNodes) { * @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and * the method will return the existing one * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + * @param onTransportServiceStarted callback to run when transport service is started */ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, - boolean reuseExisting, int defaultMinMasterNodes) { + boolean reuseExisting, int defaultMinMasterNodes, Runnable onTransportServiceStarted) { assert Thread.holdsLock(this); ensureOpen(); settings = 
getSettings(nodeId, seed, settings); Collection> plugins = getPlugins(); String name = buildNodeName(nodeId, settings); if (reuseExisting && nodes.containsKey(name)) { + onTransportServiceStarted.run(); // reusing an existing node implies its transport service already started return nodes.get(name); } else { assert reuseExisting == true || nodes.containsKey(name) == false : @@ -629,7 +630,8 @@ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, finalSettings.build(), plugins, nodeConfigurationSource.nodeConfigPath(nodeId), - forbidPrivateIndexSettings); + forbidPrivateIndexSettings, + onTransportServiceStarted); try { IOUtils.close(secureSettings); } catch (IOException e) { @@ -906,14 +908,15 @@ void restart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterN if (!node.isClosed()) { closeNode(); } - recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes); + recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes, () -> rebuildUnicastHostFiles(emptyList())); startNode(); } /** * rebuilds a new node object using the current node settings and starts it */ - void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception { + void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes, + Runnable onTransportServiceStarted) throws Exception { assert callback != null; Settings callbackSettings = callback.onNodeStopped(name); Settings.Builder newSettings = Settings.builder(); @@ -927,7 +930,7 @@ void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, if (clearDataIfNeeded) { clearDataIfNeeded(callback); } - createNewNode(newSettings.build()); + createNewNode(newSettings.build(), onTransportServiceStarted); // make sure cached client points to new node resetClient(); } @@ -943,7 +946,7 @@ private void clearDataIfNeeded(RestartCallback callback) throws IOException { } } - private void 
createNewNode(final Settings newSettings) { + private void createNewNode(final Settings newSettings, final Runnable onTransportServiceStarted) { final long newIdSeed = NodeEnvironment.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id Settings finalSettings = Settings.builder().put(node.originalSettings()).put(newSettings).put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build(); if (DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(finalSettings) == false) { @@ -951,7 +954,7 @@ private void createNewNode(final Settings newSettings) { " is not configured after restart of [" + name + "]"); } Collection> plugins = node.getClasspathPlugins(); - node = new MockNode(finalSettings, plugins); + node = new MockNode(finalSettings, plugins, null, true, onTransportServiceStarted); markNodeDataDirsAsNotEligableForWipe(node); } @@ -968,6 +971,31 @@ public void close() throws IOException { public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_"; + private class TransportServiceStartupCountDown { + private final int initialCount; + private final CountDownLatch countDownLatch; + + TransportServiceStartupCountDown(int count) { + initialCount = count; + countDownLatch = new CountDownLatch(count); + } + + void countDown() { + logger.info("transport service started: {} of {} remaining", countDownLatch.getCount() - 1, initialCount); + countDownLatch.countDown(); + } + + void await() { + logger.info("waiting for all transport services to start: {} of {} remaining", countDownLatch.getCount(), initialCount); + try { + assertTrue("transport service startup timed out", countDownLatch.await(30L, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + logger.info("all {} transport services started", initialCount); + } + } + static class TransportClientFactory { private final boolean sniff; private final Settings settings; @@ -1054,11 +1082,13 @@ private synchronized void 
reset(boolean wipeData) throws IOException { final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes; final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1; final List toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes + final TransportServiceStartupCountDown transportServiceStartupCountDown = new TransportServiceStartupCountDown(newSize); for (int i = 0; i < numSharedDedicatedMasterNodes; i++) { final Settings.Builder settings = Settings.builder(); settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(Node.NODE_DATA_SETTING.getKey(), false); - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + transportServiceStartupCountDown::countDown); toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) { @@ -1068,18 +1098,20 @@ private synchronized void reset(boolean wipeData) throws IOException { settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build(); settings.put(Node.NODE_DATA_SETTING.getKey(), true).build(); } - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + transportServiceStartupCountDown::countDown); toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) { final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false); - NodeAndClient 
nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + transportServiceStartupCountDown::countDown); toStartAndPublish.add(nodeAndClient); } - startAndPublishNodesAndClients(toStartAndPublish); + startAndPublishNodesAndClients(toStartAndPublish, transportServiceStartupCountDown::await); nextNodeId.set(newSize); assert size() == newSize; @@ -1414,7 +1446,7 @@ public synchronized void stopRandomNonMasterNode() throws IOException { } } - private synchronized void startAndPublishNodesAndClients(List nodeAndClients) { + private synchronized void startAndPublishNodesAndClients(List nodeAndClients, Runnable awaitTransportServicesStarted) { if (nodeAndClients.size() > 0) { final int newMasters = (int) nodeAndClients.stream().filter(NodeAndClient::isMasterEligible) .filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters @@ -1426,6 +1458,10 @@ private synchronized void startAndPublishNodesAndClients(List nod updateMinMasterNodes(currentMasters + newMasters); } List> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList()); + + awaitTransportServicesStarted.run(); + rebuildUnicastHostFiles(nodeAndClients); + try { for (Future future : futures) { future.get(); @@ -1446,6 +1482,25 @@ private synchronized void startAndPublishNodesAndClients(List nod } } + private void rebuildUnicastHostFiles(Collection newNodes) { + try { + List discoveryFileContents = Stream.concat(nodes.values().stream(), newNodes.stream()) + .map(nac -> nac.node.injector().getInstance(TransportService.class)).filter(Objects::nonNull) + .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) + .map(n -> n.getAddress().toString()) + .distinct().collect(Collectors.toList()); + Set configPaths = Stream.concat(nodes.values().stream(), 
newNodes.stream()) + .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); + logger.info("configuring discovery with {} at {}", discoveryFileContents, configPaths); + for (final Path configPath : configPaths) { + Files.createDirectories(configPath); + Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); // TODO do we need to do this atomically? + } + } catch (IOException e) { + throw new AssertionError("failed to configure file-based discovery", e); + } + } + private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { stopNodesAndClients(Collections.singleton(nodeAndClient)); } @@ -1615,14 +1670,15 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception } assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0; - // do two rounds to minimize pinging (mock zen pings pings with no delay and can create a lot of logs) + final TransportServiceStartupCountDown transportServiceStartupCountDown = new TransportServiceStartupCountDown(startUpOrder.size()); for (NodeAndClient nodeAndClient : startUpOrder) { logger.info("resetting node [{}] ", nodeAndClient.name); // we already cleared data folders, before starting nodes up - nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1); + nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1, + transportServiceStartupCountDown::countDown); } - startAndPublishNodesAndClients(startUpOrder); + startAndPublishNodesAndClients(startUpOrder, transportServiceStartupCountDown::await); if (callback.validateClusterForming()) { validateClusterFormed(); @@ -1739,10 +1795,11 @@ public synchronized List startNodes(Settings... 
settings) { defaultMinMasterNodes = -1; } List nodes = new ArrayList<>(); - for (Settings nodeSettings: settings) { - nodes.add(buildNode(nodeSettings, defaultMinMasterNodes)); + TransportServiceStartupCountDown transportServiceStartupCountDown = new TransportServiceStartupCountDown(settings.length); + for (Settings nodeSettings : settings) { + nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, transportServiceStartupCountDown::countDown)); } - startAndPublishNodesAndClients(nodes); + startAndPublishNodesAndClients(nodes, transportServiceStartupCountDown::await); if (autoManageMinMasterNodes) { validateClusterFormed(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java deleted file mode 100644 index dc9304637cdca..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.elasticsearch.test.discovery; - -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.zen.UnicastHostsProvider; - -import java.io.Closeable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -/** - * A {@link UnicastHostsProvider} implementation which returns results based on a static in-memory map. This allows running - * with nodes that only determine their transport address at runtime, which is the default behavior of - * {@link org.elasticsearch.test.InternalTestCluster} - */ -public final class MockUncasedHostProvider implements UnicastHostsProvider, Closeable { - - static final Map> activeNodesPerCluster = new HashMap<>(); - - - private final Supplier localNodeSupplier; - private final ClusterName clusterName; - - public MockUncasedHostProvider(Supplier localNodeSupplier, ClusterName clusterName) { - this.localNodeSupplier = localNodeSupplier; - this.clusterName = clusterName; - synchronized (activeNodesPerCluster) { - getActiveNodesForCurrentCluster().add(this); - } - } - - @Override - public List buildDynamicHosts(HostsResolver hostsResolver) { - final DiscoveryNode localNode = getNode(); - assert localNode != null; - synchronized (activeNodesPerCluster) { - Set activeNodes = getActiveNodesForCurrentCluster(); - return activeNodes.stream() - .map(MockUncasedHostProvider::getNode) - .filter(Objects::nonNull) - .filter(n -> localNode.equals(n) == false) - .map(DiscoveryNode::getAddress) - .collect(Collectors.toList()); - } - } - - @Nullable - private DiscoveryNode getNode() { - return localNodeSupplier.get(); - } - - private Set 
getActiveNodesForCurrentCluster() { - assert Thread.holdsLock(activeNodesPerCluster); - return activeNodesPerCluster.computeIfAbsent(clusterName, - clusterName -> ConcurrentCollections.newConcurrentSet()); - } - - @Override - public void close() { - synchronized (activeNodesPerCluster) { - boolean found = getActiveNodesForCurrentCluster().remove(this); - assert found; - } - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 5387a659aa274..2c8305b4e12bb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -19,13 +19,10 @@ package org.elasticsearch.test.discovery; -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -39,7 +36,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -59,7 +55,6 @@ public class TestZenDiscovery extends ZenDiscovery { /** A plugin which installs mock discovery and configures it to be used. 
*/ public static class TestPlugin extends Plugin implements DiscoveryPlugin { protected final Settings settings; - private final SetOnce unicastHostProvider = new SetOnce<>(); public TestPlugin(Settings settings) { this.settings = settings; } @@ -78,26 +73,6 @@ public Map> getDiscoveryTypes(ThreadPool threadPool, clusterApplier, clusterSettings, hostsProvider, allocationService)); } - @Override - public Map> getZenHostsProviders(TransportService transportService, - NetworkService networkService) { - final Supplier supplier; - if (USE_MOCK_PINGS.get(settings)) { - // we have to return something in order for the unicast host provider setting to resolve to something. It will never be used - supplier = () -> hostsResolver -> { - throw new UnsupportedOperationException(); - }; - } else { - supplier = () -> { - unicastHostProvider.set( - new MockUncasedHostProvider(transportService::getLocalNode, ClusterName.CLUSTER_NAME_SETTING.get(settings)) - ); - return unicastHostProvider.get(); - }; - } - return Collections.singletonMap("test-zen", supplier); - } - @Override public List> getSettings() { return Collections.singletonList(USE_MOCK_PINGS); @@ -107,18 +82,9 @@ public List> getSettings() { public Settings additionalSettings() { return Settings.builder() .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") - .put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen") .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) .build(); } - - @Override - public void close() throws IOException { - super.close(); - if (unicastHostProvider.get() != null) { - unicastHostProvider.get().close(); - } - } } private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService, From 721a1795d2fc3d40cb587b77767fbcd902584613 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 10 Sep 2018 10:54:42 +0200 Subject: [PATCH 02/10] Imports --- .../java/org/elasticsearch/cluster/ClusterInfoServiceIT.java | 1 - 
.../elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java | 1 - 2 files changed, 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index bdc1731b784b7..d0a7f91a86c32 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -53,7 +53,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptySet; diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java index 0a2a06b932ca3..d08f9f226362d 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings.Builder; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESIntegTestCase; import org.junit.Before; From 924768fb4a0598a7432b14251b6bbf2ab8d5522a Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 11 Sep 2018 21:36:45 +0200 Subject: [PATCH 03/10] Configure file-based discovery in security test --- .../java/org/elasticsearch/license/LicensingTests.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java index 7a35b0bc422ae..c408b825ee84e 100644 --- 
a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsIndices; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; @@ -52,8 +53,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -278,9 +282,13 @@ public void testNodeJoinWithoutSecurityExplicitlyEnabled() throws Exception { enableLicensing(mode); ensureGreen(); + final Set unicastHostsFileLines = internalCluster().masterClient().admin().cluster().nodesInfo(new NodesInfoRequest()) + .get().getNodes().stream().map(n -> n.getTransport().getAddress().publishAddress().toString()).collect(Collectors.toSet()); + Path home = createTempDir(); Path conf = home.resolve("config"); Files.createDirectories(conf); + Files.write(conf.resolve(UNICAST_HOSTS_FILE), unicastHostsFileLines); Settings nodeSettings = Settings.builder() .put(nodeSettings(maxNumberOfNodes() - 1).filter(s -> "xpack.security.enabled".equals(s) == false)) .put("node.name", "my-test-node") @@ -291,7 +299,7 @@ public void 
testNodeJoinWithoutSecurityExplicitlyEnabled() throws Exception { .put("path.home", home) .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false) .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") - .put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen") + .putList(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file") .build(); Collection> mockPlugins = Arrays.asList(LocalStateSecurity.class, TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class); From 4adf96589805adb190a232153b905bdba86f8189 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 11 Sep 2018 21:49:08 +0200 Subject: [PATCH 04/10] Use LifecycleListener rather than onTransportServiceStarted --- .../main/java/org/elasticsearch/node/Node.java | 5 ----- .../java/org/elasticsearch/node/MockNode.java | 18 ++++-------------- .../test/InternalTestCluster.java | 18 +++++++++++++++--- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 630cd18f7f222..d843799379204 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -705,7 +705,6 @@ public Node start() throws NodeValidationException { assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; - onTransportServiceStarted(); final MetaData onDiskMetadata; try { @@ -783,10 +782,6 @@ public void onTimeout(TimeValue timeout) { return this; } - // For notifying tests that discovery can be configured - protected void onTransportServiceStarted() { - } - private Node stop() { if (!lifecycle.moveToStopped()) { return this; diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 
855065d084748..67d91e97e1661 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -67,7 +67,6 @@ public class MockNode extends Node { private final Collection> classpathPlugins; - private final Runnable onTransportServiceStarted; public MockNode(final Settings settings, final Collection> classpathPlugins) { this(settings, classpathPlugins, true); @@ -77,30 +76,26 @@ public MockNode( final Settings settings, final Collection> classpathPlugins, final boolean forbidPrivateIndexSettings) { - this(settings, classpathPlugins, null, forbidPrivateIndexSettings, () -> {}); + this(settings, classpathPlugins, null, forbidPrivateIndexSettings); } public MockNode( final Settings settings, final Collection> classpathPlugins, final Path configPath, - final boolean forbidPrivateIndexSettings, - final Runnable onTransportServiceStarted) { + final boolean forbidPrivateIndexSettings) { this( InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath), classpathPlugins, - forbidPrivateIndexSettings, - onTransportServiceStarted); + forbidPrivateIndexSettings); } private MockNode( final Environment environment, final Collection> classpathPlugins, - final boolean forbidPrivateIndexSettings, - final Runnable onTransportServiceStarted) { + final boolean forbidPrivateIndexSettings) { super(environment, classpathPlugins, forbidPrivateIndexSettings); this.classpathPlugins = classpathPlugins; - this.onTransportServiceStarted = onTransportServiceStarted; } /** @@ -184,9 +179,4 @@ protected HttpServerTransport newHttpTransport(NetworkModule networkModule) { protected void registerDerivedNodeNameWithLogger(String nodeName) { // Nothing to do because test uses the thread name } - - @Override - protected void onTransportServiceStarted() { - onTransportServiceStarted.run(); - } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java 
b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index a352a76e47b5c..da607d0335735 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; @@ -630,8 +631,13 @@ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, finalSettings.build(), plugins, nodeConfigurationSource.nodeConfigPath(nodeId), - forbidPrivateIndexSettings, - onTransportServiceStarted); + forbidPrivateIndexSettings); + node.injector().getInstance(TransportService.class).addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + onTransportServiceStarted.run(); + } + }); try { IOUtils.close(secureSettings); } catch (IOException e) { @@ -954,7 +960,13 @@ private void createNewNode(final Settings newSettings, final Runnable onTranspor " is not configured after restart of [" + name + "]"); } Collection> plugins = node.getClasspathPlugins(); - node = new MockNode(finalSettings, plugins, null, true, onTransportServiceStarted); + node = new MockNode(finalSettings, plugins); + node.injector().getInstance(TransportService.class).addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + onTransportServiceStarted.run(); + } + }); markNodeDataDirsAsNotEligableForWipe(node); } From 82a53294f192b65aeba2efaaebd9dc6083e2b0f0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 12 Sep 2018 13:28:29 +0200 Subject: [PATCH 05/10] Whitespace --- server/src/main/java/org/elasticsearch/node/Node.java | 1 - 
.../java/org/elasticsearch/cluster/ClusterInfoServiceIT.java | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index d843799379204..67c3894ddf40a 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -705,7 +705,6 @@ public Node start() throws NodeValidationException { assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; - final MetaData onDiskMetadata; try { // we load the global state here (the persistent part of the cluster state stored on disk) to diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index d0a7f91a86c32..1a0e964ef7740 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -185,6 +185,7 @@ public void testClusterInfoServiceInformationClearOnError() { assertThat("some usages are populated", info.getNodeLeastAvailableDiskUsages().size(), Matchers.equalTo(2)); assertThat("some shard sizes are populated", info.shardSizes.size(), greaterThan(0)); + MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, internalTestCluster.getMasterName()); final AtomicBoolean timeout = new AtomicBoolean(false); From edf1feb4ee6959be32b8d9471ad444a1230561f0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 12 Sep 2018 13:41:42 +0200 Subject: [PATCH 06/10] Just pass the settings directly to the nodes as they are started --- .../zen/SettingsBasedHostProviderIT.java | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git 
a/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java index d08f9f226362d..429950bf8530a 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java @@ -22,11 +22,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.test.ESIntegTestCase; -import org.junit.Before; - -import java.util.function.Consumer; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; @@ -36,14 +32,6 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) public class SettingsBasedHostProviderIT extends ESIntegTestCase { - private Consumer configureDiscovery; - - @Before - public void setDefaultDiscoveryConfiguration() { - configureDiscovery = b -> { - }; - } - @Override protected Settings nodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); @@ -55,9 +43,9 @@ protected Settings nodeSettings(int nodeOrdinal) { builder.remove(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()); } + // super.nodeSettings sets this to an empty list, which disables any search for other nodes, but here we want this to happen: builder.remove(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()); - configureDiscovery.accept(builder); return builder.build(); } @@ -66,16 +54,19 @@ public void testClusterFormsWithSingleSeedHostInSettings() { final NodesInfoResponse nodesInfoResponse = 
client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet(); final String seedNodeAddress = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().toString(); - configureDiscovery = b -> b.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), seedNodeAddress); logger.info("--> using seed node address {}", seedNodeAddress); int extraNodes = randomIntBetween(1, 5); - internalCluster().startNodes(extraNodes); + internalCluster().startNodes(extraNodes, + Settings.builder().putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), seedNodeAddress).build()); ensureStableCluster(extraNodes + 1); } public void testClusterFormsByScanningPorts() { + // This test will fail if all 4 ports just less than the one used by the first node are already bound by something else. It's hard + // to know how often this might happen in reality, so let's try it and see. + final String seedNodeName = internalCluster().startNode(); final NodesInfoResponse nodesInfoResponse = client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet(); @@ -84,10 +75,7 @@ public void testClusterFormsByScanningPorts() { final String portSpec = minPort + "-" + seedNodePort; logger.info("--> using port specification [{}]", portSpec); - - configureDiscovery = b -> b.put(PORT.getKey(), portSpec); - - internalCluster().startNode(); + internalCluster().startNode(Settings.builder().put(PORT.getKey(), portSpec)); ensureStableCluster(2); } } From 13b6046d0c78698e700ba2ce2ef7c5e9956ac010 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 12 Sep 2018 13:42:32 +0200 Subject: [PATCH 07/10] TODO resolved (no we don't) --- .../main/java/org/elasticsearch/test/InternalTestCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 
da607d0335735..ae5bf3205e4b1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1506,7 +1506,7 @@ private void rebuildUnicastHostFiles(Collection newNodes) { logger.info("configuring discovery with {} at {}", discoveryFileContents, configPaths); for (final Path configPath : configPaths) { Files.createDirectories(configPath); - Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); // TODO do we need to do this atomically? + Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); } } catch (IOException e) { throw new AssertionError("failed to configure file-based discovery", e); From 2947a7a125c6662fbda8371c67a138d3ad5cf613 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 12 Sep 2018 13:49:30 +0200 Subject: [PATCH 08/10] No need for a file here --- .../org/elasticsearch/license/LicensingTests.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java index c408b825ee84e..7c470cb442eea 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java @@ -53,11 +53,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Set; +import java.util.List; import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -282,13 +282,13 @@ public void testNodeJoinWithoutSecurityExplicitlyEnabled() throws Exception { enableLicensing(mode); ensureGreen(); - final Set unicastHostsFileLines = internalCluster().masterClient().admin().cluster().nodesInfo(new NodesInfoRequest()) - .get().getNodes().stream().map(n -> n.getTransport().getAddress().publishAddress().toString()).collect(Collectors.toSet()); + final List unicastHostsList = internalCluster().masterClient().admin().cluster().nodesInfo(new NodesInfoRequest()).get() + .getNodes().stream().map(n -> n.getTransport().getAddress().publishAddress().toString()).distinct() + .collect(Collectors.toList()); Path home = createTempDir(); Path conf = home.resolve("config"); Files.createDirectories(conf); - Files.write(conf.resolve(UNICAST_HOSTS_FILE), unicastHostsFileLines); Settings nodeSettings = Settings.builder() .put(nodeSettings(maxNumberOfNodes() - 1).filter(s -> "xpack.security.enabled".equals(s) == false)) .put("node.name", "my-test-node") @@ -299,7 +299,8 @@ public void testNodeJoinWithoutSecurityExplicitlyEnabled() throws Exception { .put("path.home", home) .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false) .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") - .putList(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file") + .putList(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()) + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), unicastHostsList) .build(); Collection> mockPlugins = Arrays.asList(LocalStateSecurity.class, TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class); From 31c1d7de21428140290166209f29d7ef1a20382f Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 12 Sep 2018 14:12:09 +0200 Subject: [PATCH 09/10] Rebuild the discovery file as each transport service 
restarts --- .../test/InternalTestCluster.java | 89 +++++++------------ 1 file changed, 31 insertions(+), 58 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index ae5bf3205e4b1..68b8f616ce5bd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -119,7 +119,6 @@ import java.util.Random; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -146,7 +145,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -223,6 +221,8 @@ public final class InternalTestCluster extends TestCluster { private ServiceDisruptionScheme activeDisruptionScheme; private Function clientWrapper; + private final Object discoveryFileMutex = new Object(); + public InternalTestCluster( final long clusterSeed, final Path baseDir, @@ -983,31 +983,6 @@ public void close() throws IOException { public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_"; - private class TransportServiceStartupCountDown { - private final int initialCount; - private final CountDownLatch countDownLatch; - - TransportServiceStartupCountDown(int count) { - initialCount = count; - countDownLatch = new CountDownLatch(count); - } - - void countDown() { - logger.info("transport service started: {} of {} remaining", countDownLatch.getCount() - 1, initialCount); - countDownLatch.countDown(); - } - - void await() { - logger.info("waiting for all transport services to start: {} of {} remaining", 
countDownLatch.getCount(), initialCount); - try { - assertTrue("transport service startup timed out", countDownLatch.await(30L, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - logger.info("all {} transport services started", initialCount); - } - } - static class TransportClientFactory { private final boolean sniff; private final Settings settings; @@ -1094,13 +1069,13 @@ private synchronized void reset(boolean wipeData) throws IOException { final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes; final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1; final List toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes - final TransportServiceStartupCountDown transportServiceStartupCountDown = new TransportServiceStartupCountDown(newSize); + final Runnable onTransportServiceStarted = () -> rebuildUnicastHostFiles(toStartAndPublish); for (int i = 0; i < numSharedDedicatedMasterNodes; i++) { final Settings.Builder settings = Settings.builder(); settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(Node.NODE_DATA_SETTING.getKey(), false); NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, - transportServiceStartupCountDown::countDown); + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) { @@ -1111,7 +1086,7 @@ private synchronized void reset(boolean wipeData) throws IOException { settings.put(Node.NODE_DATA_SETTING.getKey(), true).build(); } NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, - transportServiceStartupCountDown::countDown); + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes + 
numSharedDataNodes; @@ -1119,11 +1094,11 @@ private synchronized void reset(boolean wipeData) throws IOException { final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false); NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, - transportServiceStartupCountDown::countDown); + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } - startAndPublishNodesAndClients(toStartAndPublish, transportServiceStartupCountDown::await); + startAndPublishNodesAndClients(toStartAndPublish); nextNodeId.set(newSize); assert size() == newSize; @@ -1458,7 +1433,7 @@ public synchronized void stopRandomNonMasterNode() throws IOException { } } - private synchronized void startAndPublishNodesAndClients(List nodeAndClients, Runnable awaitTransportServicesStarted) { + private synchronized void startAndPublishNodesAndClients(List nodeAndClients) { if (nodeAndClients.size() > 0) { final int newMasters = (int) nodeAndClients.stream().filter(NodeAndClient::isMasterEligible) .filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters @@ -1471,9 +1446,6 @@ private synchronized void startAndPublishNodesAndClients(List nod } List> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList()); - awaitTransportServicesStarted.run(); - rebuildUnicastHostFiles(nodeAndClients); - try { for (Future future : futures) { future.get(); @@ -1495,21 +1467,24 @@ private synchronized void startAndPublishNodesAndClients(List nod } private void rebuildUnicastHostFiles(Collection newNodes) { - try { - List discoveryFileContents = Stream.concat(nodes.values().stream(), newNodes.stream()) - .map(nac -> nac.node.injector().getInstance(TransportService.class)).filter(Objects::nonNull) - 
.map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) - .map(n -> n.getAddress().toString()) - .distinct().collect(Collectors.toList()); - Set configPaths = Stream.concat(nodes.values().stream(), newNodes.stream()) - .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); - logger.info("configuring discovery with {} at {}", discoveryFileContents, configPaths); - for (final Path configPath : configPaths) { - Files.createDirectories(configPath); - Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); + // cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients() + synchronized (discoveryFileMutex) { + try { + List discoveryFileContents = Stream.concat(nodes.values().stream(), newNodes.stream()) + .map(nac -> nac.node.injector().getInstance(TransportService.class)).filter(Objects::nonNull) + .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) + .map(n -> n.getAddress().toString()) + .distinct().collect(Collectors.toList()); + Set configPaths = Stream.concat(nodes.values().stream(), newNodes.stream()) + .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); + logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths); + for (final Path configPath : configPaths) { + Files.createDirectories(configPath); + Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); + } + } catch (IOException e) { + throw new AssertionError("failed to configure file-based discovery", e); } - } catch (IOException e) { - throw new AssertionError("failed to configure file-based discovery", e); } } @@ -1671,7 +1646,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception for (List sameRoleNodes : nodesByRoles.values()) { Collections.shuffle(sameRoleNodes, random); } - List startUpOrder = new 
ArrayList<>(); + final List startUpOrder = new ArrayList<>(); for (Set roles : rolesOrderedByOriginalStartupOrder) { if (roles == null) { // if some nodes were stopped, we want have a role for that ordinal @@ -1682,15 +1657,14 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception } assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0; - final TransportServiceStartupCountDown transportServiceStartupCountDown = new TransportServiceStartupCountDown(startUpOrder.size()); for (NodeAndClient nodeAndClient : startUpOrder) { logger.info("resetting node [{}] ", nodeAndClient.name); // we already cleared data folders, before starting nodes up nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1, - transportServiceStartupCountDown::countDown); + () -> rebuildUnicastHostFiles(startUpOrder)); } - startAndPublishNodesAndClients(startUpOrder, transportServiceStartupCountDown::await); + startAndPublishNodesAndClients(startUpOrder); if (callback.validateClusterForming()) { validateClusterFormed(); @@ -1806,12 +1780,11 @@ public synchronized List startNodes(Settings... 
settings) { } else { defaultMinMasterNodes = -1; } - List nodes = new ArrayList<>(); - TransportServiceStartupCountDown transportServiceStartupCountDown = new TransportServiceStartupCountDown(settings.length); + final List nodes = new ArrayList<>(); for (Settings nodeSettings : settings) { - nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, transportServiceStartupCountDown::countDown)); + nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes))); } - startAndPublishNodesAndClients(nodes, transportServiceStartupCountDown::await); + startAndPublishNodesAndClients(nodes); if (autoManageMinMasterNodes) { validateClusterFormed(); } From 24a91aa85aebc723819c5e8292d5a0dbda6489ba Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 12 Sep 2018 14:45:14 +0200 Subject: [PATCH 10/10] Move field --- .../main/java/org/elasticsearch/test/InternalTestCluster.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 68b8f616ce5bd..76c7db23f6abd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -221,8 +221,6 @@ public final class InternalTestCluster extends TestCluster { private ServiceDisruptionScheme activeDisruptionScheme; private Function clientWrapper; - private final Object discoveryFileMutex = new Object(); - public InternalTestCluster( final long clusterSeed, final Path baseDir, @@ -1466,6 +1464,8 @@ private synchronized void startAndPublishNodesAndClients(List nod } } + private final Object discoveryFileMutex = new Object(); + private void rebuildUnicastHostFiles(Collection newNodes) { // cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients() synchronized 
(discoveryFileMutex) {