diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index ea3ae0c919b3c..2328b5a861675 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -19,30 +19,29 @@ package org.elasticsearch.discovery; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.function.Function; -import java.util.function.Supplier; - import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.single.SingleNodeDiscovery; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + /** * A module for loading classes for node discovery. */ @@ -83,6 +82,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic discoveryTypes.put("zen", () -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, clusterService, hostsProvider)); discoveryTypes.put("none", () -> new NoneDiscovery(settings, clusterService, clusterService.getClusterSettings())); + discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, clusterService)); for (DiscoveryPlugin plugin : plugins) { plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry, clusterService, hostsProvider).entrySet().forEach(entry -> { @@ -96,10 +96,12 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic if (discoverySupplier == null) { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); } + Loggers.getLogger(getClass(), settings).info("using discovery type [{}]", discoveryType); discovery = Objects.requireNonNull(discoverySupplier.get()); } public Discovery getDiscovery() { return discovery; } + } diff --git a/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java new file mode 100644 index 0000000000000..f4735c8bf3a0d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java @@ -0,0 +1,144 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.single; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.DiscoveryStats; +import org.elasticsearch.discovery.zen.PendingClusterStateStats; +import org.elasticsearch.discovery.zen.PendingClusterStatesQueue; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * A discovery implementation where the only member of the cluster is the local node. + */ +public class SingleNodeDiscovery extends AbstractLifecycleComponent implements Discovery { + + private final ClusterService clusterService; + private final DiscoverySettings discoverySettings; + + public SingleNodeDiscovery(final Settings settings, final ClusterService clusterService) { + super(Objects.requireNonNull(settings)); + this.clusterService = Objects.requireNonNull(clusterService); + final ClusterSettings clusterSettings = + Objects.requireNonNull(clusterService.getClusterSettings()); + this.discoverySettings = new DiscoverySettings(settings, clusterSettings); + } + + @Override + public DiscoveryNode localNode() { + return clusterService.localNode(); + } + + @Override + public String nodeDescription() { + return clusterService.getClusterName().value() + "/" + clusterService.localNode().getId(); + } + + @Override + public void setAllocationService(final AllocationService allocationService) { + + } + + @Override + public void publish(final ClusterChangedEvent event, final AckListener listener) { + + } + + @Override + public DiscoveryStats stats() { + return new DiscoveryStats((PendingClusterStateStats) null); + } + + @Override + public DiscoverySettings getDiscoverySettings() { + return discoverySettings; + } + + @Override + public void startInitialJoin() { + final ClusterStateTaskExecutor executor = + new ClusterStateTaskExecutor() { + + @Override + public ClusterTasksResult execute( + final ClusterState current, + final List tasks) throws Exception { + assert tasks.size() == 1; + final DiscoveryNodes.Builder nodes = + DiscoveryNodes.builder(current.nodes()); + // always set the local node as master, there will not be other nodes + nodes.masterNodeId(localNode().getId()); + final ClusterState next = + ClusterState.builder(current).nodes(nodes).build(); + final ClusterTasksResult.Builder result = + ClusterTasksResult.builder(); + return result.successes(tasks).build(next); + } + + @Override + public boolean runOnlyOnMaster() { + return false; + } + + }; + final ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.URGENT); + clusterService.submitStateUpdateTasks( + "single-node-start-initial-join", + Collections.singletonMap(localNode(), (s, e) -> {}), config, executor); + } + + @Override + public int getMinimumMasterNodes() { + return 1; + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() throws IOException { + + } + +} diff --git a/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java new file mode 100644 index 0000000000000..25641e16fca9c --- /dev/null +++ b/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java @@ -0,0 +1,178 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.single; + +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.zen.PingContextProvider; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastZenPing; +import org.elasticsearch.discovery.zen.ZenPing; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.NodeConfigurationSource; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.transport.MockTcpTransportPlugin; +import org.elasticsearch.transport.TransportService; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.Stack; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +@ESIntegTestCase.ClusterScope( + scope = ESIntegTestCase.Scope.TEST, + numDataNodes = 1, + numClientNodes = 0, + supportsDedicatedMasters = false, + autoMinMasterNodes = false) +public class SingleNodeDiscoveryIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings + .builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("discovery.type", "single-node") + // TODO: do not use such a restrictive ephemeral port range + .put("transport.tcp.port", "49152-49156") + .build(); + } + + public void testDoesNotRespondToZenPings() throws Exception { + final Settings settings = + Settings.builder().put("cluster.name", internalCluster().getClusterName()).build(); + final Version version = Version.CURRENT; + final Stack closeables = new Stack<>(); + final TestThreadPool threadPool = new TestThreadPool(getClass().getName()); + try { + final MockTransportService pingTransport = + MockTransportService.createNewService(settings, version, threadPool, null); + pingTransport.start(); + closeables.push(pingTransport); + final TransportService nodeTransport = + internalCluster().getInstance(TransportService.class); + // try to ping the single node directly + final UnicastHostsProvider provider = + () -> Collections.singletonList(nodeTransport.getLocalNode()); + final CountDownLatch latch = new CountDownLatch(1); + final UnicastZenPing unicastZenPing = + new UnicastZenPing(settings, threadPool, pingTransport, provider) { + @Override + protected void finishPingingRound(PingingRound pingingRound) { + latch.countDown(); + super.finishPingingRound(pingingRound); + } + }; + final DiscoveryNodes nodes = + DiscoveryNodes.builder().add(pingTransport.getLocalNode()).build(); + final ClusterName clusterName = new ClusterName(internalCluster().getClusterName()); + final ClusterState state = ClusterState.builder(clusterName).nodes(nodes).build(); + unicastZenPing.start(new PingContextProvider() { + @Override + public ClusterState clusterState() { + return state; + } + + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes + .builder() + .add(nodeTransport.getLocalNode()) + .add(pingTransport.getLocalNode()) + .localNodeId(pingTransport.getLocalNode().getId()) + .build(); + } + }); + closeables.push(unicastZenPing); + final CompletableFuture responses = new CompletableFuture<>(); + unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3)); + latch.await(); + responses.get(); + assertThat(responses.get().size(), equalTo(0)); + } finally { + while (!closeables.isEmpty()) { + IOUtils.closeWhileHandlingException(closeables.pop()); + } + terminate(threadPool); + } + } + + public void testSingleNodesDoNotDiscoverEachOther() throws IOException, InterruptedException { + final NodeConfigurationSource configurationSource = new NodeConfigurationSource() { + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings + .builder() + .put("discovery.type", "single-node") + .put("http.enabled", false) + .put("transport.type", "mock-socket-network") + /* + * We align the port ranges of the two as then with zen discovery these two + * nodes would find each other. + */ + // TODO: do not use such a restrictive ephemeral port range + .put("transport.tcp.port", "49152-49156") + .build(); + } + }; + try (InternalTestCluster other = + new InternalTestCluster( + randomLong(), + createTempDir(), + false, + false, + 1, + 1, + internalCluster().getClusterName(), + configurationSource, + 0, + false, + "other", + Collections.singletonList(MockTcpTransportPlugin.class), + Function.identity())) { + other.beforeTest(random(), 0); + final ClusterState first = internalCluster().getInstance(ClusterService.class).state(); + final ClusterState second = other.getInstance(ClusterService.class).state(); + assertThat(first.nodes().getSize(), equalTo(1)); + assertThat(second.nodes().getSize(), equalTo(1)); + assertThat( + first.nodes().getMasterNodeId(), + not(equalTo(second.nodes().getMasterNodeId()))); + assertThat( + first.metaData().clusterUUID(), + not(equalTo(second.metaData().clusterUUID()))); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java b/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java new file mode 100644 index 0000000000000..a0e0b699d78db --- /dev/null +++ b/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.single; + +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.equalTo; + +public class SingleNodeDiscoveryTests extends ESTestCase { + + public void testInitialJoin() throws Exception { + final Settings settings = Settings.EMPTY; + final Version version = Version.CURRENT; + final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + final Stack stack = new Stack<>(); + try { + final MockTransportService transportService = + MockTransportService.createNewService(settings, version, threadPool, null); + stack.push(transportService); + transportService.start(); + final DiscoveryNode node = transportService.getLocalNode(); + final ClusterService clusterService = createClusterService(threadPool, node); + stack.push(clusterService); + final SingleNodeDiscovery discovery = + new SingleNodeDiscovery(Settings.EMPTY, clusterService); + discovery.startInitialJoin(); + + // we are racing against the initial join which is asynchronous so we use an observer + final ClusterState state = clusterService.state(); + final ThreadContext threadContext = threadPool.getThreadContext(); + final ClusterStateObserver observer = + new ClusterStateObserver(state, clusterService, null, logger, threadContext); + if (state.nodes().getMasterNodeId() == null) { + final CountDownLatch latch = new CountDownLatch(1); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + latch.countDown(); + } + + @Override + public void onClusterServiceClose() { + latch.countDown(); + } + + @Override + public void onTimeout(TimeValue timeout) { + assert false; + } + }, s -> s.nodes().getMasterNodeId() != null); + + latch.await(); + } + + final DiscoveryNodes nodes = clusterService.state().nodes(); + assertThat(nodes.getSize(), equalTo(1)); + assertThat(nodes.getMasterNode().getId(), equalTo(node.getId())); + } finally { + while (!stack.isEmpty()) { + IOUtils.closeWhileHandlingException(stack.pop()); + } + terminate(threadPool); + } + } + +} diff --git a/qa/smoke-test-client/build.gradle b/qa/smoke-test-client/build.gradle index 888d932524220..e4d197e7e6a6d 100644 --- a/qa/smoke-test-client/build.gradle +++ b/qa/smoke-test-client/build.gradle @@ -1,3 +1,5 @@ +import org.elasticsearch.gradle.test.RestIntegTestTask + /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with @@ -25,3 +27,16 @@ apply plugin: 'elasticsearch.rest-test' dependencies { testCompile project(path: ':client:transport', configuration: 'runtime') // randomly swapped in as a transport } + +task singleNodeIntegTest(type: RestIntegTestTask) { + mustRunAfter(precommit) +} + +singleNodeIntegTestCluster { + numNodes = 1 + setting 'discovery.type', 'single-node' +} + +integTest.dependsOn(singleNodeIntegTestRunner, 'singleNodeIntegTestCluster#stop') + +check.dependsOn(integTest) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index d20a4e95e385f..cf9c2dc351510 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -64,6 +64,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; @@ -590,7 +591,8 @@ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, .put("node.name", name) .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed); - if (autoManageMinMasterNodes) { + final boolean usingSingleNodeDiscovery = DiscoveryModule.DISCOVERY_TYPE_SETTING.get(finalSettings.build()).equals("single-node"); + if (!usingSingleNodeDiscovery && autoManageMinMasterNodes) { assert finalSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null : "min master nodes may not be set when auto managed"; assert finalSettings.get(INITIAL_STATE_TIMEOUT_SETTING.getKey()) == null : @@ -600,7 +602,7 @@ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, // don't wait too long not to slow down tests .put(ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.getKey(), "5s") .put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), defaultMinMasterNodes); - } else if (finalSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null) { + } else if (!usingSingleNodeDiscovery && finalSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null) { throw new IllegalArgumentException(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() + " must be configured"); } MockNode node = new MockNode(finalSettings.build(), plugins); @@ -1083,7 +1085,7 @@ private void validateClusterFormed(String viaNode) { } return true; }, 30, TimeUnit.SECONDS) == false) { - throw new IllegalStateException("cluster failed to from with expected nodes " + expectedNodes + " and actual nodes " + + throw new IllegalStateException("cluster failed to form with expected nodes " + expectedNodes + " and actual nodes " + client.admin().cluster().prepareState().get().getState().nodes()); } } catch (InterruptedException e) {