Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,29 @@

package org.elasticsearch.discovery;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* A module for loading classes for node discovery.
*/
Expand Down Expand Up @@ -83,6 +82,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, clusterService, hostsProvider));
discoveryTypes.put("none", () -> new NoneDiscovery(settings, clusterService, clusterService.getClusterSettings()));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, clusterService));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
clusterService, hostsProvider).entrySet().forEach(entry -> {
Expand All @@ -96,10 +96,12 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
if (discoverySupplier == null) {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}
Loggers.getLogger(getClass(), settings).info("using discovery type [{}]", discoveryType);
discovery = Objects.requireNonNull(discoverySupplier.get());
}

public Discovery getDiscovery() {
return discovery;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.single;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PendingClusterStatesQueue;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* A discovery implementation where the only member of the cluster is the local node.
*/
public class SingleNodeDiscovery extends AbstractLifecycleComponent implements Discovery {

private final ClusterService clusterService;
private final DiscoverySettings discoverySettings;

public SingleNodeDiscovery(final Settings settings, final ClusterService clusterService) {
super(Objects.requireNonNull(settings));
this.clusterService = Objects.requireNonNull(clusterService);
final ClusterSettings clusterSettings =
Objects.requireNonNull(clusterService.getClusterSettings());
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
}

@Override
public DiscoveryNode localNode() {
return clusterService.localNode();
}

@Override
public String nodeDescription() {
return clusterService.getClusterName().value() + "/" + clusterService.localNode().getId();
}

@Override
public void setAllocationService(final AllocationService allocationService) {

}

@Override
public void publish(final ClusterChangedEvent event, final AckListener listener) {

}

@Override
public DiscoveryStats stats() {
return new DiscoveryStats((PendingClusterStateStats) null);
}

@Override
public DiscoverySettings getDiscoverySettings() {
return discoverySettings;
}

@Override
public void startInitialJoin() {
final ClusterStateTaskExecutor<DiscoveryNode> executor =
new ClusterStateTaskExecutor<DiscoveryNode>() {

@Override
public ClusterTasksResult<DiscoveryNode> execute(
final ClusterState current,
final List<DiscoveryNode> tasks) throws Exception {
assert tasks.size() == 1;
final DiscoveryNodes.Builder nodes =
DiscoveryNodes.builder(current.nodes());
// always set the local node as master, there will not be other nodes
nodes.masterNodeId(localNode().getId());
final ClusterState next =
ClusterState.builder(current).nodes(nodes).build();
final ClusterTasksResult.Builder<DiscoveryNode> result =
ClusterTasksResult.builder();
return result.successes(tasks).build(next);
}

@Override
public boolean runOnlyOnMaster() {
return false;
}

};
final ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.URGENT);
clusterService.submitStateUpdateTasks(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity - why didn't you use a simple ClusterStateUpdateTask ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reasoning other than this is what my fingers did.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it wasn't my fingers. I see why I did this. A ClusterStateUpdateTask can not override runOnlyOnMaster which we need to set to false here. 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right. I forgot about that. ++

"single-node-start-initial-join",
Collections.singletonMap(localNode(), (s, e) -> {}), config, executor);
}

@Override
public int getMinimumMasterNodes() {
return 1;
}

@Override
protected void doStart() {

}

@Override
protected void doStop() {

}

@Override
protected void doClose() throws IOException {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.single;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.PingContextProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

@ESIntegTestCase.ClusterScope(
scope = ESIntegTestCase.Scope.TEST,
numDataNodes = 1,
numClientNodes = 0,
supportsDedicatedMasters = false,
autoMinMasterNodes = false)
public class SingleNodeDiscoveryIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings
.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("discovery.type", "single-node")
// TODO: do not use such a restrictive ephemeral port range
.put("transport.tcp.port", "49152-49156")
.build();
}

public void testDoesNotRespondToZenPings() throws Exception {
final Settings settings =
Settings.builder().put("cluster.name", internalCluster().getClusterName()).build();
final Version version = Version.CURRENT;
final Stack<Closeable> closeables = new Stack<>();
final TestThreadPool threadPool = new TestThreadPool(getClass().getName());
try {
final MockTransportService pingTransport =
MockTransportService.createNewService(settings, version, threadPool, null);
pingTransport.start();
closeables.push(pingTransport);
final TransportService nodeTransport =
internalCluster().getInstance(TransportService.class);
// try to ping the single node directly
final UnicastHostsProvider provider =
() -> Collections.singletonList(nodeTransport.getLocalNode());
final CountDownLatch latch = new CountDownLatch(1);
final UnicastZenPing unicastZenPing =
new UnicastZenPing(settings, threadPool, pingTransport, provider) {
@Override
protected void finishPingingRound(PingingRound pingingRound) {
latch.countDown();
super.finishPingingRound(pingingRound);
}
};
final DiscoveryNodes nodes =
DiscoveryNodes.builder().add(pingTransport.getLocalNode()).build();
final ClusterName clusterName = new ClusterName(internalCluster().getClusterName());
final ClusterState state = ClusterState.builder(clusterName).nodes(nodes).build();
unicastZenPing.start(new PingContextProvider() {
@Override
public ClusterState clusterState() {
return state;
}

@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes
.builder()
.add(nodeTransport.getLocalNode())
.add(pingTransport.getLocalNode())
.localNodeId(pingTransport.getLocalNode().getId())
.build();
}
});
closeables.push(unicastZenPing);
final CompletableFuture<ZenPing.PingCollection> responses = new CompletableFuture<>();
unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3));
latch.await();
responses.get();
assertThat(responses.get().size(), equalTo(0));
} finally {
while (!closeables.isEmpty()) {
IOUtils.closeWhileHandlingException(closeables.pop());
}
terminate(threadPool);
}
}

public void testSingleNodesDoNotDiscoverEachOther() throws IOException, InterruptedException {
final NodeConfigurationSource configurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings
.builder()
.put("discovery.type", "single-node")
.put("http.enabled", false)
.put("transport.type", "mock-socket-network")
/*
* We align the port ranges of the two as then with zen discovery these two
* nodes would find each other.
*/
// TODO: do not use such a restrictive ephemeral port range
.put("transport.tcp.port", "49152-49156")
.build();
}
};
try (InternalTestCluster other =
new InternalTestCluster(
randomLong(),
createTempDir(),
false,
false,
1,
1,
internalCluster().getClusterName(),
configurationSource,
0,
false,
"other",
Collections.singletonList(MockTcpTransportPlugin.class),
Function.identity())) {
other.beforeTest(random(), 0);
final ClusterState first = internalCluster().getInstance(ClusterService.class).state();
final ClusterState second = other.getInstance(ClusterService.class).state();
assertThat(first.nodes().getSize(), equalTo(1));
assertThat(second.nodes().getSize(), equalTo(1));
assertThat(
first.nodes().getMasterNodeId(),
not(equalTo(second.nodes().getMasterNodeId())));
assertThat(
first.metaData().clusterUUID(),
not(equalTo(second.metaData().clusterUUID())));
}
}

}
Loading