From 3082f87d9c2337567a8ecb8338a47cb03d38d4c8 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 12 Aug 2020 10:15:52 -0600 Subject: [PATCH] Add data tiers (hot, warm, cold, frozen) as custom node roles (#60994) This commit adds the `data_hot`, `data_warm`, `data_cold`, and `data_frozen` node roles to the x-pack plugin. These roles are intended to be the base for the formalization of data tiers in Elasticsearch. These roles all act as data nodes (meaning shards can be allocated to them). Nodes with the existing `data` role acts as though they have all of the roles configured (it is a hot, warm, cold, and frozen node). This also includes a custom `AllocationDecider` that allows the user to configure the following settings on a cluster level: - `cluster.routing.allocation.require._tier` - `cluster.routing.allocation.include._tier` - `cluster.routing.allocation.exclude._tier` And in index settings: - `index.routing.allocation.require._tier` - `index.routing.allocation.include._tier` - `index.routing.allocation.exclude._tier` Relates to #60848 --- .../cluster/node/DiscoveryNode.java | 10 +- .../cluster/node/DiscoveryNodeRole.java | 11 + .../java/org/elasticsearch/node/Node.java | 8 +- .../test/InternalTestCluster.java | 4 +- .../allocation/DataTierAllocationDecider.java | 210 +++++++++++ .../elasticsearch/xpack/core/DataTier.java | 140 +++++++ .../elasticsearch/xpack/core/XPackPlugin.java | 29 +- .../DataTierAllocationDeciderTests.java | 352 ++++++++++++++++++ .../xpack/core/DataTierTests.java | 129 +++++++ .../xpack/transform/Transform.java | 9 +- 10 files changed, 893 insertions(+), 9 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index d57b9e30d2f40..94bddd40bd919 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.xcontent.ToXContentFragment; @@ -83,8 +84,13 @@ public static boolean isMasterNode(Settings settings) { return hasRole(settings, DiscoveryNodeRole.MASTER_ROLE); } + /** + * Due to the way that plugins may not be available when settings are being initialized, + * not all roles may be available from a static/initializing context such as a {@link Setting} + * default value function. In that case, be warned that this may not include all plugin roles. + */ public static boolean isDataNode(final Settings settings) { - return hasRole(settings, DiscoveryNodeRole.DATA_ROLE); + return getRolesFromSettings(settings).stream().anyMatch(DiscoveryNodeRole::canContainData); } public static boolean isIngestNode(Settings settings) { @@ -383,7 +389,7 @@ public Map getAttributes() { * Should this node hold data (shards) or not. */ public boolean isDataNode() { - return roles.contains(DiscoveryNodeRole.DATA_ROLE); + return roles.stream().anyMatch(DiscoveryNodeRole::canContainData); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java index 2a894291ecc0e..d72a7042d7091 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java @@ -84,6 +84,13 @@ private DiscoveryNodeRole(final boolean isKnownRole, final String roleName, fina public abstract Setting legacySetting(); + /** + * Indicates whether a node with the given role can contain data. Defaults to false and can be overridden + */ + public boolean canContainData() { + return false; + } + @Override public final boolean equals(Object o) { if (this == o) return true; @@ -124,6 +131,10 @@ public Setting legacySetting() { return Setting.boolSetting("node.data", true, Property.Deprecated, Property.NodeScope); } + @Override + public boolean canContainData() { + return true; + } }; /** diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index c97c0e2a177b1..3da548addeac0 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -190,6 +190,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -345,8 +346,11 @@ protected Node(final Environment initialEnvironment, this.environment = new Environment(settings, initialEnvironment.configFile(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings)); Environment.assertEquivalent(initialEnvironment, this.environment); nodeEnvironment = new NodeEnvironment(tmpSettings, environment); - logger.info("node name [{}], node ID [{}], cluster name [{}]", - NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value()); + logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}", + NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(), + DiscoveryNode.getRolesFromSettings(settings).stream() + .map(DiscoveryNodeRole::roleName) + .collect(Collectors.toCollection(LinkedHashSet::new))); resourcesToClose.add(nodeEnvironment); localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 0aa10619bedab..3818908fae3c6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -766,11 +766,11 @@ private static String getRoleSuffix(Settings settings) { if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE)) { suffix = suffix + DiscoveryNodeRole.MASTER_ROLE.roleNameAbbreviation(); } - if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE)) { + if (DiscoveryNode.isDataNode(settings)) { suffix = suffix + DiscoveryNodeRole.DATA_ROLE.roleNameAbbreviation(); } if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE) == false - && DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) == false) { + && DiscoveryNode.isDataNode(settings) == false) { suffix = suffix + "c"; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java new file mode 100644 index 0000000000000..3f5cce55ebf79 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java @@ -0,0 +1,210 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.cluster.routing.allocation; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.core.DataTier; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * The {@code DataTierAllocationDecider} is a custom allocation decider that behaves similar to the + * {@link org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider}, however it + * is specific to the {@code _tier} setting for both the cluster and index level. + */ +public class DataTierAllocationDecider extends AllocationDecider { + + public static final String NAME = "data_tier"; + + public static final String CLUSTER_ROUTING_REQUIRE = "cluster.routing.allocation.require._tier"; + public static final String CLUSTER_ROUTING_INCLUDE = "cluster.routing.allocation.include._tier"; + public static final String CLUSTER_ROUTING_EXCLUDE = "cluster.routing.allocation.exclude._tier"; + public static final String INDEX_ROUTING_REQUIRE = "index.routing.allocation.require._tier"; + public static final String INDEX_ROUTING_INCLUDE = "index.routing.allocation.include._tier"; + public static final String INDEX_ROUTING_EXCLUDE = "index.routing.allocation.exclude._tier"; + + public static final Setting CLUSTER_ROUTING_REQUIRE_SETTING = Setting.simpleString(CLUSTER_ROUTING_REQUIRE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting CLUSTER_ROUTING_INCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_INCLUDE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting CLUSTER_ROUTING_EXCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_EXCLUDE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting INDEX_ROUTING_REQUIRE_SETTING = Setting.simpleString(INDEX_ROUTING_REQUIRE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting INDEX_ROUTING_INCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_INCLUDE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting INDEX_ROUTING_EXCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_EXCLUDE, + DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope); + + private static void validateTierSetting(String setting) { + if (Strings.hasText(setting)) { + Set invalidTiers = Arrays.stream(setting.split(",")) + .filter(tier -> DataTier.validTierName(tier) == false) + .collect(Collectors.toSet()); + if (invalidTiers.size() > 0) { + throw new IllegalArgumentException("invalid tier names: " + invalidTiers); + } + } + } + + private volatile String clusterRequire = null; + private volatile String clusterInclude = null; + private volatile String clusterExclude = null; + + public DataTierAllocationDecider(ClusterSettings clusterSettings) { + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_REQUIRE_SETTING, s -> this.clusterRequire = s); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_INCLUDE_SETTING, s -> this.clusterInclude = s); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_SETTING, s -> this.clusterExclude = s); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node.node(), allocation); + } + + @Override + public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(indexMetadata, node.node(), allocation); + } + + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node.node(), allocation); + } + + @Override + public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) { + return decision; + } + + decision = shouldIndexFilter(indexMetadata, node, allocation); + if (decision != null) { + return decision; + } + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); + } + + private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) { + return decision; + } + + decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation); + if (decision != null) { + return decision; + } + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); + } + + private Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) { + return decision; + } + + decision = shouldIndexFilter(indexMd, node, allocation); + if (decision != null) { + return decision; + } + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters"); + } + + private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) { + Settings indexSettings = indexMd.getSettings(); + String indexRequire = INDEX_ROUTING_REQUIRE_SETTING.get(indexSettings); + String indexInclude = INDEX_ROUTING_INCLUDE_SETTING.get(indexSettings); + String indexExclude = INDEX_ROUTING_EXCLUDE_SETTING.get(indexSettings); + + if (Strings.hasText(indexRequire)) { + if (allocationAllowed(OpType.AND, indexRequire, node) == false) { + return allocation.decision(Decision.NO, NAME, "node does not match all index setting [%s] tier filters [%s]", + INDEX_ROUTING_REQUIRE, indexRequire); + } + } + if (Strings.hasText(indexInclude)) { + if (allocationAllowed(OpType.OR, indexInclude, node) == false) { + return allocation.decision(Decision.NO, NAME, "node does not match any index setting [%s] tier filters [%s]", + INDEX_ROUTING_INCLUDE, indexInclude); + } + } + if (Strings.hasText(indexExclude)) { + if (allocationAllowed(OpType.OR, indexExclude, node)) { + return allocation.decision(Decision.NO, NAME, "node matches any index setting [%s] tier filters [%s]", + INDEX_ROUTING_EXCLUDE, indexExclude); + } + } + return null; + } + + private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) { + if (Strings.hasText(clusterRequire)) { + if (allocationAllowed(OpType.AND, clusterRequire, node) == false) { + return allocation.decision(Decision.NO, NAME, "node does not match all cluster setting [%s] tier filters [%s]", + CLUSTER_ROUTING_REQUIRE, clusterRequire); + } + } + if (Strings.hasText(clusterInclude)) { + if (allocationAllowed(OpType.OR, clusterInclude, node) == false) { + return allocation.decision(Decision.NO, NAME, "node does not match any cluster setting [%s] tier filters [%s]", + CLUSTER_ROUTING_INCLUDE, clusterInclude); + } + } + if (Strings.hasText(clusterExclude)) { + if (allocationAllowed(OpType.OR, clusterExclude, node)) { + return allocation.decision(Decision.NO, NAME, "node matches any cluster setting [%s] tier filters [%s]", + CLUSTER_ROUTING_EXCLUDE, clusterExclude); + } + } + return null; + } + + private enum OpType { + AND, + OR + } + + private static boolean allocationAllowed(OpType opType, String tierSetting, DiscoveryNode node) { + String[] values = Strings.tokenizeToStringArray(tierSetting, ","); + for (String value : values) { + // generic "data" roles are considered to have all tiers + if (node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE) || + node.getRoles().stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toSet()).contains(value)) { + if (opType == OpType.OR) { + return true; + } + } else { + if (opType == OpType.AND) { + return false; + } + } + } + if (opType == OpType.OR) { + return false; + } else { + return true; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java new file mode 100644 index 0000000000000..08e093aa0ff51 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; + +/** + * The {@code DataTier} class encapsulates the formalization of the "hot", + * "warm", "cold", and "frozen" tiers as node roles. In contains the roles + * themselves as well as helpers for validation and determining if a node has + * a tier configured. + * + * Related: + * {@link org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider} + */ +public class DataTier { + + public static final String DATA_HOT = "data_hot"; + public static final String DATA_WARM = "data_warm"; + public static final String DATA_COLD = "data_cold"; + public static final String DATA_FROZEN = "data_frozen"; + + /** + * Returns true if the given tier name is a valid tier + */ + public static boolean validTierName(String tierName) { + return DATA_HOT.equals(tierName) || + DATA_WARM.equals(tierName) || + DATA_COLD.equals(tierName) || + DATA_FROZEN.equals(tierName); + } + + /** + * Returns true iff the given settings have a data tier setting configured + */ + public static boolean isExplicitDataTier(Settings settings) { + /* + * This method can be called before the o.e.n.NodeRoleSettings.NODE_ROLES_SETTING is + * initialized. We do not want to trigger initialization prematurely because that will bake + * the default roles before plugins have had a chance to register them. Therefore, + * to avoid initializing this setting prematurely, we avoid using the actual node roles + * setting instance here in favor of the string. + */ + if (settings.hasValue("node.roles")) { + return settings.getAsList("node.roles").stream().anyMatch(DataTier::validTierName); + } + return false; + } + + public static DiscoveryNodeRole DATA_HOT_NODE_ROLE = new DiscoveryNodeRole("data_hot", "h") { + @Override + public boolean isEnabledByDefault(final Settings settings) { + return false; + } + + @Override + public Setting legacySetting() { + return null; + } + + @Override + public boolean canContainData() { + return true; + } + }; + + public static DiscoveryNodeRole DATA_WARM_NODE_ROLE = new DiscoveryNodeRole("data_warm", "w") { + @Override + public boolean isEnabledByDefault(final Settings settings) { + return false; + } + + @Override + public Setting legacySetting() { + return null; + } + + @Override + public boolean canContainData() { + return true; + } + }; + + public static DiscoveryNodeRole DATA_COLD_NODE_ROLE = new DiscoveryNodeRole("data_cold", "c") { + @Override + public boolean isEnabledByDefault(final Settings settings) { + return false; + } + + @Override + public Setting legacySetting() { + return null; + } + + @Override + public boolean canContainData() { + return true; + } + }; + + public static DiscoveryNodeRole DATA_FROZEN_NODE_ROLE = new DiscoveryNodeRole("data_frozen", "f") { + @Override + public boolean isEnabledByDefault(final Settings settings) { + return false; + } + + @Override + public Setting legacySetting() { + return null; + } + + @Override + public boolean canContainData() { + return true; + } + }; + + public static boolean isHotNode(DiscoveryNode discoveryNode) { + return discoveryNode.getRoles().contains(DATA_HOT_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE); + } + + public static boolean isWarmNode(DiscoveryNode discoveryNode) { + return discoveryNode.getRoles().contains(DATA_WARM_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE); + } + + public static boolean isColdNode(DiscoveryNode discoveryNode) { + return discoveryNode.getRoles().contains(DATA_COLD_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE); + } + + public static boolean isFrozenNode(DiscoveryNode discoveryNode) { + return discoveryNode.getRoles().contains(DATA_FROZEN_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 6b8ae63de5bdd..07967f389f315 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -20,7 +20,9 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.inject.Binder; @@ -45,6 +47,7 @@ import org.elasticsearch.license.Licensing; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.RepositoryPlugin; @@ -56,6 +59,7 @@ import org.elasticsearch.snapshots.SourceOnlySnapshotRepository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; import org.elasticsearch.xpack.core.action.ReloadAnalyzerAction; import org.elasticsearch.xpack.core.action.TransportReloadAnalyzersAction; import org.elasticsearch.xpack.core.action.TransportXPackInfoAction; @@ -80,17 +84,20 @@ import java.security.PrivilegedAction; import java.time.Clock; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, RepositoryPlugin, EnginePlugin { +public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, RepositoryPlugin, EnginePlugin, ClusterPlugin { private static final Logger logger = LogManager.getLogger(XPackPlugin.class); private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger); @@ -397,9 +404,29 @@ public Optional getEngineFactory(IndexSettings indexSettings) { public List> getSettings() { List> settings = super.getSettings(); settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY); + settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE_SETTING); + settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE_SETTING); + settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE_SETTING); + settings.add(DataTierAllocationDecider.INDEX_ROUTING_REQUIRE_SETTING); + settings.add(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE_SETTING); + settings.add(DataTierAllocationDecider.INDEX_ROUTING_EXCLUDE_SETTING); return settings; } + @Override + public Set getRoles() { + return new HashSet<>(Arrays.asList( + DataTier.DATA_HOT_NODE_ROLE, + DataTier.DATA_WARM_NODE_ROLE, + DataTier.DATA_COLD_NODE_ROLE, + DataTier.DATA_FROZEN_NODE_ROLE)); + } + + @Override + public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + return Collections.singleton(new DataTierAllocationDecider(clusterSettings)); + } + /** * Handles the creation of the SSLService along with the necessary actions to enable reloading * of SSLContexts when configuration files change on disk. diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java new file mode 100644 index 0000000000000..bedbcdeecca3e --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java @@ -0,0 +1,352 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.gateway.TestGatewayAllocator; +import org.elasticsearch.xpack.core.DataTier; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class DataTierAllocationDeciderTests extends ESAllocationTestCase { + + private static final Set> ALL_SETTINGS; + private static final DiscoveryNode HOT_NODE = newNode("node-hot", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE)); + private static final DiscoveryNode WARM_NODE = newNode("node-warm", Collections.singleton(DataTier.DATA_WARM_NODE_ROLE)); + private static final DiscoveryNode COLD_NODE = newNode("node-cold", Collections.singleton(DataTier.DATA_COLD_NODE_ROLE)); + private static final DiscoveryNode FROZEN_NODE = newNode("node-frozen", Collections.singleton(DataTier.DATA_FROZEN_NODE_ROLE)); + private static final DiscoveryNode DATA_NODE = newNode("node-data", Collections.singleton(DiscoveryNodeRole.DATA_ROLE)); + + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ALL_SETTINGS); + private final DataTierAllocationDecider decider = new DataTierAllocationDecider(clusterSettings); + private final AllocationDeciders allocationDeciders = new AllocationDeciders( + Arrays.asList(decider, + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), + new ReplicaAfterPrimaryActiveAllocationDecider())); + private final AllocationService service = new AllocationService(allocationDeciders, + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + private final ShardRouting shard = ShardRouting.newUnassigned(new ShardId("myindex", "myindex", 0), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "index created")); + + static { + Set> allSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE_SETTING); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE_SETTING); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE_SETTING); + ALL_SETTINGS = allSettings; + } + + public void testClusterRequires() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state")); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + clusterSettings.applySettings(Settings.builder() + .put(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE, "data_hot") + .build()); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + } + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, COLD_NODE, FROZEN_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match all cluster setting [cluster.routing.allocation.require._tier] " + + "tier filters [data_hot]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match all cluster setting [cluster.routing.allocation.require._tier] " + + "tier filters [data_hot]")); + } + } + + public void testClusterIncludes() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state")); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + clusterSettings.applySettings(Settings.builder() + .put(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE, "data_warm,data_frozen") + .build()); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + } + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match any cluster setting [cluster.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match any cluster setting [cluster.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + } + } + + + public void testClusterExcludes() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state")); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + clusterSettings.applySettings(Settings.builder() + .put(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE, "data_warm,data_frozen") + .build()); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] " + + "tier filters [data_warm,data_frozen]")); + + } + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + } + } + + public void testIndexRequires() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), + Settings.builder() + .put(DataTierAllocationDecider.INDEX_ROUTING_REQUIRE, "data_hot") + .build()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + } + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, COLD_NODE, FROZEN_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match all index setting [index.routing.allocation.require._tier] tier filters [data_hot]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match all index setting [index.routing.allocation.require._tier] tier filters [data_hot]")); + } + } + + public void testIndexIncludes() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), + Settings.builder() + .put(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE, "data_warm,data_frozen") + .build()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.YES)); + } + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match any index setting [index.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node does not match any index setting [index.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + } + } + + public void testIndexExcludes() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), + Settings.builder() + .put(DataTierAllocationDecider.INDEX_ROUTING_EXCLUDE, "data_warm,data_frozen") + .build()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + allocation.debugDecision(true); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any index setting [index.routing.allocation.exclude._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any index setting [index.routing.allocation.exclude._tier] " + + "tier filters [data_warm,data_frozen]")); + + } + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + } + } + + public void testClusterAndIndex() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), + Settings.builder() + .put(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE, "data_warm,data_frozen") + .build()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, + null, 0); + clusterSettings.applySettings(Settings.builder() + .put(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE, "data_frozen") + .build()); + allocation.debugDecision(true); + Decision d; + RoutingNode node; + + for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO)); + assertThat(node.toString(), d.getExplanation(), + containsString("node does not match any index setting [index.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO)); + assertThat(node.toString(), d.getExplanation(), + containsString("node does not match any index setting [index.routing.allocation.include._tier] " + + "tier filters [data_warm,data_frozen]")); + } + + for (DiscoveryNode n : Arrays.asList(FROZEN_NODE, DATA_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] tier filters [data_frozen]")); + d = decider.canRemain(shard, node, allocation); + assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO)); + assertThat(d.getExplanation(), + containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] tier filters [data_frozen]")); + } + + for (DiscoveryNode n : Arrays.asList(WARM_NODE)) { + node = new RoutingNode(n.getId(), n, shard); + d = decider.canAllocate(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + d = decider.canRemain(shard, node, allocation); + assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES)); + } + } + + private ClusterState prepareState(ClusterState initialState) { + return prepareState(initialState, Settings.EMPTY); + } + + private ClusterState prepareState(ClusterState initialState, Settings indexSettings) { + return ClusterState.builder(initialState) + .nodes(DiscoveryNodes.builder() + .add(HOT_NODE) + .add(WARM_NODE) + .add(COLD_NODE) + .add(FROZEN_NODE) + .add(DATA_NODE) + .build()) + .metadata(Metadata.builder() + .put(IndexMetadata.builder("myindex") + .settings(Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "myindex") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(indexSettings) + .build())) + .build()) + .build(); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java new file mode 100644 index 0000000000000..e45560b629f5b --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.arrayContainingInAnyOrder; + +public class DataTierTests extends ESTestCase { + + private static final AtomicInteger idGenerator = new AtomicInteger(); + + public void testNodeSelection() { + DiscoveryNodes discoveryNodes = buildDiscoveryNodes(); + + final String[] dataNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DiscoveryNode::isDataNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + final String[] hotNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DataTier::isHotNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + final String[] warmNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DataTier::isWarmNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + final String[] coldNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DataTier::isColdNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + final String[] frozenNodes = + StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false) + .map(n -> n.value) + .filter(DataTier::isFrozenNode) + .map(DiscoveryNode::getId) + .toArray(String[]::new); + + assertThat(discoveryNodes.resolveNodes("data:true"), arrayContainingInAnyOrder(dataNodes)); + assertThat(discoveryNodes.resolveNodes("data_hot:true"), arrayContainingInAnyOrder(hotNodes)); + assertThat(discoveryNodes.resolveNodes("data_warm:true"), arrayContainingInAnyOrder(warmNodes)); + assertThat(discoveryNodes.resolveNodes("data_cold:true"), arrayContainingInAnyOrder(coldNodes)); + assertThat(discoveryNodes.resolveNodes("data_frozen:true"), arrayContainingInAnyOrder(frozenNodes)); + Set allTiers = new HashSet<>(Arrays.asList(hotNodes)); + allTiers.addAll(Arrays.asList(warmNodes)); + allTiers.addAll(Arrays.asList(coldNodes)); + allTiers.addAll(Arrays.asList(frozenNodes)); + assertThat(discoveryNodes.resolveNodes("data:true"), arrayContainingInAnyOrder(allTiers.toArray(Strings.EMPTY_ARRAY))); + } + + private static DiscoveryNodes buildDiscoveryNodes() { + int numNodes = randomIntBetween(3, 15); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + List nodesList = randomNodes(numNodes); + for (DiscoveryNode node : nodesList) { + discoBuilder = discoBuilder.add(node); + } + discoBuilder.localNodeId(randomFrom(nodesList).getId()); + discoBuilder.masterNodeId(randomFrom(nodesList).getId()); + return discoBuilder.build(); + } + + private static DiscoveryNode newNode(int nodeId, Map attributes, Set roles) { + return new DiscoveryNode("name_" + nodeId, "node_" + nodeId, buildNewFakeTransportAddress(), attributes, roles, + Version.CURRENT); + } + + private static List randomNodes(final int numNodes) { + Set allRoles = new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES); + allRoles.remove(DiscoveryNodeRole.DATA_ROLE); + allRoles.add(DataTier.DATA_HOT_NODE_ROLE); + allRoles.add(DataTier.DATA_WARM_NODE_ROLE); + allRoles.add(DataTier.DATA_COLD_NODE_ROLE); + allRoles.add(DataTier.DATA_FROZEN_NODE_ROLE); + List nodesList = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + Map attributes = new HashMap<>(); + if (frequently()) { + attributes.put("custom", randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5)); + } + final Set roles = new HashSet<>(randomSubsetOf(allRoles)); + if (frequently()) { + roles.add(new DiscoveryNodeRole("custom_role", "cr") { + + @Override + public Setting legacySetting() { + return null; + } + + }); + } + final DiscoveryNode node = newNode(idGenerator.getAndIncrement(), attributes, roles); + nodesList.add(node); + } + return nodesList; + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 9cb1ca4c14a39..820a6ef33d46d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -47,6 +47,7 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.core.DataTier; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider; @@ -157,7 +158,9 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa */ private static final Setting TRANSFORM_ENABLED_NODE = Setting.boolSetting( "node.transform", - settings -> Boolean.toString(DiscoveryNode.isDataNode(settings)), + settings -> + // Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized + Boolean.toString(DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings)), Property.Deprecated, Property.NodeScope ); @@ -171,7 +174,9 @@ public Setting legacySetting() { @Override public boolean isEnabledByDefault(final Settings settings) { - return super.isEnabledByDefault(settings) && DiscoveryNode.isDataNode(settings); + return super.isEnabledByDefault(settings) && + // Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized + (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings)); } };