Skip to content

Commit f67c0f0

Browse files
[7.x] Make ILM aware of node shutdown (#73690) (#73847)
* Make ILM aware of node shutdown (#73690) This commit makes ILM aware of different parts of the node shutdown lifecycle. It consists of two main parts: reacting to the state during execution, and signaling the status of shutdown from ILM. Reacting to shutdown state: ILM now considers nodes that are going to be shut down when deciding which node to assign for the shrink action. It uses the `NodeShutdownAllocationDecider` within the `SetSingleNodeAllocateStep` to not assign shards to a node that will be removed. If an index is already past this step and waiting for allocation, this commit adds an `isCompletable` method to the `ClusterStateWaitUntilThresholdStep` so that an allocation that cannot happen can be rewound and retried on another (non-shutdown) node. Signaling shutdown status: This commit introduces the `PluginShutdownService` which deals with `ShutdownAwarePlugin` classes. This class is used to signal shutdowns to plugins, and also to gather the status of a shutdown from these plugins. ILM implements this `ShutdownAwarePlugin` to signal if an index is in a step that is unsafe, such as the actual shrink step, so that shutdown will wait until after the allocation rules have been removed by ILM. This commit also hooks up the get shutdown API response to consider the statuses of its parts (see `SingleNodeShutdownMetadata.Status#combine`) when creating a response. Relates to #70338 * Adjust annotation Co-authored-by: Elastic Machine <[email protected]>
1 parent 67663df commit f67c0f0

File tree

21 files changed

+794
-19
lines changed

21 files changed

+794
-19
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ public class ShutdownPersistentTasksStatus implements Writeable, ToXContentObjec
2222
private final SingleNodeShutdownMetadata.Status status;
2323

2424
public ShutdownPersistentTasksStatus() {
25-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
25+
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
2626
}
2727

2828
public ShutdownPersistentTasksStatus(StreamInput in) throws IOException {
29-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
29+
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
3030
}
3131

3232
@Override
@@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException {
4242

4343
}
4444

45+
public SingleNodeShutdownMetadata.Status getStatus() {
46+
return status;
47+
}
48+
4549
@Override
4650
public int hashCode() {
4751
return status.hashCode();

server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@ public class ShutdownPluginsStatus implements Writeable, ToXContentObject {
2121

2222
private final SingleNodeShutdownMetadata.Status status;
2323

24-
public ShutdownPluginsStatus() {
25-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
24+
public ShutdownPluginsStatus(boolean safeToShutdown) {
25+
this.status = safeToShutdown ? SingleNodeShutdownMetadata.Status.COMPLETE :
26+
SingleNodeShutdownMetadata.Status.IN_PROGRESS;
2627
}
2728

2829
public ShutdownPluginsStatus(StreamInput in) throws IOException {
29-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
30+
this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class);
31+
}
32+
33+
public SingleNodeShutdownMetadata.Status getStatus() {
34+
return this.status;
3035
}
3136

3237
@Override
@@ -39,7 +44,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
3944

4045
@Override
4146
public void writeTo(StreamOutput out) throws IOException {
42-
47+
out.writeEnum(this.status);
4348
}
4449

4550
@Override

server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ public class ShutdownShardMigrationStatus implements Writeable, ToXContentObject
2222
private final SingleNodeShutdownMetadata.Status status;
2323

2424
public ShutdownShardMigrationStatus() {
25-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
25+
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
2626
}
2727

2828
public ShutdownShardMigrationStatus(StreamInput in) throws IOException {
29-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
29+
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
3030
}
3131

3232
@Override
@@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException {
4242

4343
}
4444

45+
public SingleNodeShutdownMetadata.Status getStatus() {
46+
return status;
47+
}
48+
4549
@Override
4650
public int hashCode() {
4751
return status.hashCode();

server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.xcontent.XContentParser;
2020

2121
import java.io.IOException;
22+
import java.util.Locale;
2223
import java.util.Objects;
2324

2425
/**
@@ -234,16 +235,52 @@ public SingleNodeShutdownMetadata build() {
234235
*/
235236
public enum Type {
236237
REMOVE,
237-
RESTART
238+
RESTART;
239+
240+
public static Type parse(String type) {
241+
if ("remove".equals(type.toLowerCase(Locale.ROOT))) {
242+
return REMOVE;
243+
} else if ("restart".equals(type.toLowerCase(Locale.ROOT))) {
244+
return RESTART;
245+
} else {
246+
throw new IllegalArgumentException("unknown shutdown type: " + type);
247+
}
248+
}
238249
}
239250

240251
/**
241252
* Describes the status of a component of shutdown.
242253
*/
243254
public enum Status {
255+
// These are ordered (see #combine(...))
244256
NOT_STARTED,
245257
IN_PROGRESS,
246258
STALLED,
247-
COMPLETE
259+
COMPLETE;
260+
261+
/**
262+
* Merges multiple statuses into a single, final, status
263+
*
264+
* For example, if called with NOT_STARTED, IN_PROGRESS, and STALLED, the returned state is STALLED.
265+
* Called with IN_PROGRESS, IN_PROGRESS, NOT_STARTED, the returned state is IN_PROGRESS.
266+
* Called with IN_PROGRESS, NOT_STARTED, COMPLETE, the returned state is IN_PROGRESS
267+
* Called with COMPLETE, COMPLETE, COMPLETE, the returned state is COMPLETE
268+
* Called with an empty array, the returned state is COMPLETE
269+
*/
270+
public static Status combine(Status... statuses) {
271+
int statusOrd = -1;
272+
for (Status status : statuses) {
273+
// Max the status up to, but not including, "complete"
274+
if (status != COMPLETE) {
275+
statusOrd = Math.max(status.ordinal(), statusOrd);
276+
}
277+
}
278+
if (statusOrd == -1) {
279+
// Either all the statuses were complete, or there were no statuses given
280+
return COMPLETE;
281+
} else {
282+
return Status.values()[statusOrd];
283+
}
284+
}
248285
}
249286
}

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@
140140
import org.elasticsearch.plugins.RepositoryPlugin;
141141
import org.elasticsearch.plugins.ScriptPlugin;
142142
import org.elasticsearch.plugins.SearchPlugin;
143+
import org.elasticsearch.plugins.ShutdownAwarePlugin;
143144
import org.elasticsearch.plugins.SystemIndexPlugin;
144145
import org.elasticsearch.repositories.RepositoriesModule;
145146
import org.elasticsearch.repositories.RepositoriesService;
@@ -152,6 +153,7 @@
152153
import org.elasticsearch.search.SearchService;
153154
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
154155
import org.elasticsearch.search.fetch.FetchPhase;
156+
import org.elasticsearch.shutdown.PluginShutdownService;
155157
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
156158
import org.elasticsearch.snapshots.RestoreService;
157159
import org.elasticsearch.snapshots.SnapshotShardsService;
@@ -697,6 +699,10 @@ protected Node(final Environment initialEnvironment,
697699
resourcesToClose.add(persistentTasksClusterService);
698700
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
699701

702+
final List<ShutdownAwarePlugin> shutdownAwarePlugins = pluginsService.filterPlugins(ShutdownAwarePlugin.class);
703+
final PluginShutdownService pluginShutdownService = new PluginShutdownService(shutdownAwarePlugins);
704+
clusterService.addListener(pluginShutdownService);
705+
700706
modules.add(b -> {
701707
b.bind(Node.class).toInstance(this);
702708
b.bind(NodeService.class).toInstance(nodeService);
@@ -759,6 +765,7 @@ protected Node(final Environment initialEnvironment,
759765
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
760766
b.bind(FsHealthService.class).toInstance(fsHealthService);
761767
b.bind(SystemIndices.class).toInstance(systemIndices);
768+
b.bind(PluginShutdownService.class).toInstance(pluginShutdownService);
762769
b.bind(ExecutorSelector.class).toInstance(executorSelector);
763770
}
764771
);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.plugins;
10+
11+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
12+
13+
import java.util.Collection;
14+
15+
/**
16+
* A {@link ShutdownAwarePlugin} is a plugin that can be made aware of a shutdown. It comprises two
17+
* parts, one part used for telling plugins that a set of nodes are going to be shut down
18+
* ({@link #signalShutdown(Collection)}), the other for retrieving the status of those plugins
19+
* as to whether it is safe to shut down ({@link #safeToShutdown(String, SingleNodeShutdownMetadata.Type)}).
20+
*/
21+
public interface ShutdownAwarePlugin {
22+
23+
/**
24+
* Whether the plugin is considered safe to shut down. This method is called when the status of
25+
* a shutdown is retrieved via the API, and it is only called on the master node.
26+
*/
27+
boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType);
28+
29+
/**
30+
* A trigger to notify the plugin that a shutdown for the nodes has been triggered. This method
31+
* will be called on every node for each cluster state, so it should return quickly.
32+
*/
33+
void signalShutdown(Collection<String> shutdownNodeIds);
34+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
/*
10+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
11+
* or more contributor license agreements. Licensed under the Elastic License
12+
* 2.0; you may not use this file except in compliance with the Elastic License
13+
* 2.0.
14+
*/
15+
16+
package org.elasticsearch.shutdown;
17+
18+
import org.apache.logging.log4j.LogManager;
19+
import org.apache.logging.log4j.Logger;
20+
import org.apache.logging.log4j.message.ParameterizedMessage;
21+
import org.elasticsearch.cluster.ClusterChangedEvent;
22+
import org.elasticsearch.cluster.ClusterState;
23+
import org.elasticsearch.cluster.ClusterStateListener;
24+
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
25+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
26+
import org.elasticsearch.common.Nullable;
27+
import org.elasticsearch.plugins.ShutdownAwarePlugin;
28+
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Set;
33+
import java.util.stream.Collectors;
34+
35+
/**
36+
* The {@link PluginShutdownService} is used for the node shutdown infrastructure to signal to
37+
* plugins that a shutdown is occurring, and to check whether it is safe to shut down.
38+
*/
39+
public class PluginShutdownService implements ClusterStateListener {
40+
41+
private static final Logger logger = LogManager.getLogger(PluginShutdownService.class);
42+
public List<ShutdownAwarePlugin> plugins;
43+
44+
public PluginShutdownService(@Nullable List<ShutdownAwarePlugin> plugins) {
45+
this.plugins = plugins == null ? Collections.emptyList() : plugins;
46+
}
47+
48+
/**
49+
* Return all nodes shutting down from the given cluster state
50+
*/
51+
public static Set<String> shutdownNodes(final ClusterState clusterState) {
52+
return NodesShutdownMetadata.getShutdowns(clusterState)
53+
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
54+
.map(Map::keySet)
55+
.orElse(Collections.emptySet());
56+
}
57+
58+
/**
59+
* Return all nodes shutting down with the given shutdown type from the given cluster state
60+
*/
61+
public static Set<String> shutdownTypeNodes(final ClusterState clusterState, final SingleNodeShutdownMetadata.Type shutdownType) {
62+
return NodesShutdownMetadata.getShutdowns(clusterState)
63+
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
64+
.map(m -> m.entrySet().stream()
65+
.filter(e -> e.getValue().getType() == shutdownType)
66+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
67+
.map(Map::keySet)
68+
.orElse(Collections.emptySet());
69+
}
70+
71+
/**
72+
* Check with registered plugins whether the shutdown is safe for the given node id and type
73+
*/
74+
public boolean readyToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) {
75+
// TODO: consider adding debugging information (a message about why not?)
76+
// TODO: consider adding more fine-grained status rather than true/false
77+
for (ShutdownAwarePlugin plugin : plugins) {
78+
try {
79+
if (plugin.safeToShutdown(nodeId, shutdownType) == false) {
80+
logger.trace("shutdown aware plugin [{}] is not yet ready for shutdown", plugin);
81+
return false;
82+
}
83+
} catch (Exception e) {
84+
logger.warn("uncaught exception when retrieving whether plugin is ready for node shutdown", e);
85+
}
86+
}
87+
return true;
88+
}
89+
90+
/**
91+
* Signal to plugins the nodes that are currently shutting down
92+
*/
93+
public void signalShutdown(final ClusterState state) {
94+
Set<String> shutdownNodes = shutdownNodes(state);
95+
for (ShutdownAwarePlugin plugin : plugins) {
96+
try {
97+
plugin.signalShutdown(shutdownNodes);
98+
} catch (Exception e) {
99+
logger.warn(new ParameterizedMessage("uncaught exception when notifying plugins of nodes {} shutdown", shutdownNodes), e);
100+
}
101+
}
102+
}
103+
104+
@Override
105+
public void clusterChanged(ClusterChangedEvent event) {
106+
signalShutdown(event.state());
107+
}
108+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.metadata;
10+
11+
import org.elasticsearch.test.ESTestCase;
12+
13+
public class SingleNodeShutdownMetadataTests extends ESTestCase {
14+
public void testStatusComination() {
15+
SingleNodeShutdownMetadata.Status status;
16+
17+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
18+
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
19+
SingleNodeShutdownMetadata.Status.STALLED);
20+
assertEquals(status, SingleNodeShutdownMetadata.Status.STALLED);
21+
22+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
23+
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
24+
SingleNodeShutdownMetadata.Status.NOT_STARTED);
25+
assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS);
26+
27+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
28+
SingleNodeShutdownMetadata.Status.NOT_STARTED,
29+
SingleNodeShutdownMetadata.Status.NOT_STARTED);
30+
assertEquals(status, SingleNodeShutdownMetadata.Status.NOT_STARTED);
31+
32+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.IN_PROGRESS,
33+
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
34+
SingleNodeShutdownMetadata.Status.COMPLETE);
35+
assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS);
36+
37+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.COMPLETE,
38+
SingleNodeShutdownMetadata.Status.COMPLETE,
39+
SingleNodeShutdownMetadata.Status.COMPLETE);
40+
assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE);
41+
42+
status = SingleNodeShutdownMetadata.Status.combine();
43+
assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE);
44+
}
45+
}

0 commit comments

Comments
 (0)