diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index f34798605d784..91f4e6151594d 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -131,7 +131,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic if (discoverySupplier == null) { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); } - Loggers.getLogger(getClass(), settings).info("using discovery type [{}]", discoveryType); + Loggers.getLogger(getClass(), settings).info("using discovery type [{}] and host providers {}", discoveryType, hostsProviderNames); discovery = Objects.requireNonNull(discoverySupplier.get()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index b17a0cc5418e9..1a0e964ef7740 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -53,7 +53,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptySet; @@ -113,6 +112,7 @@ public void blockActions(String... actions) { @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) // manual collection or upon cluster forming. .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2) .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.getKey(), "1s") @@ -121,8 +121,7 @@ protected Settings nodeSettings(int nodeOrdinal) { @Override protected Collection> nodePlugins() { - return Arrays.asList(TestPlugin.class, - MockTransportService.TestPlugin.class); + return Arrays.asList(TestPlugin.class, MockTransportService.TestPlugin.class); } public void testClusterInfoServiceCollectsInformation() throws Exception { @@ -172,7 +171,7 @@ public void testClusterInfoServiceCollectsInformation() throws Exception { } } - public void testClusterInfoServiceInformationClearOnError() throws InterruptedException, ExecutionException { + public void testClusterInfoServiceInformationClearOnError() { internalCluster().startNodes(2, // manually control publishing Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java new file mode 100644 index 0000000000000..429950bf8530a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.LIMIT_LOCAL_PORTS_COUNT; +import static org.elasticsearch.transport.TcpTransport.PORT; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class SettingsBasedHostProviderIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + + // super.nodeSettings enables file-based discovery, but here we disable it again so we can test the static list: + if (randomBoolean()) { + builder.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()); + } else { + builder.remove(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()); + } + + // super.nodeSettings sets this to an empty list, which disables any search for other nodes, but here we want this to happen: + builder.remove(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()); + + return builder.build(); + } + + public void testClusterFormsWithSingleSeedHostInSettings() { + final String seedNodeName = internalCluster().startNode(); + final NodesInfoResponse nodesInfoResponse + = client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet(); + final String seedNodeAddress = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().toString(); + logger.info("--> using seed node address {}", seedNodeAddress); + + int extraNodes = randomIntBetween(1, 5); + internalCluster().startNodes(extraNodes, + Settings.builder().putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), seedNodeAddress).build()); + + ensureStableCluster(extraNodes + 1); + } + + public void testClusterFormsByScanningPorts() { + // This test will fail if all 4 ports just less than the one used by the first node are already bound by something else. It's hard + // to know how often this might happen in reality, so let's try it and see. + + final String seedNodeName = internalCluster().startNode(); + final NodesInfoResponse nodesInfoResponse + = client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet(); + final int seedNodePort = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().getPort(); + final int minPort = randomIntBetween(seedNodePort - LIMIT_LOCAL_PORTS_COUNT + 1, seedNodePort - 1); + final String portSpec = minPort + "-" + seedNodePort; + + logger.info("--> using port specification [{}]", portSpec); + internalCluster().startNode(Settings.builder().put(PORT.getKey(), portSpec)); + ensureStableCluster(2); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java b/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java index bc1e106696315..746e45f515b6a 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java @@ -69,7 +69,10 @@ protected Collection> nodePlugins() { protected Settings nodeSettings(int nodeOrdinal) { boolean lowLevelCancellation = randomBoolean(); logger.info("Using lowLevelCancellation: {}", lowLevelCancellation); - return Settings.builder().put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation).build(); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation) + .build(); } private void indexTestData() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index cc2ddaee0ae1b..61fb8a3151b90 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -206,6 +206,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.util.CollectionUtils.eagerPartition; +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; @@ -1808,7 +1810,9 @@ protected Settings nodeSettings(int nodeOrdinal) { // wait short time for other active shards before actually deleting, default 30s not needed in tests .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)) // randomly enable low-level search cancellation to make sure it does not alter results - .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()); + .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()) + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes + .putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file"); if (rarely()) { // Sometimes adjust the minimum search thread pool size, causing // QueueResizingEsThreadPoolExecutor to be used instead of a regular @@ -1921,7 +1925,7 @@ protected NodeConfigurationSource getNodeConfigSource() { networkSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType()); } - NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { + return new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -1956,7 +1960,6 @@ public Collection> transportClientPlugins() { return Collections.unmodifiableCollection(plugins); } }; - return nodeConfigurationSource; } /** @@ -2023,7 +2026,7 @@ protected Collection> getMockPlugins() { public static final class TestSeedPlugin extends Plugin { @Override public List> getSettings() { - return Arrays.asList(INDEX_TEST_SEED_SETTING); + return Collections.singletonList(INDEX_TEST_SEED_SETTING); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 8defacf228e45..0e28c72f7a7e4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -62,6 +62,7 @@ import java.util.Collection; import java.util.Collections; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -190,6 +191,7 @@ private Node newNode() { .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b") + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .put(nodeSettings()) // allow test cases to provide their own settings or override these .build(); Collection> plugins = getPlugins(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 19d3e856960f7..a9e58af93a0a2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; @@ -103,6 +104,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.InetSocketAddress; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -114,6 +116,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.TreeMap; @@ -128,10 +131,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyList; import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.rarely; import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; +import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.getTestTransportType; @@ -488,11 +493,13 @@ private void ensureOpen() { private synchronized NodeAndClient getOrBuildRandomNode() { ensureOpen(); - NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); + final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient; } - NodeAndClient buildNode = buildNode(1); + final int ord = nextNodeId.getAndIncrement(); + final Runnable onTransportServiceStarted = () -> {}; // do not create unicast host file for this one node. + final NodeAndClient buildNode = buildNode(ord, random.nextLong(), null, false, 1, onTransportServiceStarted); buildNode.startNode(); publishNode(buildNode); return buildNode; @@ -564,20 +571,11 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException { * * @param settings the settings to use * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + * @param onTransportServiceStarted callback to run when transport service is started */ - private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes) { - int ord = nextNodeId.getAndIncrement(); - return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes); - } - - /** - * builds a new node with default settings - * - * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed - */ - private NodeAndClient buildNode(int defaultMinMasterNodes) { + private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes, Runnable onTransportServiceStarted) { int ord = nextNodeId.getAndIncrement(); - return buildNode(ord, random.nextLong(), null, false, defaultMinMasterNodes); + return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes, onTransportServiceStarted); } /** @@ -589,15 +587,17 @@ private NodeAndClient buildNode(int defaultMinMasterNodes) { * @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and * the method will return the existing one * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + * @param onTransportServiceStarted callback to run when transport service is started */ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, - boolean reuseExisting, int defaultMinMasterNodes) { + boolean reuseExisting, int defaultMinMasterNodes, Runnable onTransportServiceStarted) { assert Thread.holdsLock(this); ensureOpen(); settings = getSettings(nodeId, seed, settings); Collection> plugins = getPlugins(); String name = buildNodeName(nodeId, settings); if (reuseExisting && nodes.containsKey(name)) { + onTransportServiceStarted.run(); // reusing an existing node implies its transport service already started return nodes.get(name); } else { assert reuseExisting == true || nodes.containsKey(name) == false : @@ -633,6 +633,12 @@ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, plugins, nodeConfigurationSource.nodeConfigPath(nodeId), forbidPrivateIndexSettings); + node.injector().getInstance(TransportService.class).addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + onTransportServiceStarted.run(); + } + }); try { IOUtils.close(secureSettings); } catch (IOException e) { @@ -909,14 +915,15 @@ void restart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterN if (!node.isClosed()) { closeNode(); } - recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes); + recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes, () -> rebuildUnicastHostFiles(emptyList())); startNode(); } /** * rebuilds a new node object using the current node settings and starts it */ - void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception { + void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes, + Runnable onTransportServiceStarted) throws Exception { assert callback != null; Settings callbackSettings = callback.onNodeStopped(name); Settings.Builder newSettings = Settings.builder(); @@ -930,7 +937,7 @@ void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, if (clearDataIfNeeded) { clearDataIfNeeded(callback); } - createNewNode(newSettings.build()); + createNewNode(newSettings.build(), onTransportServiceStarted); // make sure cached client points to new node resetClient(); } @@ -946,7 +953,7 @@ private void clearDataIfNeeded(RestartCallback callback) throws IOException { } } - private void createNewNode(final Settings newSettings) { + private void createNewNode(final Settings newSettings, final Runnable onTransportServiceStarted) { final long newIdSeed = NodeEnvironment.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id Settings finalSettings = Settings.builder().put(node.originalSettings()).put(newSettings).put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build(); if (DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(finalSettings) == false) { @@ -955,6 +962,12 @@ private void createNewNode(final Settings newSettings) { } Collection> plugins = node.getClasspathPlugins(); node = new MockNode(finalSettings, plugins); + node.injector().getInstance(TransportService.class).addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + onTransportServiceStarted.run(); + } + }); markNodeDataDirsAsNotEligableForWipe(node); } @@ -1057,11 +1070,13 @@ private synchronized void reset(boolean wipeData) throws IOException { final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes; final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1; final List toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes + final Runnable onTransportServiceStarted = () -> rebuildUnicastHostFiles(toStartAndPublish); for (int i = 0; i < numSharedDedicatedMasterNodes; i++) { final Settings.Builder settings = Settings.builder(); settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(Node.NODE_DATA_SETTING.getKey(), false); - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) { @@ -1071,14 +1086,16 @@ private synchronized void reset(boolean wipeData) throws IOException { settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build(); settings.put(Node.NODE_DATA_SETTING.getKey(), true).build(); } - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) { final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false); - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } @@ -1433,6 +1450,7 @@ private synchronized void startAndPublishNodesAndClients(List nod updateMinMasterNodes(currentMasters + newMasters); } List> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList()); + try { for (Future future : futures) { future.get(); @@ -1453,6 +1471,30 @@ private synchronized void startAndPublishNodesAndClients(List nod } } + private final Object discoveryFileMutex = new Object(); + + private void rebuildUnicastHostFiles(Collection newNodes) { + // cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients() + synchronized (discoveryFileMutex) { + try { + List discoveryFileContents = Stream.concat(nodes.values().stream(), newNodes.stream()) + .map(nac -> nac.node.injector().getInstance(TransportService.class)).filter(Objects::nonNull) + .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) + .map(n -> n.getAddress().toString()) + .distinct().collect(Collectors.toList()); + Set configPaths = Stream.concat(nodes.values().stream(), newNodes.stream()) + .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); + logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths); + for (final Path configPath : configPaths) { + Files.createDirectories(configPath); + Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); + } + } catch (IOException e) { + throw new AssertionError("failed to configure file-based discovery", e); + } + } + } + private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { stopNodesAndClients(Collections.singleton(nodeAndClient)); } @@ -1611,7 +1653,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception for (List sameRoleNodes : nodesByRoles.values()) { Collections.shuffle(sameRoleNodes, random); } - List startUpOrder = new ArrayList<>(); + final List startUpOrder = new ArrayList<>(); for (Set roles : rolesOrderedByOriginalStartupOrder) { if (roles == null) { // if some nodes were stopped, we want have a role for that ordinal @@ -1622,11 +1664,11 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception } assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0; - // do two rounds to minimize pinging (mock zen pings pings with no delay and can create a lot of logs) for (NodeAndClient nodeAndClient : startUpOrder) { logger.info("resetting node [{}] ", nodeAndClient.name); // we already cleared data folders, before starting nodes up - nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1); + nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1, + () -> rebuildUnicastHostFiles(startUpOrder)); } startAndPublishNodesAndClients(startUpOrder); @@ -1745,9 +1787,9 @@ public synchronized List startNodes(Settings... settings) { } else { defaultMinMasterNodes = -1; } - List nodes = new ArrayList<>(); - for (Settings nodeSettings: settings) { - nodes.add(buildNode(nodeSettings, defaultMinMasterNodes)); + final List nodes = new ArrayList<>(); + for (Settings nodeSettings : settings) { + nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes))); } startAndPublishNodesAndClients(nodes); if (autoManageMinMasterNodes) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java deleted file mode 100644 index dc9304637cdca..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test.discovery; - -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.zen.UnicastHostsProvider; - -import java.io.Closeable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -/** - * A {@link UnicastHostsProvider} implementation which returns results based on a static in-memory map. This allows running - * with nodes that only determine their transport address at runtime, which is the default behavior of - * {@link org.elasticsearch.test.InternalTestCluster} - */ -public final class MockUncasedHostProvider implements UnicastHostsProvider, Closeable { - - static final Map> activeNodesPerCluster = new HashMap<>(); - - - private final Supplier localNodeSupplier; - private final ClusterName clusterName; - - public MockUncasedHostProvider(Supplier localNodeSupplier, ClusterName clusterName) { - this.localNodeSupplier = localNodeSupplier; - this.clusterName = clusterName; - synchronized (activeNodesPerCluster) { - getActiveNodesForCurrentCluster().add(this); - } - } - - @Override - public List buildDynamicHosts(HostsResolver hostsResolver) { - final DiscoveryNode localNode = getNode(); - assert localNode != null; - synchronized (activeNodesPerCluster) { - Set activeNodes = getActiveNodesForCurrentCluster(); - return activeNodes.stream() - .map(MockUncasedHostProvider::getNode) - .filter(Objects::nonNull) - .filter(n -> localNode.equals(n) == false) - .map(DiscoveryNode::getAddress) - .collect(Collectors.toList()); - } - } - - @Nullable - private DiscoveryNode getNode() { - return localNodeSupplier.get(); - } - - private Set getActiveNodesForCurrentCluster() { - assert Thread.holdsLock(activeNodesPerCluster); - return activeNodesPerCluster.computeIfAbsent(clusterName, - clusterName -> ConcurrentCollections.newConcurrentSet()); - } - - @Override - public void close() { - synchronized (activeNodesPerCluster) { - boolean found = getActiveNodesForCurrentCluster().remove(this); - assert found; - } - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 5387a659aa274..2c8305b4e12bb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -19,13 +19,10 @@ package org.elasticsearch.test.discovery; -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -39,7 +36,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -59,7 +55,6 @@ public class TestZenDiscovery extends ZenDiscovery { /** A plugin which installs mock discovery and configures it to be used. */ public static class TestPlugin extends Plugin implements DiscoveryPlugin { protected final Settings settings; - private final SetOnce unicastHostProvider = new SetOnce<>(); public TestPlugin(Settings settings) { this.settings = settings; } @@ -78,26 +73,6 @@ public Map> getDiscoveryTypes(ThreadPool threadPool, clusterApplier, clusterSettings, hostsProvider, allocationService)); } - @Override - public Map> getZenHostsProviders(TransportService transportService, - NetworkService networkService) { - final Supplier supplier; - if (USE_MOCK_PINGS.get(settings)) { - // we have to return something in order for the unicast host provider setting to resolve to something. It will never be used - supplier = () -> hostsResolver -> { - throw new UnsupportedOperationException(); - }; - } else { - supplier = () -> { - unicastHostProvider.set( - new MockUncasedHostProvider(transportService::getLocalNode, ClusterName.CLUSTER_NAME_SETTING.get(settings)) - ); - return unicastHostProvider.get(); - }; - } - return Collections.singletonMap("test-zen", supplier); - } - @Override public List> getSettings() { return Collections.singletonList(USE_MOCK_PINGS); @@ -107,18 +82,9 @@ public List> getSettings() { public Settings additionalSettings() { return Settings.builder() .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") - .put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen") .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) .build(); } - - @Override - public void close() throws IOException { - super.close(); - if (unicastHostProvider.get() != null) { - unicastHostProvider.get().close(); - } - } } private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java index 25e04ca702cc5..f62e39ab5607c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsIndices; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; @@ -52,8 +53,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -274,6 +278,10 @@ public void testNodeJoinWithoutSecurityExplicitlyEnabled() throws Exception { enableLicensing(mode); ensureGreen(); + final List unicastHostsList = internalCluster().masterClient().admin().cluster().nodesInfo(new NodesInfoRequest()).get() + .getNodes().stream().map(n -> n.getTransport().getAddress().publishAddress().toString()).distinct() + .collect(Collectors.toList()); + Path home = createTempDir(); Path conf = home.resolve("config"); Files.createDirectories(conf); @@ -287,7 +295,8 @@ public void testNodeJoinWithoutSecurityExplicitlyEnabled() throws Exception { .put("path.home", home) .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false) .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") - .put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen") + .putList(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()) + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), unicastHostsList) .build(); Collection> mockPlugins = Arrays.asList(LocalStateSecurity.class, TestZenDiscovery.TestPlugin.class); try (Node node = new MockNode(nodeSettings, mockPlugins)) {