Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -75,15 +78,28 @@ public class ClusterBootstrapService {
public ClusterBootstrapService(Settings settings, TransportService transportService,
Supplier<Iterable<DiscoveryNode>> discoveredNodesSupplier, BooleanSupplier isBootstrappedSupplier,
Consumer<VotingConfiguration> votingConfigurationConsumer) {

final List<String> initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings);
bootstrapRequirements = unmodifiableSet(new LinkedHashSet<>(initialMasterNodes));
if (bootstrapRequirements.size() != initialMasterNodes.size()) {
throw new IllegalArgumentException(
"setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] contains duplicates: " + initialMasterNodes);
if (DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) {
if (INITIAL_MASTER_NODES_SETTING.exists(settings)) {
throw new IllegalArgumentException("setting [" + INITIAL_MASTER_NODES_SETTING.getKey() +
"] is not allowed when [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] is set to [" +
DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "]");
}
if (DiscoveryNode.isMasterNode(settings) == false) {
throw new IllegalArgumentException("node with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +
DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] must be master-eligible");
}
bootstrapRequirements = Collections.singleton(Node.NODE_NAME_SETTING.get(settings));
unconfiguredBootstrapTimeout = null;
} else {
final List<String> initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings);
bootstrapRequirements = unmodifiableSet(new LinkedHashSet<>(initialMasterNodes));
if (bootstrapRequirements.size() != initialMasterNodes.size()) {
throw new IllegalArgumentException(
"setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] contains duplicates: " + initialMasterNodes);
}
unconfiguredBootstrapTimeout = discoveryIsConfigured(settings) ? null : UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(settings);
}

unconfiguredBootstrapTimeout = discoveryIsConfigured(settings) ? null : UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
this.discoveredNodesSupplier = discoveredNodesSupplier;
this.isBootstrappedSupplier = isBootstrappedSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
import org.elasticsearch.discovery.PeerFinder;
Expand All @@ -69,6 +71,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -94,6 +97,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final Settings settings;
private final boolean singleNodeDiscovery;
private final TransportService transportService;
private final MasterService masterService;
private final AllocationService allocationService;
Expand Down Expand Up @@ -143,6 +147,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.masterService = masterService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
this.persistedStateSupplier = persistedStateSupplier;
Expand Down Expand Up @@ -424,6 +429,13 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
assert Thread.holdsLock(mutex) == false;
assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible";
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest);

if (singleNodeDiscovery && joinRequest.getSourceNode().equals(getLocalNode()) == false) {
joinCallback.onFailure(new IllegalStateException("cannot join node with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() +
"] set to [" + DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] discovery"));
return;
}

transportService.connectToNode(joinRequest.getSourceNode());

final ClusterState stateForJoinValidation = getStateForMasterService();
Expand Down Expand Up @@ -636,6 +648,14 @@ protected void doStart() {
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
VotingConfiguration votingConfiguration = coordinationState.get().getLastAcceptedState().getLastCommittedConfiguration();
if (singleNodeDiscovery &&
votingConfiguration.isEmpty() == false &&
votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {
throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +
DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +
" does not have quorum in voting configuration " + votingConfiguration);
}
ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
Expand Down Expand Up @@ -1049,7 +1069,8 @@ private class CoordinatorPeerFinder extends PeerFinder {

CoordinatorPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
super(settings, transportService, transportAddressConnector, configuredHostsResolver);
super(settings, transportService, transportAddressConnector,
singleNodeDiscovery ? hostsResolver -> Collections.emptyList() : configuredHostsResolver);
}

@Override
Expand All @@ -1060,6 +1081,13 @@ protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
}
}

@Override
protected void startProbe(TransportAddress transportAddress) {
if (singleNodeDiscovery == false) {
super.startProbe(transportAddress);
}
}

@Override
protected void onFoundPeersUpdated() {
synchronized (mutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -50,7 +49,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.BiConsumer;
Expand All @@ -68,6 +66,8 @@ public class DiscoveryModule {

public static final String ZEN2_DISCOVERY_TYPE = "zen";

public static final String SINGLE_NODE_DISCOVERY_TYPE = "single-node";

public static final Setting<String> DISCOVERY_TYPE_SETTING =
new Setting<>("discovery.type", ZEN2_DISCOVERY_TYPE, Function.identity(), Property.NodeScope);
public static final Setting<List<String>> DISCOVERY_SEED_PROVIDERS_SETTING =
Expand Down Expand Up @@ -114,6 +114,8 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
List<SeedHostsProvider> filteredSeedProviders = seedProviderNames.stream()
.map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());

String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);

final SeedHostsProvider seedHostsProvider = hostsResolver -> {
final List<TransportAddress> addresses = new ArrayList<>();
for (SeedHostsProvider provider : filteredSeedProviders) {
Expand All @@ -122,20 +124,17 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
return Collections.unmodifiableList(addresses);
};

Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider, clusterApplier,
joinValidators, new Random(Randomness.get().nextLong())));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier,
gatewayMetaState));
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
Supplier<Discovery> discoverySupplier = discoveryTypes.get(discoveryType);
if (discoverySupplier == null) {
if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()));
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}

logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames);
discovery = Objects.requireNonNull(discoverySupplier.get());
}

public Discovery getDiscovery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public String toString() {
return peersRemoved;
}

private void startProbe(TransportAddress transportAddress) {
protected void startProbe(TransportAddress transportAddress) {
assert holdsLock() : "PeerFinder mutex not held";
if (active == false) {
logger.trace("startProbe({}) not running", transportAddress);
Expand Down

This file was deleted.

Loading