diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java index bee04444f9667..05430a9fa770f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java @@ -22,11 +22,11 @@ public class ShutdownPersistentTasksStatus implements Writeable, ToXContentObjec private final SingleNodeShutdownMetadata.Status status; public ShutdownPersistentTasksStatus() { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = SingleNodeShutdownMetadata.Status.COMPLETE; } public ShutdownPersistentTasksStatus(StreamInput in) throws IOException { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = SingleNodeShutdownMetadata.Status.COMPLETE; } @Override @@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException { } + public SingleNodeShutdownMetadata.Status getStatus() { + return status; + } + @Override public int hashCode() { return status.hashCode(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java index be129c281632c..964f9f93b531b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java @@ -21,12 +21,17 @@ public class ShutdownPluginsStatus implements Writeable, ToXContentObject { private final SingleNodeShutdownMetadata.Status status; - public ShutdownPluginsStatus() { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + public ShutdownPluginsStatus(boolean safeToShutdown) { + this.status = safeToShutdown ? SingleNodeShutdownMetadata.Status.COMPLETE : + SingleNodeShutdownMetadata.Status.IN_PROGRESS; } public ShutdownPluginsStatus(StreamInput in) throws IOException { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class); + } + + public SingleNodeShutdownMetadata.Status getStatus() { + return this.status; } @Override @@ -39,7 +44,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { - + out.writeEnum(this.status); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java index 270f244a8f7dd..60065747f2249 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java @@ -22,11 +22,11 @@ public class ShutdownShardMigrationStatus implements Writeable, ToXContentObject private final SingleNodeShutdownMetadata.Status status; public ShutdownShardMigrationStatus() { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = SingleNodeShutdownMetadata.Status.COMPLETE; } public ShutdownShardMigrationStatus(StreamInput in) throws IOException { - this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS; + this.status = SingleNodeShutdownMetadata.Status.COMPLETE; } @Override @@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException { } + public SingleNodeShutdownMetadata.Status getStatus() { + return status; + } + @Override public int hashCode() { return status.hashCode(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java index 1d1caeead9076..8e61a80d3c8a2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Locale; import java.util.Objects; /** @@ -234,16 +235,52 @@ public SingleNodeShutdownMetadata build() { */ public enum Type { REMOVE, - RESTART + RESTART; + + public static Type parse(String type) { + if ("remove".equals(type.toLowerCase(Locale.ROOT))) { + return REMOVE; + } else if ("restart".equals(type.toLowerCase(Locale.ROOT))) { + return RESTART; + } else { + throw new IllegalArgumentException("unknown shutdown type: " + type); + } + } } /** * Describes the status of a component of shutdown. */ public enum Status { + // These are ordered (see #combine(...)) NOT_STARTED, IN_PROGRESS, STALLED, - COMPLETE + COMPLETE; + + /** + * Merges multiple statuses into a single, final, status + * + * For example, if called with NOT_STARTED, IN_PROGRESS, and STALLED, the returned state is STALLED. + * Called with IN_PROGRESS, IN_PROGRESS, NOT_STARTED, the returned state is IN_PROGRESS. + * Called with IN_PROGRESS, NOT_STARTED, COMPLETE, the returned state is IN_PROGRESS + * Called with COMPLETE, COMPLETE, COMPLETE, the returned state is COMPLETE + * Called with an empty array, the returned state is COMPLETE + */ + public static Status combine(Status... statuses) { + int statusOrd = -1; + for (Status status : statuses) { + // Max the status up to, but not including, "complete" + if (status != COMPLETE) { + statusOrd = Math.max(status.ordinal(), statusOrd); + } + } + if (statusOrd == -1) { + // Either all the statuses were complete, or there were no statuses given + return COMPLETE; + } else { + return Status.values()[statusOrd]; + } + } } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index f0d78585e90a7..7b4b405c92868 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -140,6 +140,7 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.plugins.ShutdownAwarePlugin; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.RepositoriesService; @@ -152,6 +153,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.support.AggregationUsageService; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.shutdown.PluginShutdownService; import org.elasticsearch.snapshots.InternalSnapshotsInfoService; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotShardsService; @@ -697,6 +699,10 @@ protected Node(final Environment initialEnvironment, resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + final List shutdownAwarePlugins = pluginsService.filterPlugins(ShutdownAwarePlugin.class); + final PluginShutdownService pluginShutdownService = new PluginShutdownService(shutdownAwarePlugins); + clusterService.addListener(pluginShutdownService); + modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); @@ -759,6 +765,7 @@ protected Node(final Environment initialEnvironment, b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); b.bind(SystemIndices.class).toInstance(systemIndices); + b.bind(PluginShutdownService.class).toInstance(pluginShutdownService); b.bind(ExecutorSelector.class).toInstance(executorSelector); } ); diff --git a/server/src/main/java/org/elasticsearch/plugins/ShutdownAwarePlugin.java b/server/src/main/java/org/elasticsearch/plugins/ShutdownAwarePlugin.java new file mode 100644 index 0000000000000..1603f1ab60bc1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/plugins/ShutdownAwarePlugin.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.plugins; + +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; + +import java.util.Collection; + +/** + * A {@link ShutdownAwarePlugin} is a plugin that can be made aware of a shutdown. It comprises two + * parts, one part used for telling plugins that a set of nodes are going to be shut down + * ({@link #signalShutdown(Collection)}), the other for retrieving the status of those plugins + * as to whether it is safe to shut down ({@link #safeToShutdown(String, SingleNodeShutdownMetadata.Type)} + */ +public interface ShutdownAwarePlugin { + + /** + * Whether the plugin is considered safe to shut down. This method is called when the status of + * a shutdown is retrieved via the API, and it is only called on the master node. + */ + boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType); + + /** + * A trigger to notify the plugin that a shutdown for the nodes has been triggered. This method + * will be called on every node for each cluster state, so it should return quickly. + */ + void signalShutdown(Collection shutdownNodeIds); +} diff --git a/server/src/main/java/org/elasticsearch/shutdown/PluginShutdownService.java b/server/src/main/java/org/elasticsearch/shutdown/PluginShutdownService.java new file mode 100644 index 0000000000000..e84b3dbb1a721 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/shutdown/PluginShutdownService.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.shutdown; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.plugins.ShutdownAwarePlugin; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * The {@link PluginShutdownService} is used for the node shutdown infrastructure to signal to + * plugins that a shutdown is occurring, and to check whether it is safe to shut down. + */ +public class PluginShutdownService implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(PluginShutdownService.class); + public List plugins; + + public PluginShutdownService(@Nullable List plugins) { + this.plugins = plugins == null ? Collections.emptyList() : plugins; + } + + /** + * Return all nodes shutting down from the given cluster state + */ + public static Set shutdownNodes(final ClusterState clusterState) { + return NodesShutdownMetadata.getShutdowns(clusterState) + .map(NodesShutdownMetadata::getAllNodeMetadataMap) + .map(Map::keySet) + .orElse(Collections.emptySet()); + } + + /** + * Return all nodes shutting down with the given shutdown type from the given cluster state + */ + public static Set shutdownTypeNodes(final ClusterState clusterState, final SingleNodeShutdownMetadata.Type shutdownType) { + return NodesShutdownMetadata.getShutdowns(clusterState) + .map(NodesShutdownMetadata::getAllNodeMetadataMap) + .map(m -> m.entrySet().stream() + .filter(e -> e.getValue().getType() == shutdownType) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .map(Map::keySet) + .orElse(Collections.emptySet()); + } + + /** + * Check with registered plugins whether the shutdown is safe for the given node id and type + */ + public boolean readyToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) { + // TODO: consider adding debugging information (a message about why not?) + // TODO: consider adding more fine-grained status rather than true/false + for (ShutdownAwarePlugin plugin : plugins) { + try { + if (plugin.safeToShutdown(nodeId, shutdownType) == false) { + logger.trace("shutdown aware plugin [{}] is not yet ready for shutdown", plugin); + return false; + } + } catch (Exception e) { + logger.warn("uncaught exception when retrieving whether plugin is ready for node shutdown", e); + } + } + return true; + } + + /** + * Signal to plugins the nodes that are currently shutting down + */ + public void signalShutdown(final ClusterState state) { + Set shutdownNodes = shutdownNodes(state); + for (ShutdownAwarePlugin plugin : plugins) { + try { + plugin.signalShutdown(shutdownNodes); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("uncaught exception when notifying plugins of nodes {} shutdown", shutdownNodes), e); + } + } + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + signalShutdown(event.state()); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadataTests.java new file mode 100644 index 0000000000000..82438fb0a19f6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadataTests.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.test.ESTestCase; + +public class SingleNodeShutdownMetadataTests extends ESTestCase { + public void testStatusComination() { + SingleNodeShutdownMetadata.Status status; + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED, + SingleNodeShutdownMetadata.Status.IN_PROGRESS, + SingleNodeShutdownMetadata.Status.STALLED); + assertEquals(status, SingleNodeShutdownMetadata.Status.STALLED); + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED, + SingleNodeShutdownMetadata.Status.IN_PROGRESS, + SingleNodeShutdownMetadata.Status.NOT_STARTED); + assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS); + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED, + SingleNodeShutdownMetadata.Status.NOT_STARTED, + SingleNodeShutdownMetadata.Status.NOT_STARTED); + assertEquals(status, SingleNodeShutdownMetadata.Status.NOT_STARTED); + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.IN_PROGRESS, + SingleNodeShutdownMetadata.Status.IN_PROGRESS, + SingleNodeShutdownMetadata.Status.COMPLETE); + assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS); + + status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.COMPLETE, + SingleNodeShutdownMetadata.Status.COMPLETE, + SingleNodeShutdownMetadata.Status.COMPLETE); + assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE); + + status = SingleNodeShutdownMetadata.Status.combine(); + assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStep.java index 1c413ac1b8e3c..3203a03228628 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStep.java @@ -11,6 +11,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -20,6 +22,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.ilm.step.info.SingleMessageFieldInfo; import java.io.IOException; import java.util.Locale; @@ -34,6 +37,7 @@ public class CheckShrinkReadyStep extends ClusterStateWaitStep { public static final String NAME = "check-shrink-allocation"; private static final Logger logger = LogManager.getLogger(CheckShrinkReadyStep.class); + private boolean completable = true; CheckShrinkReadyStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey); @@ -44,6 +48,11 @@ public boolean isRetryable() { return true; } + @Override + public boolean isCompletable() { + return completable; + } + @Override public Result isConditionMet(Index index, ClusterState clusterState) { IndexMetadata idxMeta = clusterState.metadata().index(index); @@ -64,6 +73,12 @@ public Result isConditionMet(Index index, ClusterState clusterState) { throw new IllegalStateException("Cannot check shrink allocation as there are no allocation rules by _id"); } + boolean nodeBeingRemoved = NodesShutdownMetadata.getShutdowns(clusterState) + .map(NodesShutdownMetadata::getAllNodeMetadataMap) + .map(shutdownMetadataMap -> shutdownMetadataMap.get(idShardsShouldBeOn)) + .map(singleNodeShutdown -> singleNodeShutdown.getType() == SingleNodeShutdownMetadata.Type.REMOVE) + .orElse(false); + final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index); int foundShards = 0; for (ShardRouting shard : routingTable.shardsWithState(ShardRoutingState.STARTED)) { @@ -81,6 +96,12 @@ public Result isConditionMet(Index index, ClusterState clusterState) { index, expectedShardCount, idShardsShouldBeOn, getKey().getAction()); return new Result(true, null); } else { + if (nodeBeingRemoved) { + completable = false; + return new Result(false, new SingleMessageFieldInfo("node with id [" + idShardsShouldBeOn + + "] is currently marked as shutting down for removal")); + } + logger.trace("{} failed to find {} allocated shards (found {}) on node [{}] for shrink readiness ({})", index, expectedShardCount, foundShards, idShardsShouldBeOn, getKey().getAction()); return new Result(false, new CheckShrinkReadyStep.Info(idShardsShouldBeOn, expectedShardCount, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitStep.java index 702c99bf1e3e5..e6e95d1110a7f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitStep.java @@ -23,6 +23,16 @@ public ClusterStateWaitStep(StepKey key, StepKey nextStepKey) { public abstract Result isConditionMet(Index index, ClusterState clusterState); + /** + * Whether the step can be completed at all. This only affects the + * {@link ClusterStateWaitUntilThresholdStep} which waits for a threshold to be met before + * retrying. Setting this to false means that ILM should retry the sequence immediately without + * waiting for the threshold to be met. + */ + public boolean isCompletable() { + return true; + } + public static class Result { private final boolean complete; private final ToXContentObject infomationContext; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStep.java index 0fee72e060556..7ee3e2311420c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStep.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.step.info.SingleMessageFieldInfo; @@ -69,6 +70,21 @@ public Result isConditionMet(Index index, ClusterState clusterState) { // wonderful thing) TimeValue retryThreshold = LifecycleSettings.LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING.get(idxMeta.getSettings()); LifecycleExecutionState lifecycleState = fromIndexMetadata(idxMeta); + if (stepToExecute.isCompletable() == false) { + // we may not have passed the time threshold, but the step is not completable due to a different reason + thresholdPassed.set(true); + + String message = String.format(Locale.ROOT, "[%s] lifecycle step, as part of [%s] action, for index [%s] Is not " + + "completable, reason: [%s]. Abandoning execution and moving to the next fallback step [%s]", + getKey().getName(), + getKey().getAction(), + idxMeta.getIndex().getName(), + Strings.toString(stepResult.getInfomationContext()), + nextKeyOnThresholdBreach); + logger.debug(message); + + return new Result(true, new SingleMessageFieldInfo(message)); + } if (waitedMoreThanThresholdLevel(retryThreshold, lifecycleState, Clock.systemUTC())) { // we retried this step enough, next step will be the configured to {@code nextKeyOnThresholdBreach} thresholdPassed.set(true); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java index ba0b85d52689e..1490ee240d257 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.ClusterSettings; @@ -78,7 +79,8 @@ public void performAction(IndexMetadata indexMetadata, ClusterState clusterState new FilterAllocationDecider(clusterState.getMetadata().settings(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), new DataTierAllocationDecider(clusterState.getMetadata().settings(), new ClusterSettings(Settings.EMPTY, ALL_CLUSTER_SETTINGS)), - new NodeVersionAllocationDecider() + new NodeVersionAllocationDecider(), + new NodeShutdownAllocationDecider() )); final RoutingNodes routingNodes = clusterState.getRoutingNodes(); RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, null, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java index 0d26726807cc4..006a2a8be11c4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -19,6 +21,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -328,6 +331,110 @@ public void testExecuteIndexMissing() throws Exception { assertNull(actualResult.getInfomationContext()); } + public void testStepCompletableIfAllShardsActive() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Map requires = AllocateActionTests.randomMap(1, 5); + Settings.Builder existingSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.id) + .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._id", "node1") + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()); + Settings.Builder expectedSettings = Settings.builder(); + Settings.Builder node1Settings = Settings.builder(); + Settings.Builder node2Settings = Settings.builder(); + requires.forEach((k, v) -> { + existingSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v); + expectedSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v); + node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v); + }); + + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)); + + CheckShrinkReadyStep step = createRandomInstance(); + IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(existingSettings).numberOfShards(1) + .numberOfReplicas(1).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder() + .indices(indices.build()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(Collections.singletonMap("node1", + SingleNodeShutdownMetadata.builder() + .setType(SingleNodeShutdownMetadata.Type.REMOVE) + .setStartedAtMillis(randomNonNegativeLong()) + .setReason("test") + .setNodeId("node1") + .build())))) + .nodes(DiscoveryNodes.builder() + .add(DiscoveryNode.createLocal(Settings.builder().put(node1Settings.build()) + .put(Node.NODE_NAME_SETTING.getKey(), "node1").build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9200), + "node1")) + .add(DiscoveryNode.createLocal(Settings.builder().put(node2Settings.build()) + .put(Node.NODE_NAME_SETTING.getKey(), "node2").build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9201), + "node2"))) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + assertTrue(step.isCompletable()); + ClusterStateWaitStep.Result actualResult = step.isConditionMet(index, clusterState); + assertTrue(actualResult.isComplete()); + assertTrue(step.isCompletable()); + } + + public void testStepBecomesUncompletable() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Map requires = AllocateActionTests.randomMap(1, 5); + Settings.Builder existingSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.id) + .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._id", "node1") + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()); + Settings.Builder expectedSettings = Settings.builder(); + Settings.Builder node1Settings = Settings.builder(); + Settings.Builder node2Settings = Settings.builder(); + requires.forEach((k, v) -> { + existingSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v); + expectedSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v); + node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v); + }); + + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.INITIALIZING)); + + CheckShrinkReadyStep step = createRandomInstance(); + IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(existingSettings).numberOfShards(1) + .numberOfReplicas(1).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder() + .indices(indices.build()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(Collections.singletonMap("node1", + SingleNodeShutdownMetadata.builder() + .setType(SingleNodeShutdownMetadata.Type.REMOVE) + .setStartedAtMillis(randomNonNegativeLong()) + .setReason("test") + .setNodeId("node1") + .build())))) + .nodes(DiscoveryNodes.builder() + .add(DiscoveryNode.createLocal(Settings.builder().put(node1Settings.build()) + .put(Node.NODE_NAME_SETTING.getKey(), "node1").build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9200), + "node1")) + .add(DiscoveryNode.createLocal(Settings.builder().put(node2Settings.build()) + .put(Node.NODE_NAME_SETTING.getKey(), "node2").build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9201), + "node2"))) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + assertTrue(step.isCompletable()); + ClusterStateWaitStep.Result actualResult = step.isConditionMet(index, clusterState); + assertFalse(actualResult.isComplete()); + assertThat(Strings.toString(actualResult.getInfomationContext()), + containsString("node with id [node1] is currently marked as shutting down")); + assertFalse(step.isCompletable()); + } + private void assertAllocateStatus(Index index, int shards, int replicas, CheckShrinkReadyStep step, Settings.Builder existingSettings, Settings.Builder node1Settings, Settings.Builder node2Settings, IndexRoutingTable.Builder indexRoutingTable, ClusterStateWaitStep.Result expectedResult) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStepTests.java index 1a6d6cc57a512..d3354f7f41897 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ClusterStateWaitUntilThresholdStepTests.java @@ -225,4 +225,61 @@ public void testWaitedMoreThanThresholdLevelMath() { assertThat(thresholdBreached, is(false)); } } + + public void testIsCompletableBreaches() { + IndexMetadata indexMetadata = IndexMetadata.builder("index") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .putCustom(ILM_CUSTOM_METADATA_KEY, Collections.singletonMap("step_time", String.valueOf(Clock.systemUTC().millis()))) + .build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .build(); + + ClusterStateWaitUntilThresholdStep step = new ClusterStateWaitUntilThresholdStep( + new ClusterStateWaitStep(new StepKey("phase" , "action", "key"), + new StepKey("phase", "action", "next-key")) { + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + return new Result(false, new SingleMessageFieldInfo("")); + } + + @Override + public boolean isCompletable() { + return true; + } + + @Override + public boolean isRetryable() { + return true; + } + }, new StepKey("phase", "action", "breached")); + + assertFalse(step.isConditionMet(indexMetadata.getIndex(), clusterState).isComplete()); + + assertThat(step.getNextStepKey().getName(), equalTo("next-key")); + + step = new ClusterStateWaitUntilThresholdStep( + new ClusterStateWaitStep(new StepKey("phase" , "action", "key"), + new StepKey("phase", "action", "next-key")) { + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + return new Result(false, new SingleMessageFieldInfo("")); + } + + @Override + public boolean isCompletable() { + return false; + } + + @Override + public boolean isRetryable() { + return true; + } + }, new StepKey("phase", "action", "breached")); + assertTrue(step.isConditionMet(indexMetadata.getIndex(), clusterState).isComplete()); + assertThat(step.getNextStepKey().getName(), equalTo("breached")); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index 0e2b74f006b09..0074348fad376 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -28,23 +29,33 @@ import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.plugins.ShutdownAwarePlugin; +import org.elasticsearch.shutdown.PluginShutdownService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.ilm.CheckShrinkReadyStep; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.RollupStep; +import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep; +import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkStep; +import org.elasticsearch.xpack.core.ilm.ShrunkShardsAllocatedStep; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import java.io.Closeable; import java.time.Clock; +import java.util.Collection; +import java.util.Collections; import java.util.Set; import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.elasticsearch.xpack.core.ilm.IndexLifecycleOriginationDateParser.parseIndexNameAndExtractDate; import static org.elasticsearch.xpack.core.ilm.IndexLifecycleOriginationDateParser.shouldParseIndexName; @@ -53,7 +64,7 @@ * A service which runs the {@link LifecyclePolicy}s associated with indexes. */ public class IndexLifecycleService - implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, IndexEventListener { + implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, IndexEventListener, ShutdownAwarePlugin { private static final Logger logger = LogManager.getLogger(IndexLifecycleService.class); private static final Set IGNORE_STEPS_MAINTENANCE_REQUESTED = Sets.newHashSet(ShrinkStep.NAME, RollupStep.NAME); private volatile boolean isMaster = false; @@ -390,4 +401,56 @@ private boolean isClusterServiceStoppedOrClosed() { PolicyStepsRegistry getPolicyRegistry() { return policyRegistry; } + + static Set indicesOnShuttingDownNodesInDangerousStep(ClusterState state, String nodeId) { + final Set shutdownNodes = PluginShutdownService.shutdownTypeNodes(state, SingleNodeShutdownMetadata.Type.REMOVE); + if (shutdownNodes.isEmpty()) { + return Collections.emptySet(); + } + + Set indicesPreventingShutdown = StreamSupport.stream(state.metadata().indices().spliterator(), false) + // Filter out to only consider managed indices + .filter(indexToMetadata -> Strings.hasText(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexToMetadata.value.getSettings()))) + // Only look at indices in the shrink action + .filter(indexToMetadata -> + ShrinkAction.NAME.equals(LifecycleExecutionState.fromIndexMetadata(indexToMetadata.value).getAction())) + // Only look at indices on a step that may potentially be dangerous if we removed the node + .filter(indexToMetadata -> { + String step = LifecycleExecutionState.fromIndexMetadata(indexToMetadata.value).getStep(); + return SetSingleNodeAllocateStep.NAME.equals(step) || + CheckShrinkReadyStep.NAME.equals(step) || + ShrinkStep.NAME.equals(step) || + ShrunkShardsAllocatedStep.NAME.equals(step); + }) + // Only look at indices where the node picked for the shrink is the node marked as shutting down + .filter(indexToMetadata -> { + String nodePicked = indexToMetadata.value.getSettings() + .get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"); + return nodeId.equals(nodePicked); + }) + .map(indexToMetadata -> indexToMetadata.key) + .collect(Collectors.toSet()); + logger.trace("with nodes marked as shutdown for removal {}, indices {} are preventing shutdown", + shutdownNodes, indicesPreventingShutdown); + return indicesPreventingShutdown; + } + + @Override + public boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) { + switch (shutdownType) { + case RESTART: + // It is safe to restart during ILM operation + return true; + case REMOVE: + Set indices = indicesOnShuttingDownNodesInDangerousStep(clusterService.state(), nodeId); + return indices.isEmpty(); + default: + throw new IllegalArgumentException("unknown shutdown type: " + shutdownType); + } + } + + @Override + public void signalShutdown(Collection shutdownNodeIds) { + // TODO: in the future we could take proactive measures for when a shutdown is actually triggered + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java index 64df62bcde0ee..07d43efde3b5a 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java @@ -17,6 +17,8 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -32,6 +34,8 @@ import org.elasticsearch.test.NodeRoles; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ilm.CheckShrinkReadyStep; +import org.elasticsearch.xpack.core.ilm.GenerateUniqueIndexNameStep; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; @@ -40,8 +44,10 @@ import org.elasticsearch.xpack.core.ilm.MockAction; import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep; import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkStep; +import org.elasticsearch.xpack.core.ilm.ShrunkShardsAllocatedStep; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.hamcrest.Description; @@ -497,4 +503,97 @@ public void testParsingOriginationDateBeforeIndexCreation() { fail("Did not expect the before index validation to throw an exception as the parse origination date setting was not set"); } } + + public void testIndicesOnShuttingDownNodesInDangerousStep() { + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build(); + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), + equalTo(Collections.emptySet())); + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), + equalTo(Collections.emptySet())); + + IndexMetadata nonDangerousIndex = IndexMetadata.builder("no_danger") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), "mypolicy")) + .putCustom(ILM_CUSTOM_METADATA_KEY, LifecycleExecutionState.builder() + .setPhase("warm") + .setAction("shrink") + .setStep(GenerateUniqueIndexNameStep.NAME) + .build().asMap()) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + IndexMetadata dangerousIndex = IndexMetadata.builder("danger") + .settings(settings(Version.CURRENT) + .put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), "mypolicy") + .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", "shutdown_node")) + .putCustom(ILM_CUSTOM_METADATA_KEY, LifecycleExecutionState.builder() + .setPhase("warm") + .setAction("shrink") + .setStep(randomFrom(SetSingleNodeAllocateStep.NAME, CheckShrinkReadyStep.NAME, + ShrinkStep.NAME, ShrunkShardsAllocatedStep.NAME)) + .build().asMap()) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder() + .fPut("no_danger", nonDangerousIndex) + .fPut("danger", dangerousIndex); + + Metadata metadata = Metadata.builder() + .putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING)) + .indices(indices.build()) + .persistentSettings(settings(Version.CURRENT).build()) + .build(); + + state = ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId) + .add(masterNode) + .add(DiscoveryNode.createLocal( + NodeRoles.masterNode(settings(Version.CURRENT).build()), + new TransportAddress(TransportAddress.META_ADDRESS, 9301), + "regular_node")) + .add(DiscoveryNode.createLocal( + NodeRoles.masterNode(settings(Version.CURRENT).build()), + new TransportAddress(TransportAddress.META_ADDRESS, 9302), + "shutdown_node")) + .build()) + .build(); + + // No danger yet, because no node is shutting down + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), + equalTo(Collections.emptySet())); + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), + equalTo(Collections.emptySet())); + + state = ClusterState.builder(state) + .metadata(Metadata.builder(state.metadata()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(Collections.singletonMap("shutdown_node", + SingleNodeShutdownMetadata.builder() + .setNodeId("shutdown_node") + .setReason("shut down for test") + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.RESTART) + .build()))) + .build()) + .build(); + + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), + equalTo(Collections.emptySet())); + // No danger, because this is a "RESTART" type shutdown + assertThat("restart type shutdowns are not considered dangerous", + IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), + equalTo(Collections.emptySet())); + + state = ClusterState.builder(state) + .metadata(Metadata.builder(state.metadata()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(Collections.singletonMap("shutdown_node", + SingleNodeShutdownMetadata.builder() + .setNodeId("shutdown_node") + .setReason("shut down for test") + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.REMOVE) + .build()))) + .build()) + .build(); + + // The dangerous index should be calculated as being in danger now + assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), + equalTo(Collections.singleton("danger"))); + } } diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java new file mode 100644 index 0000000000000..ed3281fc7b195 --- /dev/null +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.ShutdownAwarePlugin; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0) +public class NodeShutdownPluginsIT extends ESIntegTestCase { + private static final Logger logger = LogManager.getLogger(NodeShutdownPluginsIT.class); + + public static final AtomicBoolean safe = new AtomicBoolean(true); + public static final AtomicReference> triggeredNodes = new AtomicReference<>(null); + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ShutdownEnabledPlugin.class, TestShutdownAwarePlugin.class); + } + + public void testShutdownAwarePlugin() throws Exception { + // Start two nodes, one will be marked as shutting down + final String node1 = internalCluster().startNode(Settings.EMPTY); + final String node2 = internalCluster().startNode(Settings.EMPTY); + + final String shutdownNode; + final String remainNode; + NodesInfoResponse nodes = client().admin().cluster().prepareNodesInfo().clear().get(); + final String node1Id = nodes.getNodes() + .stream() + .map(NodeInfo::getNode) + .filter(node -> node.getName().equals(node1)) + .map(DiscoveryNode::getId) + .findFirst() + .orElseThrow(() -> new AssertionError("fail")); + final String node2Id = nodes.getNodes() + .stream() + .map(NodeInfo::getNode) + .filter(node -> node.getName().equals(node2)) + .map(DiscoveryNode::getId) + .findFirst() + .orElseThrow(() -> new AssertionError("fail")); + + if (randomBoolean()) { + shutdownNode = node1Id; + remainNode = node2Id; + } else { + shutdownNode = node2Id; + remainNode = node1Id; + } + logger.info("--> node {} will be shut down, {} will remain", shutdownNode, remainNode); + + // First, mark the plugin as not yet safe + safe.set(false); + + // Mark the node as shutting down + client().execute( + PutShutdownNodeAction.INSTANCE, + new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing") + ).get(); + + GetShutdownStatusAction.Response getResp = client().execute( + GetShutdownStatusAction.INSTANCE, + new GetShutdownStatusAction.Request(remainNode) + ).get(); + + assertTrue(getResp.getShutdownStatuses().isEmpty()); + + // The plugin should be in progress + getResp = client().execute(GetShutdownStatusAction.INSTANCE, new GetShutdownStatusAction.Request(shutdownNode)).get(); + assertThat( + getResp.getShutdownStatuses().get(0).pluginsStatus().getStatus(), + equalTo(SingleNodeShutdownMetadata.Status.IN_PROGRESS) + ); + + // Change the plugin to be "done" shutting down + safe.set(true); + + // The plugin should be complete + getResp = client().execute(GetShutdownStatusAction.INSTANCE, new GetShutdownStatusAction.Request(shutdownNode)).get(); + assertThat(getResp.getShutdownStatuses().get(0).pluginsStatus().getStatus(), equalTo(SingleNodeShutdownMetadata.Status.COMPLETE)); + + // The shutdown node should be in the triggered list + assertThat(triggeredNodes.get(), contains(shutdownNode)); + + client().execute(DeleteShutdownNodeAction.INSTANCE, new DeleteShutdownNodeAction.Request(shutdownNode)).get(); + + // The shutdown node should now not in the triggered list + assertThat(triggeredNodes.get(), empty()); + } + + public static class ShutdownEnabledPlugin extends ShutdownPlugin { + @Override + public boolean isEnabled() { + return true; + } + } + + public static class TestShutdownAwarePlugin extends Plugin implements ShutdownAwarePlugin { + + @Override + public boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) { + logger.info("--> checking whether safe to shutdown for node [{}], type [{}] answer: ({})", nodeId, shutdownType, safe.get()); + return safe.get(); + } + + @Override + public void signalShutdown(Collection shutdownNodeIds) { + logger.info("--> shutdown triggered for {}", shutdownNodeIds); + triggeredNodes.set(shutdownNodeIds); + } + } +} diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java index 94a10b12faabc..73db7488c3131 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java @@ -42,7 +42,7 @@ public static class Request extends AcknowledgedRequest { private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "put_node_shutdown_request", false, - (a, nodeId) -> new Request(nodeId, SingleNodeShutdownMetadata.Type.valueOf((String) a[0]), (String) a[1]) + (a, nodeId) -> new Request(nodeId, SingleNodeShutdownMetadata.Type.parse((String) a[0]), (String) a[1]) ); static { diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/SingleNodeShutdownStatus.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/SingleNodeShutdownStatus.java index c49d70abbbffa..7629242e28091 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/SingleNodeShutdownStatus.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/SingleNodeShutdownStatus.java @@ -62,8 +62,23 @@ public void writeTo(StreamOutput out) throws IOException { } public SingleNodeShutdownMetadata.Status overallStatus() { - // TODO: make this value calculated based on the status of all other pieces - return SingleNodeShutdownMetadata.Status.IN_PROGRESS; + return SingleNodeShutdownMetadata.Status.combine( + migrationStatus().getStatus(), + pluginsStatus().getStatus(), + persistentTasksStatus().getStatus() + ); + } + + public ShutdownShardMigrationStatus migrationStatus() { + return this.shardMigrationStatus; + } + + public ShutdownPersistentTasksStatus persistentTasksStatus() { + return this.persistentTasksStatus; + } + + public ShutdownPluginsStatus pluginsStatus() { + return this.pluginsStatus; } @Override diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java index 6711d84c3fe85..7e050943600bd 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.shutdown.PluginShutdownService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -34,13 +35,17 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction< GetShutdownStatusAction.Request, GetShutdownStatusAction.Response> { + + private final PluginShutdownService pluginShutdownService; + @Inject public TransportGetShutdownStatusAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + PluginShutdownService pluginShutdownService ) { super( GetShutdownStatusAction.NAME, @@ -53,6 +58,8 @@ public TransportGetShutdownStatusAction( GetShutdownStatusAction.Response::new, ThreadPool.Names.SAME ); + + this.pluginShutdownService = pluginShutdownService; } @Override @@ -75,7 +82,7 @@ protected void masterOperation( ns, new ShutdownShardMigrationStatus(), new ShutdownPersistentTasksStatus(), - new ShutdownPluginsStatus() + new ShutdownPluginsStatus(pluginShutdownService.readyToShutdown(ns.getNodeId(), ns.getType())) ) ) .collect(Collectors.toList()); @@ -91,7 +98,7 @@ protected void masterOperation( ns, new ShutdownShardMigrationStatus(), new ShutdownPersistentTasksStatus(), - new ShutdownPluginsStatus() + new ShutdownPluginsStatus(pluginShutdownService.readyToShutdown(ns.getNodeId(), ns.getType())) ) ) diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusResponseTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusResponseTests.java index d4522b3f9519b..642502fe10e03 100644 --- a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusResponseTests.java +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusResponseTests.java @@ -59,7 +59,7 @@ public static SingleNodeShutdownStatus randomNodeShutdownStatus() { randomNodeShutdownMetadata(), new ShutdownShardMigrationStatus(), new ShutdownPersistentTasksStatus(), - new ShutdownPluginsStatus() + new ShutdownPluginsStatus(randomBoolean()) ); }