From 27974f13be8a140e8404f4484d3aa837937ce945 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 6 Aug 2020 12:48:53 -0600 Subject: [PATCH 1/4] Add data tiers (hot, warm, cold, frozen) as custom node roles This commit adds the `data_hot`, `data_warm`, `data_cold`, and `data_frozen` node roles to the x-pack plugin. These roles are intended to be the base for the formalization of data tiers in Elasticsearch. These roles all act as data nodes (meaning shards can be allocated to them). Nodes with the existing `data` role acts as though they have all of the roles configured (it is a hot, warm, cold, and frozen node). This also includes a custom `AllocationDecider` that allows the user to configure the following settings on a cluster level: - `cluster.routing.allocation.require._tier` - `cluster.routing.allocation.include._tier` - `cluster.routing.allocation.exclude._tier` And in index settings: - `index.routing.allocation.require._tier` - `index.routing.allocation.include._tier` - `index.routing.allocation.exclude._tier` Relates to #60848 --- .../cluster/node/DiscoveryNode.java | 10 +- .../cluster/node/DiscoveryNodeRole.java | 11 + .../java/org/elasticsearch/node/Node.java | 8 +- .../test/InternalTestCluster.java | 4 +- .../allocation/DataTierAllocationDecider.java | 198 ++++++++++++ .../elasticsearch/xpack/core/DataTier.java | 140 ++++++++ .../elasticsearch/xpack/core/XPackPlugin.java | 29 +- .../DataTierAllocationDeciderTests.java | 303 ++++++++++++++++++ .../xpack/core/DataTierTests.java | 129 ++++++++ .../xpack/transform/Transform.java | 9 +- 10 files changed, 832 insertions(+), 9 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 0e9f3341e9d29..f9aae518f81cd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.set.Sets; @@ -72,8 +73,13 @@ public static boolean isMasterNode(final Settings settings) { return hasRole(settings, DiscoveryNodeRole.MASTER_ROLE); } + /** + * Note, due to the way that plugins may not be available when settings are being initialized, + * do not use this from a static {@link Setting} default value function, as it may not contain + * all roles at that point. + */ public static boolean isDataNode(final Settings settings) { - return hasRole(settings, DiscoveryNodeRole.DATA_ROLE); + return getRolesFromSettings(settings).stream().anyMatch(DiscoveryNodeRole::canContainData); } public static boolean isIngestNode(final Settings settings) { @@ -328,7 +334,7 @@ public Map getAttributes() { * Should this node hold data (shards) or not. */ public boolean isDataNode() { - return roles.contains(DiscoveryNodeRole.DATA_ROLE); + return roles.stream().anyMatch(DiscoveryNodeRole::canContainData); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java index ce4f0b2c76769..10f92e39c2a84 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java @@ -74,6 +74,13 @@ private DiscoveryNodeRole(final boolean isKnownRole, final String roleName, fina public abstract Setting legacySetting(); + /** + * Indicates whether a node with the given role can contain data. Defaults to false and can be overridden + */ + public boolean canContainData() { + return false; + } + @Override public final boolean equals(Object o) { if (this == o) return true; @@ -114,6 +121,10 @@ public Setting legacySetting() { return Setting.boolSetting("node.data", true, Property.Deprecated, Property.NodeScope); } + @Override + public boolean canContainData() { + return true; + } }; /** diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 8be8504698998..49875acc4b09e 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -185,6 +185,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -332,8 +333,11 @@ protected Node(final Environment initialEnvironment, this.environment = new Environment(settings, initialEnvironment.configFile()); Environment.assertEquivalent(initialEnvironment, this.environment); nodeEnvironment = new NodeEnvironment(tmpSettings, environment); - logger.info("node name [{}], node ID [{}], cluster name [{}]", - NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value()); + logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}", + NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(), + DiscoveryNode.getRolesFromSettings(settings).stream() + .map(DiscoveryNodeRole::roleName) + .collect(Collectors.toCollection(LinkedHashSet::new))); resourcesToClose.add(nodeEnvironment); localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 3ec52bef0a1d4..ea69bfb99e989 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -720,11 +720,11 @@ private static String getRoleSuffix(Settings settings) { if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE)) { suffix = suffix + DiscoveryNodeRole.MASTER_ROLE.roleNameAbbreviation(); } - if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE)) { + if (DiscoveryNode.isDataNode(settings)) { suffix = suffix + DiscoveryNodeRole.DATA_ROLE.roleNameAbbreviation(); } if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE) == false - && DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) == false) { + && DiscoveryNode.isDataNode(settings) == false) { suffix = suffix + "c"; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java new file mode 100644 index 0000000000000..f1686cae8787c --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java @@ -0,0 +1,198 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.cluster.routing.allocation; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.core.DataTier; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * The {@code DataTierAllocationDecider} is a custom allocation decider that behaves similar to the + * {@link org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider}, however it + * is specific to the {@code _tier} setting for both the cluster and index level. + */ +public class DataTierAllocationDecider extends AllocationDecider { + + public static final String NAME = "data_tier"; + + public static final String CLUSTER_ROUTING_REQUIRE = "cluster.routing.allocation.require._tier"; + public static final String CLUSTER_ROUTING_INCLUDE = "cluster.routing.allocation.include._tier"; + public static final String CLUSTER_ROUTING_EXCLUDE = "cluster.routing.allocation.exclude._tier"; + public static final String INDEX_ROUTING_REQUIRE = "index.routing.allocation.require._tier"; + public static final String INDEX_ROUTING_INCLUDE = "index.routing.allocation.include._tier"; + public static final String INDEX_ROUTING_EXCLUDE = "index.routing.allocation.exclude._tier"; + + public static final Setting CLUSTER_ROUTING_REQUIRE_SETTING = Setting.simpleString(CLUSTER_ROUTING_REQUIRE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting CLUSTER_ROUTING_INCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_INCLUDE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting CLUSTER_ROUTING_EXCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_EXCLUDE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting INDEX_ROUTING_REQUIRE_SETTING = Setting.simpleString(INDEX_ROUTING_REQUIRE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting INDEX_ROUTING_INCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_INCLUDE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting INDEX_ROUTING_EXCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_EXCLUDE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope); + + private static void validateTierSetting(String setting) { + if (Strings.hasText(setting)) { + Set invalidTiers = Arrays.stream(setting.split(",")) + .filter(tier -> DataTier.validTierName(tier) == false) + .collect(Collectors.toSet()); + if (invalidTiers.size() > 0) { + throw new IllegalArgumentException("invalid tier names: " + invalidTiers); + } + } + } + + private volatile String clusterRequire = null; + private volatile String clusterInclude = null; + private volatile String clusterExclude = null; + + public DataTierAllocationDecider(ClusterSettings clusterSettings) { + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_REQUIRE_SETTING, s -> this.clusterRequire = s); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_INCLUDE_SETTING, s -> this.clusterInclude = s); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_SETTING, s -> this.clusterExclude = s); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node.node(), allocation); + } + + @Override + public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(indexMetadata, node.node(), allocation); + } + + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node.node(), allocation); + } + + @Override + public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) return decision; + + decision = shouldIndexFilter(indexMetadata, node, allocation); + if (decision != null) return decision; + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); + } + + private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) return decision; + + decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation); + if (decision != null) return decision; + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); + } + + private Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) return decision; + + decision = shouldIndexFilter(indexMd, node, allocation); + if (decision != null) return decision; + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); + } + + private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) { + Settings indexSettings = indexMd.getSettings(); + String indexRequire = INDEX_ROUTING_REQUIRE_SETTING.get(indexSettings); + String indexInclude = INDEX_ROUTING_INCLUDE_SETTING.get(indexSettings); + String indexExclude = INDEX_ROUTING_EXCLUDE_SETTING.get(indexSettings); + + if (Strings.hasText(indexRequire)) { + if (allocationAllowed(OpType.AND, indexRequire, node) == false) { + return allocation.decision(Decision.NO, NAME, "node does not match all index setting [%s] tier filters [%s]", + INDEX_ROUTING_REQUIRE, indexRequire); + } + } + if (Strings.hasText(indexInclude)) { + if (allocationAllowed(OpType.OR, indexInclude, node) == false) { + return allocation.decision(Decision.NO, NAME, "node does not match any index setting [%s] tier filters [%s]", + INDEX_ROUTING_INCLUDE, indexInclude); + } + } + if (Strings.hasText(indexExclude)) { + if (allocationAllowed(OpType.OR, indexExclude, node)) { + return allocation.decision(Decision.NO, NAME, "node matches any index setting [%s] tier filters [%s]", + INDEX_ROUTING_EXCLUDE, indexExclude); + } + } + return null; + } + + private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) { + if (Strings.hasText(clusterRequire)) { + if (allocationAllowed(OpType.AND, clusterRequire, node) == false) { + return allocation.decision(Decision.NO, NAME, "node does not match all cluster setting [%s] tier filters [%s]", + CLUSTER_ROUTING_REQUIRE, clusterRequire); + } + } + if (Strings.hasText(clusterInclude)) { + if (allocationAllowed(OpType.OR, clusterInclude, node) == false) { + return allocation.decision(Decision.NO, NAME, "node does not match any cluster setting [%s] tier filters [%s]", + CLUSTER_ROUTING_INCLUDE, clusterInclude); + } + } + if (Strings.hasText(clusterExclude)) { + if (allocationAllowed(OpType.OR, clusterExclude, node)) { + return allocation.decision(Decision.NO, NAME, "node matches any cluster setting [%s] tier filters [%s]", + CLUSTER_ROUTING_EXCLUDE, clusterExclude); + } + } + return null; + } + + private enum OpType { + AND, + OR + } + + private static boolean allocationAllowed(OpType opType, String tierSetting, DiscoveryNode node) { + String[] values = Strings.tokenizeToStringArray(tierSetting, ","); + for (String value : values) { + // generic "data" roles are considered to have all tiers + if (node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE) || + node.getRoles().stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toSet()).contains(value)) { + if (opType == OpType.OR) { + return true; + } + } else { + if (opType == OpType.AND) { + return false; + } + } + } + if (opType == OpType.OR) { + return false; + } else { + return true; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java new file mode 100644 index 0000000000000..08e093aa0ff51 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; + +/** + * The {@code DataTier} class encapsulates the formalization of the "hot", + * "warm", "cold", and "frozen" tiers as node roles. In contains the roles + * themselves as well as helpers for validation and determining if a node has + * a tier configured. + * + * Related: + * {@link org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider} + */ +public class DataTier { + + public static final String DATA_HOT = "data_hot"; + public static final String DATA_WARM = "data_warm"; + public static final String DATA_COLD = "data_cold"; + public static final String DATA_FROZEN = "data_frozen"; + + /** + * Returns true if the given tier name is a valid tier + */ + public static boolean validTierName(String tierName) { + return DATA_HOT.equals(tierName) || + DATA_WARM.equals(tierName) || + DATA_COLD.equals(tierName) || + DATA_FROZEN.equals(tierName); + } + + /** + * Returns true iff the given settings have a data tier setting configured + */ + public static boolean isExplicitDataTier(Settings settings) { + /* + * This method can be called before the o.e.n.NodeRoleSettings.NODE_ROLES_SETTING is + * initialized. We do not want to trigger initialization prematurely because that will bake + * the default roles before plugins have had a chance to register them. Therefore, + * to avoid initializing this setting prematurely, we avoid using the actual node roles + * setting instance here in favor of the string. + */ + if (settings.hasValue("node.roles")) { + return settings.getAsList("node.roles").stream().anyMatch(DataTier::validTierName); + } + return false; + } + + public static DiscoveryNodeRole DATA_HOT_NODE_ROLE = new DiscoveryNodeRole("data_hot", "h") { + @Override + public boolean isEnabledByDefault(final Settings settings) { + return false; + } + + @Override + public Setting legacySetting() { + return null; + } + + @Override + public boolean canContainData() { + return true; + } + }; + + public static DiscoveryNodeRole DATA_WARM_NODE_ROLE = new DiscoveryNodeRole("data_warm", "w") { + @Override + public boolean isEnabledByDefault(final Settings settings) { + return false; + } + + @Override + public Setting legacySetting() { + return null; + } + + @Override + public boolean canContainData() { + return true; + } + }; + + public static DiscoveryNodeRole DATA_COLD_NODE_ROLE = new DiscoveryNodeRole("data_cold", "c") { + @Override + public boolean isEnabledByDefault(final Settings settings) { + return false; + } + + @Override + public Setting legacySetting() { + return null; + } + + @Override + public boolean canContainData() { + return true; + } + }; + + public static DiscoveryNodeRole DATA_FROZEN_NODE_ROLE = new DiscoveryNodeRole("data_frozen", "f") { + @Override + public boolean isEnabledByDefault(final Settings settings) { + return false; + } + + @Override + public Setting legacySetting() { + return null; + } + + @Override + public boolean canContainData() { + return true; + } + }; + + public static boolean isHotNode(DiscoveryNode discoveryNode) { + return discoveryNode.getRoles().contains(DATA_HOT_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE); + } + + public static boolean isWarmNode(DiscoveryNode discoveryNode) { + return discoveryNode.getRoles().contains(DATA_WARM_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE); + } + + public static boolean isColdNode(DiscoveryNode discoveryNode) { + return discoveryNode.getRoles().contains(DATA_COLD_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE); + } + + public static boolean isFrozenNode(DiscoveryNode discoveryNode) { + return discoveryNode.getRoles().contains(DATA_FROZEN_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 67d33803d0c80..050a20a49e7e4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -17,7 +17,9 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.inject.Binder; @@ -39,6 +41,7 @@ import org.elasticsearch.license.LicensesMetadata; import org.elasticsearch.license.Licensing; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.RepositoryPlugin; @@ -53,6 +56,7 @@ import org.elasticsearch.snapshots.SourceOnlySnapshotRepository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; import org.elasticsearch.xpack.core.action.ReloadAnalyzerAction; import org.elasticsearch.xpack.core.action.TransportReloadAnalyzersAction; import org.elasticsearch.xpack.core.action.TransportXPackInfoAction; @@ -78,17 +82,20 @@ import java.security.PrivilegedAction; import java.time.Clock; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, RepositoryPlugin, EnginePlugin { +public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, RepositoryPlugin, EnginePlugin, ClusterPlugin { private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(XPackPlugin.class); public static final String ASYNC_RESULTS_INDEX = ".async-search"; @@ -349,9 +356,29 @@ public Optional getEngineFactory(IndexSettings indexSettings) { public List> getSettings() { List> settings = super.getSettings(); settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY); + settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE_SETTING); + settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE_SETTING); + settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE_SETTING); + settings.add(DataTierAllocationDecider.INDEX_ROUTING_REQUIRE_SETTING); + settings.add(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE_SETTING); + settings.add(DataTierAllocationDecider.INDEX_ROUTING_EXCLUDE_SETTING); return settings; } + @Override + public Set getRoles() { + return new HashSet<>(Arrays.asList( + DataTier.DATA_HOT_NODE_ROLE, + DataTier.DATA_WARM_NODE_ROLE, + DataTier.DATA_COLD_NODE_ROLE, + DataTier.DATA_FROZEN_NODE_ROLE)); + } + + @Override + public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + return Collections.singleton(new DataTierAllocationDecider(clusterSettings)); + } + /** * Handles the creation of the SSLService along with the necessary actions to enable reloading * of SSLContexts when configuration files change on disk. diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java new file mode 100644 index 0000000000000..c67da35158875 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java @@ -0,0 +1,303 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.gateway.TestGatewayAllocator; +import org.elasticsearch.xpack.core.DataTier; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class DataTierAllocationDeciderTests extends ESAllocationTestCase { + + private static final Set> ALL_SETTINGS; + private static final DiscoveryNode HOT_NODE = newNode("node-hot", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE)); + private static final DiscoveryNode WARM_NODE = newNode("node-warm", Collections.singleton(DataTier.DATA_WARM_NODE_ROLE)); + private static final DiscoveryNode COLD_NODE = newNode("node-cold", Collections.singleton(DataTier.DATA_COLD_NODE_ROLE)); + private static final DiscoveryNode FROZEN_NODE = newNode("node-frozen", Collections.singleton(DataTier.DATA_FROZEN_NODE_ROLE)); + private static final DiscoveryNode DATA_NODE = newNode("node-data", Collections.singleton(DiscoveryNodeRole.DATA_ROLE)); + + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ALL_SETTINGS); + private final DataTierAllocationDecider decider = new DataTierAllocationDecider(clusterSettings); + private final AllocationDeciders allocationDeciders = new AllocationDeciders( + Arrays.asList(decider, + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), + new ReplicaAfterPrimaryActiveAllocationDecider())); + private final AllocationService service = new AllocationService(allocationDeciders, + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + private final ShardRouting shard = ShardRouting.newUnassigned(new ShardId("myindex", "myindex", 0), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "index created")); + + static { + Set> allSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE_SETTING); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE_SETTING); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE_SETTING); + ALL_SETTINGS = allSettings; + } + + public void testClusterRequires() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state")); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + clusterSettings.applySettings(Settings.builder() + .put(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE, "data_hot") + .build()); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + } + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, COLD_NODE, FROZEN_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match all cluster setting [cluster.routing.allocation.require._tier] " + + "tier filters [data_hot]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match all cluster setting [cluster.routing.allocation.require._tier] " + + "tier filters [data_hot]")); + } + } + + public void testClusterIncludes() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state")); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + clusterSettings.applySettings(Settings.builder() + .put(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE, "data_warm,data_frozen") + .build()); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + } + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match any cluster setting [cluster.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match any cluster setting [cluster.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + } + } + + + public void testClusterExcludes() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state")); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + clusterSettings.applySettings(Settings.builder() + .put(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE, "data_warm,data_frozen") + .build()); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] " + + "tier filters [data_warm,data_frozen]")); + + } + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + } + } + + public void testIndexRequires() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), + Settings.builder() + .put(DataTierAllocationDecider.INDEX_ROUTING_REQUIRE, "data_hot") + .build()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + } + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, COLD_NODE, FROZEN_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match all index setting [index.routing.allocation.require._tier] tier filters [data_hot]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match all index setting [index.routing.allocation.require._tier] tier filters [data_hot]")); + } + } + + public void testIndexIncludes() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), + Settings.builder() + .put(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE, "data_warm,data_frozen") + .build()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + } + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match any index setting [index.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match any index setting [index.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + } + } + + public void testIndexExcludes() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), + Settings.builder() + .put(DataTierAllocationDecider.INDEX_ROUTING_EXCLUDE, "data_warm,data_frozen") + .build()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any index setting [index.routing.allocation.exclude._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any index setting [index.routing.allocation.exclude._tier] " + + "tier filters [data_warm,data_frozen]")); + + } + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + } + } + + private ClusterState prepareState(ClusterState initialState) { + return prepareState(initialState, Settings.EMPTY); + } + + private ClusterState prepareState(ClusterState initialState, Settings indexSettings) { + return ClusterState.builder(initialState) + .nodes(DiscoveryNodes.builder() + .add(HOT_NODE) + .add(WARM_NODE) + .add(COLD_NODE) + .add(FROZEN_NODE) + .add(DATA_NODE) + .build()) + .metadata(Metadata.builder() + .put(IndexMetadata.builder("myindex") + .settings(Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "myindex") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(indexSettings) + .build())) + .build()) + .build(); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java new file mode 100644 index 0000000000000..e45560b629f5b --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.arrayContainingInAnyOrder; + +public class DataTierTests extends ESTestCase { + + private static final AtomicInteger idGenerator = new AtomicInteger(); + + public void testNodeSelection() { + DiscoveryNodes discoveryNodes = buildDiscoveryNodes(); + + final String[] dataNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DiscoveryNode::isDataNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + final String[] hotNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DataTier::isHotNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + final String[] warmNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DataTier::isWarmNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + final String[] coldNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DataTier::isColdNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + final String[] frozenNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DataTier::isFrozenNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + assertThat(discoveryNodes.resolveNodes("data:true"), arrayContainingInAnyOrder(dataNodes)); + assertThat(discoveryNodes.resolveNodes("data_hot:true"), arrayContainingInAnyOrder(hotNodes)); + assertThat(discoveryNodes.resolveNodes("data_warm:true"), arrayContainingInAnyOrder(warmNodes)); + assertThat(discoveryNodes.resolveNodes("data_cold:true"), arrayContainingInAnyOrder(coldNodes)); + assertThat(discoveryNodes.resolveNodes("data_frozen:true"), arrayContainingInAnyOrder(frozenNodes)); + Set allTiers = new HashSet<>(Arrays.asList(hotNodes)); + allTiers.addAll(Arrays.asList(warmNodes)); + allTiers.addAll(Arrays.asList(coldNodes)); + allTiers.addAll(Arrays.asList(frozenNodes)); + assertThat(discoveryNodes.resolveNodes("data:true"), arrayContainingInAnyOrder(allTiers.toArray(Strings.EMPTY_ARRAY))); + } + + private static DiscoveryNodes buildDiscoveryNodes() { + int numNodes = randomIntBetween(3, 15); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + List nodesList = randomNodes(numNodes); + for (DiscoveryNode node : nodesList) { + discoBuilder = discoBuilder.add(node); + } + discoBuilder.localNodeId(randomFrom(nodesList).getId()); + discoBuilder.masterNodeId(randomFrom(nodesList).getId()); + return discoBuilder.build(); + } + + private static DiscoveryNode newNode(int nodeId, Map attributes, Set roles) { + return new DiscoveryNode("name_" + nodeId, "node_" + nodeId, buildNewFakeTransportAddress(), attributes, roles, + Version.CURRENT); + } + + private static List randomNodes(final int numNodes) { + Set allRoles = new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES); + allRoles.remove(DiscoveryNodeRole.DATA_ROLE); + allRoles.add(DataTier.DATA_HOT_NODE_ROLE); + allRoles.add(DataTier.DATA_WARM_NODE_ROLE); + allRoles.add(DataTier.DATA_COLD_NODE_ROLE); + allRoles.add(DataTier.DATA_FROZEN_NODE_ROLE); + List nodesList = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + Map attributes = new HashMap<>(); + if (frequently()) { + attributes.put("custom", randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5)); + } + final Set roles = new HashSet<>(randomSubsetOf(allRoles)); + if (frequently()) { + roles.add(new DiscoveryNodeRole("custom_role", "cr") { + + @Override + public Setting legacySetting() { + return null; + } + + }); + } + final DiscoveryNode node = newNode(idGenerator.getAndIncrement(), attributes, roles); + nodesList.add(node); + } + return nodesList; + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 1342ea117596f..e4c7cfb38ab77 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -45,6 +45,7 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.core.DataTier; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -153,7 +154,9 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa */ private static final Setting TRANSFORM_ENABLED_NODE = Setting.boolSetting( "node.transform", - settings -> Boolean.toString(DiscoveryNode.isDataNode(settings)), + settings -> + // Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized + Boolean.toString(DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings)), Property.Deprecated, Property.NodeScope ); @@ -167,7 +170,9 @@ public Setting legacySetting() { @Override public boolean isEnabledByDefault(final Settings settings) { - return super.isEnabledByDefault(settings) && DiscoveryNode.isDataNode(settings); + return super.isEnabledByDefault(settings) && + // Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized + (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings)); } }; From 9a5233d281ff5582dc211dce54e2067a966ea6e3 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 12 Aug 2020 08:13:36 -0600 Subject: [PATCH 2/4] Use real "if" statements --- .../allocation/DataTierAllocationDecider.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java index f1686cae8787c..3f5cce55ebf79 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java @@ -92,30 +92,42 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl @Override public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { Decision decision = shouldClusterFilter(node, allocation); - if (decision != null) return decision; + if (decision != null) { + return decision; + } decision = shouldIndexFilter(indexMetadata, node, allocation); - if (decision != null) return decision; + if (decision != null) { + return decision; + } return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); } private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { Decision decision = shouldClusterFilter(node, allocation); - if (decision != null) return decision; + if (decision != null) { + return decision; + } decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation); - if (decision != null) return decision; + if (decision != null) { + return decision; + } return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); } private Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) { Decision decision = shouldClusterFilter(node, allocation); - if (decision != null) return decision; + if (decision != null) { + return decision; + } decision = shouldIndexFilter(indexMd, node, allocation); - if (decision != null) return decision; + if (decision != null) { + return decision; + } return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); } From 4373994d3d969271d75601777244d9fb70fef3c0 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 12 Aug 2020 08:15:19 -0600 Subject: [PATCH 3/4] Clarify documentation message --- .../java/org/elasticsearch/cluster/node/DiscoveryNode.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index f9aae518f81cd..299989815b7e8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -74,9 +74,9 @@ public static boolean isMasterNode(final Settings settings) { } /** - * Note, due to the way that plugins may not be available when settings are being initialized, - * do not use this from a static {@link Setting} default value function, as it may not contain - * all roles at that point. + * Due to the way that plugins may not be available when settings are being initialized, + * not all roles may be available from a static/initializing context such as a {@link Setting} + * default value function. In that case, be warned that this may not include all plugin roles. */ public static boolean isDataNode(final Settings settings) { return getRolesFromSettings(settings).stream().anyMatch(DiscoveryNodeRole::canContainData); From b0b4c08cfc5129d48b79c73d379494381a78d8b3 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 12 Aug 2020 08:24:24 -0600 Subject: [PATCH 4/4] Add a test using both cluster and index settings --- .../DataTierAllocationDeciderTests.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java index c67da35158875..bedbcdeecca3e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java @@ -275,6 +275,55 @@ public void testIndexExcludes() { } } + public void testClusterAndIndex() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), + Settings.builder() + .put(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE, "data_warm,data_frozen") + .build()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + clusterSettings.applySettings(Settings.builder() + .put(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE, "data_frozen") + .build()); + allocation.debugDecision(true); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO)); + assertThat(node.toString(), d.getExplanation(), + containsString("node does not match any index setting [index.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO)); + assertThat(node.toString(), d.getExplanation(), + containsString("node does not match any index setting [index.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + } + + for (DiscoveryNode n : Arrays.asList(FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] tier filters [data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] tier filters [data_frozen]")); + } + + for (DiscoveryNode n : Arrays.asList(WARM_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + } + } + private ClusterState prepareState(ClusterState initialState) { return prepareState(initialState, Settings.EMPTY); }