diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml new file mode 100644 index 0000000000000..ed639b3655ed5 --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml @@ -0,0 +1,212 @@ +--- +"Add transient remote cluster in simple mode with invalid sniff settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.node_connections: "5" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + +--- +"Add transient remote cluster in sniff mode with invalid simple settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.socket_connections: "20" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + +--- +"Add transient remote cluster using simple connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.socket_connections: "3" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Add transient remote cluster using sniff connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.node_connections: "3" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Switch connection mode for configured cluster": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: null + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index c2aeeac3f49d9..f51dfb62e0593 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -102,6 +102,8 @@ import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteConnectionStrategy; +import org.elasticsearch.transport.SimpleConnectionStrategy; import org.elasticsearch.transport.SniffConnectionStrategy; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.watcher.ResourceWatcherService; @@ -281,12 +283,15 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, - RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, + SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, RemoteClusterService.REMOTE_NODE_ATTRIBUTE, RemoteClusterService.ENABLE_REMOTE_CLUSTERS, RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, + SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, + SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 8848392aade82..2e6bfdc635cd2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -53,6 +53,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.regex.Matcher; @@ -445,6 +446,7 @@ private T get(Settings settings, boolean validate) { } validator.validate(parsed); validator.validate(parsed, map); + validator.validate(parsed, map, exists(settings)); } return parsed; } catch (ElasticsearchParseException ex) { @@ -671,10 +673,11 @@ public String toString() { public static class AffixSetting extends Setting { private final AffixKey key; - private final Function> delegateFactory; + private final BiFunction> delegateFactory; private final Set dependencies; - public AffixSetting(AffixKey key, Setting delegate, Function> delegateFactory, AffixSetting... dependencies) { + public AffixSetting(AffixKey key, Setting delegate, BiFunction> delegateFactory, + AffixSetting... dependencies) { super(key, delegate.defaultValue, delegate.parser, delegate.properties.toArray(new Property[0])); this.key = key; this.delegateFactory = delegateFactory; @@ -689,6 +692,7 @@ private Stream matchStream(Settings settings) { return settings.keySet().stream().filter(this::match).map(key::getConcreteString); } + @Override public Set> getSettingsDependencies(String settingsKey) { if (dependencies.isEmpty()) { return Collections.emptySet(); @@ -713,7 +717,7 @@ public Map, T> getValue(Settings curren final Map, T> result = new IdentityHashMap<>(); Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); - Setting concreteSetting = getConcreteSetting(aKey); + Setting concreteSetting = getConcreteSetting(namespace, aKey); AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater((v) -> consumer.accept(namespace, v), logger, (v) -> validator.accept(namespace, v)); @@ -751,7 +755,7 @@ public Map getValue(Settings current, Settings previous) { final Map result = new IdentityHashMap<>(); Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); - Setting concreteSetting = getConcreteSetting(aKey); + Setting concreteSetting = getConcreteSetting(namespace, aKey); AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater((v) -> {}, logger, (v) -> validator.accept(namespace, v)); if (updater.hasChanged(current, previous)) { @@ -786,7 +790,16 @@ public String innerGetRaw(final Settings settings) { @Override public Setting getConcreteSetting(String key) { if (match(key)) { - return delegateFactory.apply(key); + String namespace = this.key.getNamespace(key); + return delegateFactory.apply(namespace, key); + } else { + throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't."); + } + } + + private Setting getConcreteSetting(String namespace, String key) { + if (match(key)) { + return delegateFactory.apply(namespace, key); } else { throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't."); } @@ -797,7 +810,7 @@ public Setting getConcreteSetting(String key) { */ public Setting getConcreteSettingForNamespace(String namespace) { String fullKey = key.toConcreteKey(namespace).toString(); - return getConcreteSetting(fullKey); + return getConcreteSetting(namespace, fullKey); } @Override @@ -834,8 +847,9 @@ public Set getNamespaces(Settings settings) { public Map getAsMap(Settings settings) { Map map = new HashMap<>(); matchStream(settings).distinct().forEach(key -> { - Setting concreteSetting = getConcreteSetting(key); - map.put(getNamespace(concreteSetting), concreteSetting.get(settings)); + String namespace = this.key.getNamespace(key); + Setting concreteSetting = getConcreteSetting(namespace, key); + map.put(namespace, concreteSetting.get(settings)); }); return Collections.unmodifiableMap(map); } @@ -843,9 +857,9 @@ public Map getAsMap(Settings settings) { /** * Represents a validator for a setting. The {@link #validate(Object)} method is invoked early in the update setting process with the - * value of this setting for a fail-fast validation. Later on, the {@link #validate(Object, Map)} method is invoked with the value of - * this setting and a map from the settings specified by {@link #settings()}} to their values. All these values come from the same - * {@link Settings} instance. + * value of this setting for a fail-fast validation. Later on, the {@link #validate(Object, Map)} and + * {@link #validate(Object, Map, boolean)} methods are invoked with the value of this setting and a map from the settings specified by + * {@link #settings()}} to their values. All these values come from the same {@link Settings} instance. * * @param the type of the {@link Setting} */ @@ -869,6 +883,18 @@ public interface Validator { default void validate(T value, Map, Object> settings) { } + /** + * Validate this setting against its dependencies, specified by {@link #settings()}. This method allows validation logic + * to evaluate whether the setting will be present in the {@link Settings} after the update. The default implementation + * does nothing, accepting any value as valid as long as it passes the validation in {@link #validate(Object)}. + * + * @param value the value of this setting + * @param settings a map from the settings specified by {@link #settings()}} to their values + * @param isPresent boolean indicating if this setting is present + */ + default void validate(T value, Map, Object> settings, boolean isPresent) { + } + /** * The settings on which the validity of this setting depends. The values of the specified settings are passed to * {@link #validate(Object, Map)}. By default this returns an empty iterator, indicating that this setting does not depend on any @@ -1066,6 +1092,12 @@ public static Setting intSetting(String key, int defaultValue, int minV properties); } + public static Setting intSetting(String key, int defaultValue, int minValue, Validator validator, + Property... properties) { + return new Setting<>(key, Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key, isFiltered(properties)), validator, + properties); + } + public static Setting intSetting(String key, Setting fallbackSetting, int minValue, Property... properties) { return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key, isFiltered(properties)), properties); } @@ -1317,6 +1349,15 @@ public static Setting> listSetting( return listSetting(key, null, singleValueParser, defaultStringValue, properties); } + public static Setting> listSetting( + final String key, + final Function singleValueParser, + final Function> defaultStringValue, + final Validator> validator, + final Property... properties) { + return listSetting(key, null, singleValueParser, defaultStringValue, validator, properties); + } + public static Setting> listSetting( final String key, final @Nullable Setting> fallbackSetting, @@ -1326,7 +1367,7 @@ public static Setting> listSetting( return listSetting(key, fallbackSetting, singleValueParser, defaultStringValue, v -> {}, properties); } - static Setting> listSetting( + public static Setting> listSetting( final String key, final @Nullable Setting> fallbackSetting, final Function singleValueParser, @@ -1584,7 +1625,8 @@ public int hashCode() { * {@link #getConcreteSetting(String)} is used to pull the updater. */ public static AffixSetting prefixKeySetting(String prefix, Function> delegateFactory) { - return affixKeySetting(new AffixKey(prefix), delegateFactory); + BiFunction> delegateFactoryWithNamespace = (ns, k) -> delegateFactory.apply(k); + return affixKeySetting(new AffixKey(prefix), delegateFactoryWithNamespace); } /** @@ -1594,12 +1636,19 @@ public static AffixSetting prefixKeySetting(String prefix, Function AffixSetting affixKeySetting(String prefix, String suffix, Function> delegateFactory, AffixSetting... dependencies) { - return affixKeySetting(new AffixKey(prefix, suffix), delegateFactory, dependencies); + BiFunction> delegateFactoryWithNamespace = (ns, k) -> delegateFactory.apply(k); + return affixKeySetting(new AffixKey(prefix, suffix), delegateFactoryWithNamespace, dependencies); + } + + public static AffixSetting affixKeySetting(String prefix, String suffix, BiFunction> delegateFactory, + AffixSetting... dependencies) { + Setting delegate = delegateFactory.apply("_na_", "_na_"); + return new AffixSetting<>(new AffixKey(prefix, suffix), delegate, delegateFactory, dependencies); } - private static AffixSetting affixKeySetting(AffixKey key, Function> delegateFactory, + private static AffixSetting affixKeySetting(AffixKey key, BiFunction> delegateFactory, AffixSetting... dependencies) { - Setting delegate = delegateFactory.apply("_na_"); + Setting delegate = delegateFactory.apply("_na_", "_na_"); return new AffixSetting<>(key, delegate, delegateFactory, dependencies); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 2212525411101..be1ca9a1a2c43 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -102,9 +102,16 @@ void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) { * Registers this instance to listen to updates on the cluster settings. */ public void listenForUpdates(ClusterSettings clusterSettings) { - List> remoteClusterSettings = Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, - SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, - RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS); + List> remoteClusterSettings = Arrays.asList( + RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, + RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, + SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, + SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, + SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, + SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, + SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, + SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS); clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 16f13e57e7223..8b89e8f8f8905 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -47,7 +47,7 @@ * in the remote cluster and connects to all eligible nodes, for details see {@link RemoteClusterService#REMOTE_NODE_ATTRIBUTE}. * * In the case of a disconnection, this class will issue a re-connect task to establish at most - * {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of + * {@link SniffConnectionStrategy#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of * connections per cluster has been reached. */ final class RemoteClusterConnection implements Closeable { @@ -238,7 +238,7 @@ ConnectionManager getConnectionManager() { return remoteConnectionManager.getConnectionManager(); } - public boolean shouldRebuildConnection(Settings newSettings) { + boolean shouldRebuildConnection(Settings newSettings) { return connectionStrategy.shouldRebuildConnection(newSettings); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 3a1ab055a05cf..2bfe3980ed8d3 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -64,17 +64,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private static final ActionListener noopListener = ActionListener.wrap((x) -> {}, (x) -> {}); - /** - * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single - * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. - */ - public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = - Setting.intSetting( - "cluster.remote.connections_per_cluster", - 3, - 1, - Setting.Property.NodeScope); - /** * The initial connect timeout for remote cluster connections */ @@ -237,22 +226,18 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski @Override protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (remoteClusters.containsKey(clusterAlias) == false) { - CountDownLatch latch = new CountDownLatch(1); - updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown)); + CountDownLatch latch = new CountDownLatch(1); + updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown)); - try { - // Wait 10 seconds for a new cluster. We must use a latch instead of a future because we - // are on the cluster state thread and our custom future implementation will throw an - // assertion. - if (latch.await(10, TimeUnit.SECONDS) == false) { - logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10)); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + try { + // Wait 10 seconds for a connections. We must use a latch instead of a future because we + // are on the cluster state thread and our custom future implementation will throw an + // assertion. + if (latch.await(10, TimeUnit.SECONDS) == false) { + logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10)); } - } else { - updateRemoteCluster(clusterAlias, settings, noopListener); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @@ -280,13 +265,14 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, return; } - // this is a new cluster we have to add a new representation if (remote == null) { + // this is a new cluster we have to add a new representation Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); remoteClusters.put(clusterAlias, remote); + remote.ensureConnected(listener); } else if (remote.shouldRebuildConnection(newSettings)) { - // New ConnectionProfile. Must tear down existing connection + // Changes to connection configuration. Must tear down existing connection try { IOUtils.close(remote); } catch (IOException e) { @@ -296,9 +282,11 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); remoteClusters.put(clusterAlias, remote); + remote.ensureConnected(listener); + } else { + // No changes to connection configuration. + listener.onResponse(null); } - - remote.ensureConnected(listener); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 78d831b878bd3..d8a459a79a56e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -40,13 +40,16 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -58,11 +61,15 @@ enum ConnectionStrategy { SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings); private final int numberOfChannels; - private final Supplier>> enabledSettings; + private final Supplier>> enablementSettings; - ConnectionStrategy(int numberOfChannels, Supplier>> enabledSettings) { + ConnectionStrategy(int numberOfChannels, Supplier>> enablementSettings) { this.numberOfChannels = numberOfChannels; - this.enabledSettings = enabledSettings; + this.enablementSettings = enablementSettings; + } + + public int getNumberOfChannels() { + return numberOfChannels; } } @@ -71,6 +78,7 @@ enum ConnectionStrategy { key, ConnectionStrategy.SNIFF.name(), value -> ConnectionStrategy.valueOf(value.toUpperCase(Locale.ROOT)), + Setting.Property.NodeScope, Setting.Property.Dynamic)); @@ -121,7 +129,7 @@ static RemoteConnectionStrategy buildStrategy(String clusterAlias, TransportServ static Set getRemoteClusters(Settings settings) { final Stream> enablementSettings = Arrays.stream(ConnectionStrategy.values()) - .flatMap(strategy -> strategy.enabledSettings.get()); + .flatMap(strategy -> strategy.enablementSettings.get()); return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet()); } @@ -319,4 +327,45 @@ private boolean connectionProfileChanged(ConnectionProfile oldProfile, Connectio return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; } + + static class StrategyValidator implements Setting.Validator { + + private final String key; + private final ConnectionStrategy expectedStrategy; + private final String namespace; + private final Consumer valueChecker; + + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy) { + this(namespace, key, expectedStrategy, (v) -> {}); + } + + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, Consumer valueChecker) { + this.namespace = namespace; + this.key = key; + this.expectedStrategy = expectedStrategy; + this.valueChecker = valueChecker; + } + + @Override + public void validate(T value) { + valueChecker.accept(value); + } + + @Override + public void validate(T value, Map, Object> settings, boolean isPresent) { + Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); + ConnectionStrategy modeType = (ConnectionStrategy) settings.get(concrete); + if (isPresent && modeType.equals(expectedStrategy) == false) { + throw new IllegalArgumentException("Setting \"" + key + "\" cannot be used with the configured \"" + concrete.getKey() + + "\" [required=" + expectedStrategy.name() + ", configured=" + modeType.name() + "]"); + } + } + + @Override + public Iterator> settings() { + Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); + Stream> settingStream = Stream.of(concrete); + return settingStream.iterator(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 97e0d8a36a00c..839a1d19285b7 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -50,12 +50,13 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { */ public static final Setting.AffixSetting> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting( "cluster.remote.", - "addresses", - key -> Setting.listSetting(key, Collections.emptyList(), s -> { + "simple.addresses", + (ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> { // validate address parsePort(s); return s; - }, Setting.Property.Dynamic, Setting.Property.NodeScope)); + }, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), + Setting.Property.Dynamic, Setting.Property.NodeScope)); /** * The maximum number of socket connections that will be established to a remote cluster. The default is 18. @@ -63,7 +64,8 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "simple.socket_connections", - key -> intSetting(key, 18, 1, Setting.Property.Dynamic, Setting.Property.NodeScope)); + (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), + Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; @@ -78,7 +80,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { private final ConnectionManager.ConnectionValidator clusterNameValidator; SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - Settings settings) { + Settings settings) { this( clusterAlias, transportService, diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 725c6a6e6fb16..ee56629ebf0aa 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -65,18 +65,19 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { * A list of initial seed nodes to discover eligible nodes from the remote cluster */ public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS_OLD = Setting.affixKeySetting( - "cluster.remote.", - "seeds", - key -> Setting.listSetting( - key, - Collections.emptyList(), - s -> { - // validate seed address - parsePort(s); - return s; - }, - Setting.Property.Dynamic, - Setting.Property.NodeScope)); + "cluster.remote.", + "seeds", + (ns, key) -> Setting.listSetting( + key, + Collections.emptyList(), + s -> { + // validate seed address + parsePort(s); + return s; + }, + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); /** * A list of initial seed nodes to discover eligible nodes from the remote cluster @@ -84,14 +85,19 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting( "cluster.remote.", "sniff.seeds", - key -> Setting.listSetting(key, - "_na_".equals(key) ? REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(key) - : REMOTE_CLUSTER_SEEDS_OLD.getConcreteSetting(key.replaceAll("sniff\\.seeds$", "seeds")), + (ns, key) -> Setting.listSetting(key, + REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns), s -> { // validate seed address parsePort(s); return s; - }, Setting.Property.Dynamic, Setting.Property.NodeScope)); + }, + s -> REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns).get(s), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); + + /** * A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in * the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect @@ -99,19 +105,29 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { * undocumented as it does not work well with all proxies. */ public static final Setting.AffixSetting REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting( - "cluster.remote.", - "proxy", - key -> Setting.simpleString( - key, - s -> { - if (Strings.hasLength(s)) { - parsePort(s); - } - }, - Setting.Property.Dynamic, - Setting.Property.NodeScope), + "cluster.remote.", + "proxy", + (ns, key) -> Setting.simpleString( + key, + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, s -> { + if (Strings.hasLength(s)) { + parsePort(s); + } + }), + Setting.Property.Dynamic, + Setting.Property.NodeScope), REMOTE_CLUSTER_SEEDS); + /** + * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single + * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. + */ + public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = + intSetting( + "cluster.remote.connections_per_cluster", + 3, + 1, + Setting.Property.NodeScope); /** * The maximum number of node connections that will be established to a remote cluster. For instance if there is only a single * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. @@ -119,8 +135,13 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "sniff.node_connections", - key -> intSetting(key, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, 1, - Setting.Property.Dynamic, Setting.Property.NodeScope)); + (ns, key) -> intSetting( + key, + REMOTE_CONNECTIONS_PER_CLUSTER, + 1, + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 6; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index b9038e8101ed6..d74a8daa98d61 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -330,7 +330,7 @@ public void testGetConnectionInfo() throws Exception { int maxNumConnections = randomIntBetween(1, 5); String clusterAlias = "test-cluster"; Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes)) - .put(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); + .put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 60e7a848bbb18..0fdc797b8b377 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -76,15 +76,18 @@ private MockTransportService startTransport( } public void testSettingsAreRegistered() { - assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)); - assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_COMPRESS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS)); } public void testRemoteClusterSeedSetting() { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java index 80206eaf2b21d..af855314278f7 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java @@ -29,7 +29,7 @@ import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD; import static org.elasticsearch.transport.RemoteClusterService.ENABLE_REMOTE_CLUSTERS; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER; +import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_NODE_ATTRIBUTE; import static org.hamcrest.Matchers.emptyCollectionOf; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 3bae8b4c9559b..5ea54c7356b94 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -72,14 +72,14 @@ public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() { public void testCorrectChannelNumber() { String clusterAlias = "cluster-alias"; - String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(); - Settings simpleSettings = Settings.builder().put(settingKey, "simple").build(); - ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings); - assertEquals(1, simpleProfile.getNumConnections()); - - Settings sniffSettings = Settings.builder().put(settingKey, "sniff").build(); - ConnectionProfile sniffProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, sniffSettings); - assertEquals(6, sniffProfile.getNumConnections()); + + for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) { + String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(); + Settings simpleSettings = Settings.builder().put(settingKey, strategy.name()).build(); + ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings); + assertEquals("Incorrect number of channels for " + strategy.name(), + strategy.getNumberOfChannels(), simpleProfile.getNumConnections()); + } } private static class FakeConnectionStrategy extends RemoteConnectionStrategy { diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java index 95297bf33e931..35a6b7a6758ac 100644 --- a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -22,6 +22,10 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.AbstractScopedSettings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; @@ -32,7 +36,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -297,6 +303,36 @@ numOfConnections, addresses(address), Collections.singletonList(addressSupplier } } + public void testModeSettingsCannotBeUsedWhenInDifferentMode() { + List, String>> restrictedSettings = Arrays.asList( + new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"), + new Tuple<>(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3")); + + RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF; + + String clusterName = "cluster_name"; + Settings settings = Settings.builder() + .put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), sniff.name()) + .build(); + + Set> clusterSettings = new HashSet<>(); + clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE); + clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).collect(Collectors.toList())); + AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); + + // Should validate successfully + service.validate(settings, true); + + for (Tuple, String> restrictedSetting : restrictedSettings) { + Setting concreteSetting = restrictedSetting.v1().getConcreteSettingForNamespace(clusterName); + Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true)); + String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " + + "\"cluster.remote.cluster_name.mode\" [required=SIMPLE, configured=SNIFF]"; + assertEquals(expected, iae.getMessage()); + } + } + private static List addresses(final TransportAddress... addresses) { return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList()); } diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index 758b5dca101e5..721055a9c20f7 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -30,6 +30,9 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.AbstractScopedSettings; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -42,6 +45,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -637,6 +641,37 @@ public void testGetNodePredicatesCombination() { } } + public void testModeSettingsCannotBeUsedWhenInDifferentMode() { + List, String>> restrictedSettings = Arrays.asList( + new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, "192.168.0.1:8080"), + new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, "192.168.0.1:8080"), + new Tuple<>(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, "2")); + + RemoteConnectionStrategy.ConnectionStrategy simple = RemoteConnectionStrategy.ConnectionStrategy.SIMPLE; + + String clusterName = "cluster_name"; + Settings settings = Settings.builder() + .put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), simple.name()) + .build(); + + Set> clusterSettings = new HashSet<>(); + clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE); + clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).collect(Collectors.toList())); + AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); + + // Should validate successfully + service.validate(settings, true); + + for (Tuple, String> restrictedSetting : restrictedSettings) { + Setting concreteSetting = restrictedSetting.v1().getConcreteSettingForNamespace(clusterName); + Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true)); + String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " + + "\"cluster.remote.cluster_name.mode\" [required=SNIFF, configured=SIMPLE]"; + assertEquals(expected, iae.getMessage()); + } + } + private static List seedNodes(final DiscoveryNode... seedNodes) { return Arrays.stream(seedNodes).map(s -> s.getAddress().toString()).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java index 6ab511df90eed..fda2cf614c8bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java @@ -74,7 +74,7 @@ public static Setting.AffixSetting secureString(String realmType, * The {@code Function} takes the realm-type as an argument. * @param suffix The suffix of the setting (everything following the realm name in the affix setting) * @param delegateFactory A factory to produce the concrete setting. - * See {@link Setting#affixKeySetting(Setting.AffixKey, Function, Setting.AffixSetting[])} + * See {@link Setting#affixKeySetting(String, String, Function, Setting.AffixSetting[])} */ public static Function> affixSetting(String suffix, Function> delegateFactory) { return realmType -> Setting.affixKeySetting(realmSettingPrefix(realmType), suffix, delegateFactory); diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml new file mode 100644 index 0000000000000..ed639b3655ed5 --- /dev/null +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml @@ -0,0 +1,212 @@ +--- +"Add transient remote cluster in simple mode with invalid sniff settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.node_connections: "5" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + +--- +"Add transient remote cluster in sniff mode with invalid simple settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.socket_connections: "20" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + +--- +"Add transient remote cluster using simple connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.socket_connections: "3" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Add transient remote cluster using sniff connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.node_connections: "3" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Switch connection mode for configured cluster": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: null + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" }