From 5aa4ca5af535ac1f65e9429ad9418252eb9165e3 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 7 Jun 2021 08:39:15 -0600 Subject: [PATCH 1/2] Make ILM aware of node shutdown (#73690) This commit makes ILM aware of different parts of the node shutdown lifecycle. It consists are two main parts, reacting to the state during execution, and signaling the status of shutdown from ILM. Reacting to shutdown state ILM now considers nodes that are going to be shut down when deciding which node to assign for the shrink action. It uses the `NodeShutdownAllocationDecider` within the `SetSingleNodeAllocateStep` to not assign shards to a node that will be removed. If an index is already past this step and waiting for allocation, this commit adds an `isCompletable` method to the `ClusterStateWaitUntilThresholdStep` so that an allocation that cannot happen can be rewound and retried on another (non-shutdown) node. Signaling shutdown status This commit introduces the `PluginShutdownService` which deals with `ShutdownAwarePlugin` classes. This class is used to signal shutdowns to plugins, and also to gather the status of a shutdown from these plugins. ILM implements this `ShutdownAwarePlugin` to signal if an index is in a step that is unsafe, such as the actual shrink step, so that shutdown will wait until after the allocation rules have been removed by ILM. This commit also hooks up the get shutdown API response to consider the statuses of its parts (see `SingleNodeShutdownMetadata.Status#combine`) when creating a response. Relates to #70338 --- .../ShutdownPersistentTasksStatus.java | 8 +- .../metadata/ShutdownPluginsStatus.java | 13 +- .../ShutdownShardMigrationStatus.java | 8 +- .../metadata/SingleNodeShutdownMetadata.java | 41 +++++- .../java/org/elasticsearch/node/Node.java | 7 + .../plugins/ShutdownAwarePlugin.java | 34 +++++ .../shutdown/PluginShutdownService.java | 108 ++++++++++++++ .../SingleNodeShutdownMetadataTests.java | 45 ++++++ .../xpack/core/ilm/CheckShrinkReadyStep.java | 21 +++ .../xpack/core/ilm/ClusterStateWaitStep.java | 10 ++ .../ClusterStateWaitUntilThresholdStep.java | 16 +++ .../core/ilm/SetSingleNodeAllocateStep.java | 4 +- .../core/ilm/CheckShrinkReadyStepTests.java | 107 ++++++++++++++ ...usterStateWaitUntilThresholdStepTests.java | 57 ++++++++ .../xpack/ilm/IndexLifecycleService.java | 65 ++++++++- .../xpack/ilm/IndexLifecycleServiceTests.java | 99 +++++++++++++ .../xpack/shutdown/NodeShutdownPluginsIT.java | 134 ++++++++++++++++++ .../xpack/shutdown/PutShutdownNodeAction.java | 2 +- .../shutdown/SingleNodeShutdownStatus.java | 19 ++- .../TransportGetShutdownStatusAction.java | 13 +- .../GetShutdownStatusResponseTests.java | 2 +- 21 files changed, 794 insertions(+), 19 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/plugins/ShutdownAwarePlugin.java create mode 100644 server/src/main/java/org/elasticsearch/shutdown/PluginShutdownService.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadataTests.java create mode 100644 x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java index bee04444f9667..05430a9fa770f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java @@ -22,11 +22,11 @@ public class ShutdownPersistentTasksStatus implements Writeable, ToXContentObjec private final SingleNodeShutdownMetadata.Status status; public ShutdownPersistentTasksStatus() { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = SingleNodeShutdownMetadata.Status.COMPLETE; } public ShutdownPersistentTasksStatus(StreamInput in) throws IOException { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = SingleNodeShutdownMetadata.Status.COMPLETE; } @Override @@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException { } + public SingleNodeShutdownMetadata.Status getStatus() { + return status; + } + @Override public int hashCode() { return status.hashCode(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java index be129c281632c..964f9f93b531b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java @@ -21,12 +21,17 @@ public class ShutdownPluginsStatus implements Writeable, ToXContentObject { private final SingleNodeShutdownMetadata.Status status; - public ShutdownPluginsStatus() { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + public ShutdownPluginsStatus(boolean safeToShutdown) { + this.status = safeToShutdown ? SingleNodeShutdownMetadata.Status.COMPLETE : + SingleNodeShutdownMetadata.Status.IN_PROGRESS; } public ShutdownPluginsStatus(StreamInput in) throws IOException { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class); + } + + public SingleNodeShutdownMetadata.Status getStatus() { + return this.status; } @Override @@ -39,7 +44,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { - + out.writeEnum(this.status); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java index 270f244a8f7dd..60065747f2249 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java @@ -22,11 +22,11 @@ public class ShutdownShardMigrationStatus implements Writeable, ToXContentObject private final SingleNodeShutdownMetadata.Status status; public ShutdownShardMigrationStatus() { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = SingleNodeShutdownMetadata.Status.COMPLETE; } public ShutdownShardMigrationStatus(StreamInput in) throws IOException { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = SingleNodeShutdownMetadata.Status.COMPLETE; } @Override @@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException { } + public SingleNodeShutdownMetadata.Status getStatus() { + return status; + } + @Override public int hashCode() { return status.hashCode(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java index 1d1caeead9076..8e61a80d3c8a2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Locale; import java.util.Objects; /** @@ -234,16 +235,52 @@ public SingleNodeShutdownMetadata build() { */ public enum Type { REMOVE, - RESTART + RESTART; + + public static Type parse(String type) { + if ("remove".equals(type.toLowerCase(Locale.ROOT))) { + return REMOVE; + } else if ("restart".equals(type.toLowerCase(Locale.ROOT))) { + return RESTART; + } else { + throw new IllegalArgumentException("unknown shutdown type: " + type); + } + } } /** * Describes the status of a component of shutdown. */ public enum Status { + // These are ordered (see #combine(...)) NOT_STARTED, IN_PROGRESS, STALLED, - COMPLETE + COMPLETE; + + /** + * Merges multiple statuses into a single, final, status + * + * For example, if called with NOT_STARTED, IN_PROGRESS, and STALLED, the returned state is STALLED. + * Called with IN_PROGRESS, IN_PROGRESS, NOT_STARTED, the returned state is IN_PROGRESS. + * Called with IN_PROGRESS, NOT_STARTED, COMPLETE, the returned state is IN_PROGRESS + * Called with COMPLETE, COMPLETE, COMPLETE, the returned state is COMPLETE + * Called with an empty array, the returned state is COMPLETE + */ + public static Status combine(Status... statuses) { + int statusOrd = -1; + for (Status status : statuses) { + // Max the status up to, but not including, "complete" + if (status != COMPLETE) { + statusOrd = Math.max(status.ordinal(), statusOrd); + } + } + if (statusOrd == -1) { + // Either all the statuses were complete, or there were no statuses given + return COMPLETE; + } else { + return Status.values()[statusOrd]; + } + } } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index f0d78585e90a7..7b4b405c92868 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -140,6 +140,7 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.plugins.ShutdownAwarePlugin; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.RepositoriesService; @@ -152,6 +153,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.support.AggregationUsageService; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.shutdown.PluginShutdownService; import org.elasticsearch.snapshots.InternalSnapshotsInfoService; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotShardsService; @@ -697,6 +699,10 @@ protected Node(final Environment initialEnvironment, resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + final List shutdownAwarePlugins = pluginsService.filterPlugins(ShutdownAwarePlugin.class); + final PluginShutdownService pluginShutdownService = new PluginShutdownService(shutdownAwarePlugins); + clusterService.addListener(pluginShutdownService); + modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); @@ -759,6 +765,7 @@ protected Node(final Environment initialEnvironment, b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); b.bind(SystemIndices.class).toInstance(systemIndices); + b.bind(PluginShutdownService.class).toInstance(pluginShutdownService); b.bind(ExecutorSelector.class).toInstance(executorSelector); } ); diff --git a/server/src/main/java/org/elasticsearch/plugins/ShutdownAwarePlugin.java b/server/src/main/java/org/elasticsearch/plugins/ShutdownAwarePlugin.java new file mode 100644 index 0000000000000..1603f1ab60bc1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/plugins/ShutdownAwarePlugin.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.plugins; + +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; + +import java.util.Collection; + +/** + * A {@link ShutdownAwarePlugin} is a plugin that can be made aware of a shutdown. It comprises two + * parts, one part used for telling plugins that a set of nodes are going to be shut down + * ({@link #signalShutdown(Collection)}), the other for retrieving the status of those plugins + * as to whether it is safe to shut down ({@link #safeToShutdown(String, SingleNodeShutdownMetadata.Type)} + */ +public interface ShutdownAwarePlugin { + + /** + * Whether the plugin is considered safe to shut down. This method is called when the status of + * a shutdown is retrieved via the API, and it is only called on the master node. + */ + boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType); + + /** + * A trigger to notify the plugin that a shutdown for the nodes has been triggered. This method + * will be called on every node for each cluster state, so it should return quickly. + */ + void signalShutdown(Collection shutdownNodeIds); +} diff --git a/server/src/main/java/org/elasticsearch/shutdown/PluginShutdownService.java b/server/src/main/java/org/elasticsearch/shutdown/PluginShutdownService.java new file mode 100644 index 0000000000000..e84b3dbb1a721 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/shutdown/PluginShutdownService.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.shutdown; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.plugins.ShutdownAwarePlugin; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * The {@link PluginShutdownService} is used for the node shutdown infrastructure to signal to + * plugins that a shutdown is occurring, and to check whether it is safe to shut down. + */ +public class PluginShutdownService implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(PluginShutdownService.class); + public List plugins; + + public PluginShutdownService(@Nullable List plugins) { + this.plugins = plugins == null ? Collections.emptyList() : plugins; + } + + /** + * Return all nodes shutting down from the given cluster state + */ + public static Set shutdownNodes(final ClusterState clusterState) { + return NodesShutdownMetadata.getShutdowns(clusterState) + .map(NodesShutdownMetadata::getAllNodeMetadataMap) + .map(Map::keySet) + .orElse(Collections.emptySet()); + } + + /** + * Return all nodes shutting down with the given shutdown type from the given cluster state + */ + public static Set shutdownTypeNodes(final ClusterState clusterState, final SingleNodeShutdownMetadata.Type shutdownType) { + return NodesShutdownMetadata.getShutdowns(clusterState) + .map(NodesShutdownMetadata::getAllNodeMetadataMap) + .map(m -> m.entrySet().stream() + .filter(e -> e.getValue().getType() == shutdownType) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .map(Map::keySet) + .orElse(Collections.emptySet()); + } + + /** + * Check with registered plugins whether the shutdown is safe for the given node id and type + */ + public boolean readyToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) { + // TODO: consider adding debugging information (a message about why not?) + // TODO: consider adding more fine-grained status rather than true/false + for (ShutdownAwarePlugin plugin : plugins) { + try { + if (plugin.safeToShutdown(nodeId, shutdownType) == false) { + logger.trace("shutdown aware plugin [{}] is not yet ready for shutdown", plugin); + return false; + } + } catch (Exception e) { + logger.warn("uncaught exception when retrieving whether plugin is ready for node shutdown", e); + } + } + return true; + } + + /** + * Signal to plugins the nodes that are currently shutting down + */ + public void signalShutdown(final ClusterState state) { + Set shutdownNodes = shutdownNodes(state); + for (ShutdownAwarePlugin plugin : plugins) { + try { + plugin.signalShutdown(shutdownNodes); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("uncaught exception when notifying plugins of nodes {} shutdown", shutdownNodes), e); + } + } + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + signalShutdown(event.state()); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadataTests.java new file mode 100644 index 0000000000000..82438fb0a19f6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadataTests.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.test.ESTestCase; + +public class SingleNodeShutdownMetadataTests extends ESTestCase { + public void testStatusComination() { + SingleNodeShutdownMetadata.Status status; + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED, + SingleNodeShutdownMetadata.Status.IN_PROGRESS, + SingleNodeShutdownMetadata.Status.STALLED); + assertEquals(status, SingleNodeShutdownMetadata.Status.STALLED); + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED, + SingleNodeShutdownMetadata.Status.IN_PROGRESS, + SingleNodeShutdownMetadata.Status.NOT_STARTED); + assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS); + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED, + SingleNodeShutdownMetadata.Status.NOT_STARTED, + SingleNodeShutdownMetadata.Status.NOT_STARTED); + assertEquals(status, SingleNodeShutdownMetadata.Status.NOT_STARTED); + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.IN_PROGRESS, + SingleNodeShutdownMetadata.Status.IN_PROGRESS, + SingleNodeShutdownMetadata.Status.COMPLETE); + assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS); + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.COMPLETE, + SingleNodeShutdownMetadata.Status.COMPLETE, + SingleNodeShutdownMetadata.Status.COMPLETE); + assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE); + + status = SingleNodeShutdownMetadata.Status.combine(); + assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStep.java index 1c413ac1b8e3c..3203a03228628 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStep.java @@ -11,6 +11,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -20,6 +22,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.ilm.step.info.SingleMessageFieldInfo; import java.io.IOException; import java.util.Locale; @@ -34,6 +37,7 @@ public class CheckShrinkReadyStep extends ClusterStateWaitStep { public static final String NAME = "check-shrink-allocation"; private static final Logger logger = LogManager.getLogger(CheckShrinkReadyStep.class); + private boolean completable = true; CheckShrinkReadyStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey); @@ -44,6 +48,11 @@ public boolean isRetryable() { return true; } + @Override + public boolean isCompletable() { + return completable; + } + @Override public Result isConditionMet(Index index, ClusterState clusterState) { IndexMetadata idxMeta = clusterState.metadata().index(index); @@ -64,6 +73,12 @@ public Result isConditionMet(Index index, ClusterState clusterState) { throw new IllegalStateException("Cannot check shrink allocation as there are no allocation rules by _id"); } + boolean nodeBeingRemoved = NodesShutdownMetadata.getShutdowns(clusterState) + .map(NodesShutdownMetadata::getAllNodeMetadataMap) + .map(shutdownMetadataMap -> shutdownMetadataMap.get(idShardsShouldBeOn)) + .map(singleNodeShutdown -> singleNodeShutdown.getType() == SingleNodeShutdownMetadata.Type.REMOVE) + .orElse(false); + final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index); int foundShards = 0; for (ShardRouting shard : routingTable.shardsWithState(ShardRoutingState.STARTED)) { @@ -81,6 +96,12 @@ public Result isConditionMet(Index index, ClusterState clusterState) { index, expectedShardCount, idShardsShouldBeOn, getKey().getAction()); return new Result(true, null); } else { + if (nodeBeingRemoved) { + completable = false; + return new Result(false, new SingleMessageFieldInfo("node with id [" + idShardsShouldBeOn + + "] is currently marked as shutting down for removal")); + } + logger.trace("{} failed to find {} allocated shards (found {}) on node [{}] for shrink readiness ({})", index, expectedShardCount, foundShards, idShardsShouldBeOn, getKey().getAction()); return new Result(false, new CheckShrinkReadyStep.Info(idShardsShouldBeOn, expectedShardCount, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitStep.java index 702c99bf1e3e5..e6e95d1110a7f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitStep.java @@ -23,6 +23,16 @@ public ClusterStateWaitStep(StepKey key, StepKey nextStepKey) { public abstract Result isConditionMet(Index index, ClusterState clusterState); + /** + * Whether the step can be completed at all. This only affects the + * {@link ClusterStateWaitUntilThresholdStep} which waits for a threshold to be met before + * retrying. Setting this to false means that ILM should retry the sequence immediately without + * waiting for the threshold to be met. + */ + public boolean isCompletable() { + return true; + } + public static class Result { private final boolean complete; private final ToXContentObject infomationContext; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStep.java index 0fee72e060556..7ee3e2311420c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStep.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.step.info.SingleMessageFieldInfo; @@ -69,6 +70,21 @@ public Result isConditionMet(Index index, ClusterState clusterState) { // wonderful thing) TimeValue retryThreshold = LifecycleSettings.LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING.get(idxMeta.getSettings()); LifecycleExecutionState lifecycleState = fromIndexMetadata(idxMeta); + if (stepToExecute.isCompletable() == false) { + // we may not have passed the time threshold, but the step is not completable due to a different reason + thresholdPassed.set(true); + + String message = String.format(Locale.ROOT, "[%s] lifecycle step, as part of [%s] action, for index [%s] Is not " + + "completable, reason: [%s]. Abandoning execution and moving to the next fallback step [%s]", + getKey().getName(), + getKey().getAction(), + idxMeta.getIndex().getName(), + Strings.toString(stepResult.getInfomationContext()), + nextKeyOnThresholdBreach); + logger.debug(message); + + return new Result(true, new SingleMessageFieldInfo(message)); + } if (waitedMoreThanThresholdLevel(retryThreshold, lifecycleState, Clock.systemUTC())) { // we retried this step enough, next step will be the configured to {@code nextKeyOnThresholdBreach} thresholdPassed.set(true); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java index ba0b85d52689e..1490ee240d257 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.ClusterSettings; @@ -78,7 +79,8 @@ public void performAction(IndexMetadata indexMetadata, ClusterState clusterState new FilterAllocationDecider(clusterState.getMetadata().settings(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), new DataTierAllocationDecider(clusterState.getMetadata().settings(), new ClusterSettings(Settings.EMPTY, ALL_CLUSTER_SETTINGS)), - new NodeVersionAllocationDecider() + new NodeVersionAllocationDecider(), + new NodeShutdownAllocationDecider() )); final RoutingNodes routingNodes = clusterState.getRoutingNodes(); RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, null, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java index 0d26726807cc4..006a2a8be11c4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -19,6 +21,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -328,6 +331,110 @@ public void testExecuteIndexMissing() throws Exception { assertNull(actualResult.getInfomationContext()); } + public void testStepCompletableIfAllShardsActive() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Map requires = AllocateActionTests.randomMap(1, 5); + Settings.Builder existingSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.id) + .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._id", "node1") + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()); + Settings.Builder expectedSettings = Settings.builder(); + Settings.Builder node1Settings = Settings.builder(); + Settings.Builder node2Settings = Settings.builder(); + requires.forEach((k, v) -> { + existingSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v); + expectedSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v); + node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v); + }); + + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)); + + CheckShrinkReadyStep step = createRandomInstance(); + IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(existingSettings).numberOfShards(1) + .numberOfReplicas(1).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder() + .indices(indices.build()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(Collections.singletonMap("node1", + SingleNodeShutdownMetadata.builder() + .setType(SingleNodeShutdownMetadata.Type.REMOVE) + .setStartedAtMillis(randomNonNegativeLong()) + .setReason("test") + .setNodeId("node1") + .build())))) + .nodes(DiscoveryNodes.builder() + .add(DiscoveryNode.createLocal(Settings.builder().put(node1Settings.build()) + .put(Node.NODE_NAME_SETTING.getKey(), "node1").build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9200), + "node1")) + .add(DiscoveryNode.createLocal(Settings.builder().put(node2Settings.build()) + .put(Node.NODE_NAME_SETTING.getKey(), "node2").build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9201), + "node2"))) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + assertTrue(step.isCompletable()); + ClusterStateWaitStep.Result actualResult = step.isConditionMet(index, clusterState); + assertTrue(actualResult.isComplete()); + assertTrue(step.isCompletable()); + } + + public void testStepBecomesUncompletable() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Map requires = AllocateActionTests.randomMap(1, 5); + Settings.Builder existingSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.id) + .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._id", "node1") + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()); + Settings.Builder expectedSettings = Settings.builder(); + Settings.Builder node1Settings = Settings.builder(); + Settings.Builder node2Settings = Settings.builder(); + requires.forEach((k, v) -> { + existingSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v); + expectedSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v); + node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v); + }); + + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.INITIALIZING)); + + CheckShrinkReadyStep step = createRandomInstance(); + IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(existingSettings).numberOfShards(1) + .numberOfReplicas(1).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder() + .indices(indices.build()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(Collections.singletonMap("node1", + SingleNodeShutdownMetadata.builder() + .setType(SingleNodeShutdownMetadata.Type.REMOVE) + .setStartedAtMillis(randomNonNegativeLong()) + .setReason("test") + .setNodeId("node1") + .build())))) + .nodes(DiscoveryNodes.builder() + .add(DiscoveryNode.createLocal(Settings.builder().put(node1Settings.build()) + .put(Node.NODE_NAME_SETTING.getKey(), "node1").build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9200), + "node1")) + .add(DiscoveryNode.createLocal(Settings.builder().put(node2Settings.build()) + .put(Node.NODE_NAME_SETTING.getKey(), "node2").build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9201), + "node2"))) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + assertTrue(step.isCompletable()); + ClusterStateWaitStep.Result actualResult = step.isConditionMet(index, clusterState); + assertFalse(actualResult.isComplete()); + assertThat(Strings.toString(actualResult.getInfomationContext()), + containsString("node with id [node1] is currently marked as shutting down")); + assertFalse(step.isCompletable()); + } + private void assertAllocateStatus(Index index, int shards, int replicas, CheckShrinkReadyStep step, Settings.Builder existingSettings, Settings.Builder node1Settings, Settings.Builder node2Settings, IndexRoutingTable.Builder indexRoutingTable, ClusterStateWaitStep.Result expectedResult) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStepTests.java index 1a6d6cc57a512..d3354f7f41897 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStepTests.java @@ -225,4 +225,61 @@ public void testWaitedMoreThanThresholdLevelMath() { assertThat(thresholdBreached, is(false)); } } + + public void testIsCompletableBreaches() { + IndexMetadata indexMetadata = IndexMetadata.builder("index") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .putCustom(ILM_CUSTOM_METADATA_KEY, Collections.singletonMap("step_time", String.valueOf(Clock.systemUTC().millis()))) + .build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .build(); + + ClusterStateWaitUntilThresholdStep step = new ClusterStateWaitUntilThresholdStep( + new ClusterStateWaitStep(new StepKey("phase" , "action", "key"), + new StepKey("phase", "action", "next-key")) { + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + return new Result(false, new SingleMessageFieldInfo("")); + } + + @Override + public boolean isCompletable() { + return true; + } + + @Override + public boolean isRetryable() { + return true; + } + }, new StepKey("phase", "action", "breached")); + + assertFalse(step.isConditionMet(indexMetadata.getIndex(), clusterState).isComplete()); + + assertThat(step.getNextStepKey().getName(), equalTo("next-key")); + + step = new ClusterStateWaitUntilThresholdStep( + new ClusterStateWaitStep(new StepKey("phase" , "action", "key"), + new StepKey("phase", "action", "next-key")) { + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + return new Result(false, new SingleMessageFieldInfo("")); + } + + @Override + public boolean isCompletable() { + return false; + } + + @Override + public boolean isRetryable() { + return true; + } + }, new StepKey("phase", "action", "breached")); + assertTrue(step.isConditionMet(indexMetadata.getIndex(), clusterState).isComplete()); + assertThat(step.getNextStepKey().getName(), equalTo("breached")); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index 0e2b74f006b09..0074348fad376 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -28,23 +29,33 @@ import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.plugins.ShutdownAwarePlugin; +import org.elasticsearch.shutdown.PluginShutdownService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.ilm.CheckShrinkReadyStep; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.RollupStep; +import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep; +import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkStep; +import org.elasticsearch.xpack.core.ilm.ShrunkShardsAllocatedStep; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import java.io.Closeable; import java.time.Clock; +import java.util.Collection; +import java.util.Collections; import java.util.Set; import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.elasticsearch.xpack.core.ilm.IndexLifecycleOriginationDateParser.parseIndexNameAndExtractDate; import static org.elasticsearch.xpack.core.ilm.IndexLifecycleOriginationDateParser.shouldParseIndexName; @@ -53,7 +64,7 @@ * A service which runs the {@link LifecyclePolicy}s associated with indexes. */ public class IndexLifecycleService - implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, IndexEventListener { + implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, IndexEventListener, ShutdownAwarePlugin { private static final Logger logger = LogManager.getLogger(IndexLifecycleService.class); private static final Set IGNORE_STEPS_MAINTENANCE_REQUESTED = Sets.newHashSet(ShrinkStep.NAME, RollupStep.NAME); private volatile boolean isMaster = false; @@ -390,4 +401,56 @@ private boolean isClusterServiceStoppedOrClosed() { PolicyStepsRegistry getPolicyRegistry() { return policyRegistry; } + + static Set indicesOnShuttingDownNodesInDangerousStep(ClusterState state, String nodeId) { + final Set shutdownNodes = PluginShutdownService.shutdownTypeNodes(state, SingleNodeShutdownMetadata.Type.REMOVE); + if (shutdownNodes.isEmpty()) { + return Collections.emptySet(); + } + + Set indicesPreventingShutdown = StreamSupport.stream(state.metadata().indices().spliterator(), false) + // Filter out to only consider managed indices + .filter(indexToMetadata -> Strings.hasText(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexToMetadata.value.getSettings()))) + // Only look at indices in the shrink action + .filter(indexToMetadata -> + ShrinkAction.NAME.equals(LifecycleExecutionState.fromIndexMetadata(indexToMetadata.value).getAction())) + // Only look at indices on a step that may potentially be dangerous if we removed the node + .filter(indexToMetadata -> { + String step = LifecycleExecutionState.fromIndexMetadata(indexToMetadata.value).getStep(); + return SetSingleNodeAllocateStep.NAME.equals(step) || + CheckShrinkReadyStep.NAME.equals(step) || + ShrinkStep.NAME.equals(step) || + ShrunkShardsAllocatedStep.NAME.equals(step); + }) + // Only look at indices where the node picked for the shrink is the node marked as shutting down + .filter(indexToMetadata -> { + String nodePicked = indexToMetadata.value.getSettings() + .get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"); + return nodeId.equals(nodePicked); + }) + .map(indexToMetadata -> indexToMetadata.key) + .collect(Collectors.toSet()); + logger.trace("with nodes marked as shutdown for removal {}, indices {} are preventing shutdown", + shutdownNodes, indicesPreventingShutdown); + return indicesPreventingShutdown; + } + + @Override + public boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) { + switch (shutdownType) { + case RESTART: + // It is safe to restart during ILM operation + return true; + case REMOVE: + Set indices = indicesOnShuttingDownNodesInDangerousStep(clusterService.state(), nodeId); + return indices.isEmpty(); + default: + throw new IllegalArgumentException("unknown shutdown type: " + shutdownType); + } + } + + @Override + public void signalShutdown(Collection shutdownNodeIds) { + // TODO: in the future we could take proactive measures for when a shutdown is actually triggered + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java index 64df62bcde0ee..07d43efde3b5a 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java @@ -17,6 +17,8 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -32,6 +34,8 @@ import org.elasticsearch.test.NodeRoles; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ilm.CheckShrinkReadyStep; +import org.elasticsearch.xpack.core.ilm.GenerateUniqueIndexNameStep; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; @@ -40,8 +44,10 @@ import org.elasticsearch.xpack.core.ilm.MockAction; import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep; import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkStep; +import org.elasticsearch.xpack.core.ilm.ShrunkShardsAllocatedStep; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.hamcrest.Description; @@ -497,4 +503,97 @@ public void testParsingOriginationDateBeforeIndexCreation() { fail("Did not expect the before index validation to throw an exception as the parse origination date setting was not set"); } } + + public void testIndicesOnShuttingDownNodesInDangerousStep() { + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build(); + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), + equalTo(Collections.emptySet())); + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), + equalTo(Collections.emptySet())); + + IndexMetadata nonDangerousIndex = IndexMetadata.builder("no_danger") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), "mypolicy")) + .putCustom(ILM_CUSTOM_METADATA_KEY, LifecycleExecutionState.builder() + .setPhase("warm") + .setAction("shrink") + .setStep(GenerateUniqueIndexNameStep.NAME) + .build().asMap()) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + IndexMetadata dangerousIndex = IndexMetadata.builder("danger") + .settings(settings(Version.CURRENT) + .put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), "mypolicy") + .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", "shutdown_node")) + .putCustom(ILM_CUSTOM_METADATA_KEY, LifecycleExecutionState.builder() + .setPhase("warm") + .setAction("shrink") + .setStep(randomFrom(SetSingleNodeAllocateStep.NAME, CheckShrinkReadyStep.NAME, + ShrinkStep.NAME, ShrunkShardsAllocatedStep.NAME)) + .build().asMap()) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder() + .fPut("no_danger", nonDangerousIndex) + .fPut("danger", dangerousIndex); + + Metadata metadata = Metadata.builder() + .putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING)) + .indices(indices.build()) + .persistentSettings(settings(Version.CURRENT).build()) + .build(); + + state = ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId) + .add(masterNode) + .add(DiscoveryNode.createLocal( + NodeRoles.masterNode(settings(Version.CURRENT).build()), + new TransportAddress(TransportAddress.META_ADDRESS, 9301), + "regular_node")) + .add(DiscoveryNode.createLocal( + NodeRoles.masterNode(settings(Version.CURRENT).build()), + new TransportAddress(TransportAddress.META_ADDRESS, 9302), + "shutdown_node")) + .build()) + .build(); + + // No danger yet, because no node is shutting down + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), + equalTo(Collections.emptySet())); + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), + equalTo(Collections.emptySet())); + + state = ClusterState.builder(state) + .metadata(Metadata.builder(state.metadata()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(Collections.singletonMap("shutdown_node", + SingleNodeShutdownMetadata.builder() + .setNodeId("shutdown_node") + .setReason("shut down for test") + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.RESTART) + .build()))) + .build()) + .build(); + + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), + equalTo(Collections.emptySet())); + // No danger, because this is a "RESTART" type shutdown + assertThat("restart type shutdowns are not considered dangerous", + IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), + equalTo(Collections.emptySet())); + + state = ClusterState.builder(state) + .metadata(Metadata.builder(state.metadata()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(Collections.singletonMap("shutdown_node", + SingleNodeShutdownMetadata.builder() + .setNodeId("shutdown_node") + .setReason("shut down for test") + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.REMOVE) + .build()))) + .build()) + .build(); + + // The dangerous index should be calculated as being in danger now + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), + equalTo(Collections.singleton("danger"))); + } } diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java new file mode 100644 index 0000000000000..2291d0ef73aad --- /dev/null +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.ShutdownAwarePlugin; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class NodeShutdownPluginsIT extends ESIntegTestCase { + private static final Logger logger = LogManager.getLogger(NodeShutdownPluginsIT.class); + + public static final AtomicBoolean safe = new AtomicBoolean(true); + public static final AtomicReference> triggeredNodes = new AtomicReference<>(null); + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ShutdownEnabledPlugin.class, TestShutdownAwarePlugin.class); + } + + public void testShutdownAwarePlugin() throws Exception { + // Start two nodes, one will be marked as shutting down + final String node1 = internalCluster().startNode(Settings.EMPTY); + final String node2 = internalCluster().startNode(Settings.EMPTY); + + final String shutdownNode; + final String remainNode; + NodesInfoResponse nodes = client().admin().cluster().prepareNodesInfo().clear().get(); + final String node1Id = nodes.getNodes() + .stream() + .map(NodeInfo::getNode) + .filter(node -> node.getName().equals(node1)) + .map(DiscoveryNode::getId) + .findFirst() + .orElseThrow(() -> new AssertionError("fail")); + final String node2Id = nodes.getNodes() + .stream() + .map(NodeInfo::getNode) + .filter(node -> node.getName().equals(node2)) + .map(DiscoveryNode::getId) + .findFirst() + .orElseThrow(() -> new AssertionError("fail")); + + if (randomBoolean()) { + shutdownNode = node1Id; + remainNode = node2Id; + } else { + shutdownNode = node2Id; + remainNode = node1Id; + } + logger.info("--> node {} will be shut down, {} will remain", shutdownNode, remainNode); + + // First, mark the plugin as not yet safe + safe.set(false); + + // Mark the node as shutting down + client().execute( + PutShutdownNodeAction.INSTANCE, + new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing") + ).get(); + + GetShutdownStatusAction.Response getResp = client().execute( + GetShutdownStatusAction.INSTANCE, + new GetShutdownStatusAction.Request(remainNode) + ).get(); + + assertTrue(getResp.getShutdownStatuses().isEmpty()); + + // The plugin should be in progress + getResp = client().execute(GetShutdownStatusAction.INSTANCE, new GetShutdownStatusAction.Request(shutdownNode)).get(); + assertThat( + getResp.getShutdownStatuses().get(0).pluginsStatus().getStatus(), + equalTo(SingleNodeShutdownMetadata.Status.IN_PROGRESS) + ); + + // Change the plugin to be "done" shutting down + safe.set(true); + + // The plugin should be complete + getResp = client().execute(GetShutdownStatusAction.INSTANCE, new GetShutdownStatusAction.Request(shutdownNode)).get(); + assertThat(getResp.getShutdownStatuses().get(0).pluginsStatus().getStatus(), equalTo(SingleNodeShutdownMetadata.Status.COMPLETE)); + + // The shutdown node should be in the triggered list + assertThat(triggeredNodes.get(), contains(shutdownNode)); + + client().execute(DeleteShutdownNodeAction.INSTANCE, new DeleteShutdownNodeAction.Request(shutdownNode)).get(); + + // The shutdown node should now not in the triggered list + assertThat(triggeredNodes.get(), empty()); + } + + public static class ShutdownEnabledPlugin extends ShutdownPlugin { + @Override + public boolean isEnabled() { + return true; + } + } + + public static class TestShutdownAwarePlugin extends Plugin implements ShutdownAwarePlugin { + + @Override + public boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) { + logger.info("--> checking whether safe to shutdown for node [{}], type [{}] answer: ({})", nodeId, shutdownType, safe.get()); + return safe.get(); + } + + @Override + public void signalShutdown(Collection shutdownNodeIds) { + logger.info("--> shutdown triggered for {}", shutdownNodeIds); + triggeredNodes.set(shutdownNodeIds); + } + } +} diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java index 94a10b12faabc..73db7488c3131 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java @@ -42,7 +42,7 @@ public static class Request extends AcknowledgedRequest { private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "put_node_shutdown_request", false, - (a, nodeId) -> new Request(nodeId, SingleNodeShutdownMetadata.Type.valueOf((String) a[0]), (String) a[1]) + (a, nodeId) -> new Request(nodeId, SingleNodeShutdownMetadata.Type.parse((String) a[0]), (String) a[1]) ); static { diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/SingleNodeShutdownStatus.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/SingleNodeShutdownStatus.java index c49d70abbbffa..7629242e28091 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/SingleNodeShutdownStatus.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/SingleNodeShutdownStatus.java @@ -62,8 +62,23 @@ public void writeTo(StreamOutput out) throws IOException { } public SingleNodeShutdownMetadata.Status overallStatus() { - // TODO: make this value calculated based on the status of all other pieces - return SingleNodeShutdownMetadata.Status.IN_PROGRESS; + return SingleNodeShutdownMetadata.Status.combine( + migrationStatus().getStatus(), + pluginsStatus().getStatus(), + persistentTasksStatus().getStatus() + ); + } + + public ShutdownShardMigrationStatus migrationStatus() { + return this.shardMigrationStatus; + } + + public ShutdownPersistentTasksStatus persistentTasksStatus() { + return this.persistentTasksStatus; + } + + public ShutdownPluginsStatus pluginsStatus() { + return this.pluginsStatus; } @Override diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java index 6711d84c3fe85..7e050943600bd 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.shutdown.PluginShutdownService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -34,13 +35,17 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction< GetShutdownStatusAction.Request, GetShutdownStatusAction.Response> { + + private final PluginShutdownService pluginShutdownService; + @Inject public TransportGetShutdownStatusAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + PluginShutdownService pluginShutdownService ) { super( GetShutdownStatusAction.NAME, @@ -53,6 +58,8 @@ public TransportGetShutdownStatusAction( GetShutdownStatusAction.Response::new, ThreadPool.Names.SAME ); + + this.pluginShutdownService = pluginShutdownService; } @Override @@ -75,7 +82,7 @@ protected void masterOperation( ns, new ShutdownShardMigrationStatus(), new ShutdownPersistentTasksStatus(), - new ShutdownPluginsStatus() + new ShutdownPluginsStatus(pluginShutdownService.readyToShutdown(ns.getNodeId(), ns.getType())) ) ) .collect(Collectors.toList()); @@ -91,7 +98,7 @@ protected void masterOperation( ns, new ShutdownShardMigrationStatus(), new ShutdownPersistentTasksStatus(), - new ShutdownPluginsStatus() + new ShutdownPluginsStatus(pluginShutdownService.readyToShutdown(ns.getNodeId(), ns.getType())) ) ) diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusResponseTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusResponseTests.java index d4522b3f9519b..642502fe10e03 100644 --- a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusResponseTests.java +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusResponseTests.java @@ -59,7 +59,7 @@ public static SingleNodeShutdownStatus randomNodeShutdownStatus() { randomNodeShutdownMetadata(), new ShutdownShardMigrationStatus(), new ShutdownPersistentTasksStatus(), - new ShutdownPluginsStatus() + new ShutdownPluginsStatus(randomBoolean()) ); } From a716255a8e367846c836b1fbecd54f6c7018224c Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 7 Jun 2021 09:52:30 -0600 Subject: [PATCH 2/2] Adjust annotation --- .../org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java index 2291d0ef73aad..ed3281fc7b195 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java @@ -27,7 +27,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0) public class NodeShutdownPluginsIT extends ESIntegTestCase { private static final Logger logger = LogManager.getLogger(NodeShutdownPluginsIT.class);