Skip to content

Commit 2bf2bdd

Browse files
authored
Make ILM aware of node shutdown (#73690)
This commit makes ILM aware of different parts of the node shutdown lifecycle. It consists of two main parts: reacting to the shutdown state during execution, and signaling the status of shutdown from ILM. Reacting to shutdown state: ILM now considers nodes that are going to be shut down when deciding which node to assign for the shrink action. It uses the `NodeShutdownAllocationDecider` within the `SetSingleNodeAllocateStep` to not assign shards to a node that will be removed. If an index is already past this step and waiting for allocation, this commit adds an `isCompletable` method to the `ClusterStateWaitUntilThresholdStep` so that an allocation that cannot happen can be rewound and retried on another (non-shutdown) node. Signaling shutdown status: This commit introduces the `PluginShutdownService`, which deals with `ShutdownAwarePlugin` classes. This class is used to signal shutdowns to plugins, and also to gather the status of a shutdown from these plugins. ILM implements this `ShutdownAwarePlugin` to signal if an index is in a step that is unsafe, such as the actual shrink step, so that shutdown will wait until after the allocation rules have been removed by ILM. This commit also hooks up the get shutdown API response to consider the statuses of its parts (see `SingleNodeShutdownMetadata.Status#combine`) when creating a response. Relates to #70338
1 parent 3d55269 commit 2bf2bdd

File tree

21 files changed

+794
-19
lines changed

21 files changed

+794
-19
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPersistentTasksStatus.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ public class ShutdownPersistentTasksStatus implements Writeable, ToXContentObjec
2222
private final SingleNodeShutdownMetadata.Status status;
2323

2424
public ShutdownPersistentTasksStatus() {
25-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
25+
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
2626
}
2727

2828
public ShutdownPersistentTasksStatus(StreamInput in) throws IOException {
29-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
29+
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
3030
}
3131

3232
@Override
@@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException {
4242

4343
}
4444

45+
public SingleNodeShutdownMetadata.Status getStatus() {
46+
return status;
47+
}
48+
4549
@Override
4650
public int hashCode() {
4751
return status.hashCode();

server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownPluginsStatus.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@ public class ShutdownPluginsStatus implements Writeable, ToXContentObject {
2121

2222
private final SingleNodeShutdownMetadata.Status status;
2323

24-
public ShutdownPluginsStatus() {
25-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
24+
public ShutdownPluginsStatus(boolean safeToShutdown) {
25+
this.status = safeToShutdown ? SingleNodeShutdownMetadata.Status.COMPLETE :
26+
SingleNodeShutdownMetadata.Status.IN_PROGRESS;
2627
}
2728

2829
public ShutdownPluginsStatus(StreamInput in) throws IOException {
29-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
30+
this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class);
31+
}
32+
33+
public SingleNodeShutdownMetadata.Status getStatus() {
34+
return this.status;
3035
}
3136

3237
@Override
@@ -39,7 +44,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
3944

4045
@Override
4146
public void writeTo(StreamOutput out) throws IOException {
42-
47+
out.writeEnum(this.status);
4348
}
4449

4550
@Override

server/src/main/java/org/elasticsearch/cluster/metadata/ShutdownShardMigrationStatus.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ public class ShutdownShardMigrationStatus implements Writeable, ToXContentObject
2222
private final SingleNodeShutdownMetadata.Status status;
2323

2424
public ShutdownShardMigrationStatus() {
25-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
25+
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
2626
}
2727

2828
public ShutdownShardMigrationStatus(StreamInput in) throws IOException {
29-
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
29+
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
3030
}
3131

3232
@Override
@@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException {
4242

4343
}
4444

45+
public SingleNodeShutdownMetadata.Status getStatus() {
46+
return status;
47+
}
48+
4549
@Override
4650
public int hashCode() {
4751
return status.hashCode();

server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.xcontent.XContentParser;
2020

2121
import java.io.IOException;
22+
import java.util.Locale;
2223
import java.util.Objects;
2324

2425
/**
@@ -234,16 +235,52 @@ public SingleNodeShutdownMetadata build() {
234235
*/
235236
public enum Type {
    REMOVE,
    RESTART;

    /**
     * Parses a shutdown type from its name, ignoring case.
     *
     * @param type the name of the shutdown type, for example {@code "remove"} or {@code "RESTART"}; must not be null
     * @return the {@link Type} whose lower-cased name equals the lower-cased input
     * @throws NullPointerException if {@code type} is null
     * @throws IllegalArgumentException if {@code type} does not name a known shutdown type
     */
    public static Type parse(String type) {
        Objects.requireNonNull(type, "shutdown type must not be null");
        // Lower-case once with Locale.ROOT so the comparison does not depend on the default locale.
        final String normalized = type.toLowerCase(Locale.ROOT);
        if ("remove".equals(normalized)) {
            return REMOVE;
        } else if ("restart".equals(normalized)) {
            return RESTART;
        } else {
            // Report the caller's original spelling, not the normalized one.
            throw new IllegalArgumentException("unknown shutdown type: " + type);
        }
    }
}
239250

240251
/**
241252
* Describes the status of a component of shutdown.
242253
*/
243254
public enum Status {
    // These are ordered (see #combine(...))
    NOT_STARTED,
    IN_PROGRESS,
    STALLED,
    COMPLETE;

    /**
     * Merges multiple statuses into a single, final, status.
     *
     * The result is the latest-declared status among the inputs that is not {@link #COMPLETE};
     * {@link #COMPLETE} is returned only when every input is {@link #COMPLETE} or no input is given.
     *
     * For example, if called with NOT_STARTED, IN_PROGRESS, and STALLED, the returned state is STALLED.
     * Called with IN_PROGRESS, IN_PROGRESS, NOT_STARTED, the returned state is IN_PROGRESS.
     * Called with IN_PROGRESS, NOT_STARTED, COMPLETE, the returned state is IN_PROGRESS.
     * Called with COMPLETE, COMPLETE, COMPLETE, the returned state is COMPLETE.
     * Called with an empty array, the returned state is COMPLETE.
     *
     * @param statuses the statuses to merge; the array and its elements must not be null
     * @return the combined status
     * @throws NullPointerException if {@code statuses} or any of its elements is null
     */
    public static Status combine(Status... statuses) {
        // Highest non-COMPLETE status seen so far; null means "nothing but COMPLETE (or nothing at all)".
        Status combined = null;
        for (Status status : statuses) {
            Objects.requireNonNull(status, "status must not be null");
            // Max the status up to, but not including, "complete". Enum#compareTo follows
            // declaration order, which is the ordering this enum documents above.
            if (status != COMPLETE && (combined == null || status.compareTo(combined) > 0)) {
                combined = status;
            }
        }
        return combined == null ? COMPLETE : combined;
    }
}
249286
}

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137
import org.elasticsearch.plugins.RepositoryPlugin;
138138
import org.elasticsearch.plugins.ScriptPlugin;
139139
import org.elasticsearch.plugins.SearchPlugin;
140+
import org.elasticsearch.plugins.ShutdownAwarePlugin;
140141
import org.elasticsearch.plugins.SystemIndexPlugin;
141142
import org.elasticsearch.repositories.RepositoriesModule;
142143
import org.elasticsearch.repositories.RepositoriesService;
@@ -149,6 +150,7 @@
149150
import org.elasticsearch.search.SearchService;
150151
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
151152
import org.elasticsearch.search.fetch.FetchPhase;
153+
import org.elasticsearch.shutdown.PluginShutdownService;
152154
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
153155
import org.elasticsearch.snapshots.RestoreService;
154156
import org.elasticsearch.snapshots.SnapshotShardsService;
@@ -628,6 +630,10 @@ protected Node(final Environment initialEnvironment,
628630
resourcesToClose.add(persistentTasksClusterService);
629631
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
630632

633+
final List<ShutdownAwarePlugin> shutdownAwarePlugins = pluginsService.filterPlugins(ShutdownAwarePlugin.class);
634+
final PluginShutdownService pluginShutdownService = new PluginShutdownService(shutdownAwarePlugins);
635+
clusterService.addListener(pluginShutdownService);
636+
631637
modules.add(b -> {
632638
b.bind(Node.class).toInstance(this);
633639
b.bind(NodeService.class).toInstance(nodeService);
@@ -689,6 +695,7 @@ protected Node(final Environment initialEnvironment,
689695
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
690696
b.bind(FsHealthService.class).toInstance(fsHealthService);
691697
b.bind(SystemIndices.class).toInstance(systemIndices);
698+
b.bind(PluginShutdownService.class).toInstance(pluginShutdownService);
692699
b.bind(ExecutorSelector.class).toInstance(executorSelector);
693700
}
694701
);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.plugins;
10+
11+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
12+
13+
import java.util.Collection;
14+
15+
/**
16+
* A {@link ShutdownAwarePlugin} is a plugin that can be made aware of a shutdown. It comprises two
17+
* parts, one part used for telling plugins that a set of nodes are going to be shut down
18+
* ({@link #signalShutdown(Collection)}), the other for retrieving the status of those plugins
19+
* as to whether it is safe to shut down ({@link #safeToShutdown(String, SingleNodeShutdownMetadata.Type)}).
20+
*/
21+
public interface ShutdownAwarePlugin {
22+
23+
/**
24+
* Whether the plugin is considered safe to shut down. This method is called when the status of
25+
* a shutdown is retrieved via the API, and it is only called on the master node.
26+
*
* @param nodeId the id of the node whose shutdown is being checked
* @param shutdownType the type of shutdown being checked for that node
* @return {@code true} if the plugin considers the shutdown safe, {@code false} otherwise
*/
27+
boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType);
28+
29+
/**
30+
* A trigger to notify the plugin that a shutdown for the nodes has been triggered. This method
31+
* will be called on every node for each cluster state, so it should return quickly.
32+
*
* @param shutdownNodeIds the ids of the nodes that are shutting down
*/
33+
void signalShutdown(Collection<String> shutdownNodeIds);
34+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
/*
10+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
11+
* or more contributor license agreements. Licensed under the Elastic License
12+
* 2.0; you may not use this file except in compliance with the Elastic License
13+
* 2.0.
14+
*/
15+
16+
package org.elasticsearch.shutdown;
17+
18+
import org.apache.logging.log4j.LogManager;
19+
import org.apache.logging.log4j.Logger;
20+
import org.apache.logging.log4j.message.ParameterizedMessage;
21+
import org.elasticsearch.cluster.ClusterChangedEvent;
22+
import org.elasticsearch.cluster.ClusterState;
23+
import org.elasticsearch.cluster.ClusterStateListener;
24+
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
25+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
26+
import org.elasticsearch.common.Nullable;
27+
import org.elasticsearch.plugins.ShutdownAwarePlugin;
28+
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Set;
33+
import java.util.stream.Collectors;
34+
35+
/**
36+
* The {@link PluginShutdownService} is used for the node shutdown infrastructure to signal to
37+
* plugins that a shutdown is occurring, and to check whether it is safe to shut down.
38+
*/
39+
public class PluginShutdownService implements ClusterStateListener {
40+
41+
private static final Logger logger = LogManager.getLogger(PluginShutdownService.class);
42+
public List<ShutdownAwarePlugin> plugins;
43+
44+
public PluginShutdownService(@Nullable List<ShutdownAwarePlugin> plugins) {
45+
this.plugins = plugins == null ? Collections.emptyList() : plugins;
46+
}
47+
48+
/**
49+
* Return all nodes shutting down from the given cluster state
50+
*/
51+
public static Set<String> shutdownNodes(final ClusterState clusterState) {
52+
return NodesShutdownMetadata.getShutdowns(clusterState)
53+
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
54+
.map(Map::keySet)
55+
.orElse(Collections.emptySet());
56+
}
57+
58+
/**
59+
* Return all nodes shutting down with the given shutdown type from the given cluster state
60+
*/
61+
public static Set<String> shutdownTypeNodes(final ClusterState clusterState, final SingleNodeShutdownMetadata.Type shutdownType) {
62+
return NodesShutdownMetadata.getShutdowns(clusterState)
63+
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
64+
.map(m -> m.entrySet().stream()
65+
.filter(e -> e.getValue().getType() == shutdownType)
66+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
67+
.map(Map::keySet)
68+
.orElse(Collections.emptySet());
69+
}
70+
71+
/**
72+
* Check with registered plugins whether the shutdown is safe for the given node id and type
73+
*/
74+
public boolean readyToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) {
75+
// TODO: consider adding debugging information (a message about why not?)
76+
// TODO: consider adding more fine-grained status rather than true/false
77+
for (ShutdownAwarePlugin plugin : plugins) {
78+
try {
79+
if (plugin.safeToShutdown(nodeId, shutdownType) == false) {
80+
logger.trace("shutdown aware plugin [{}] is not yet ready for shutdown", plugin);
81+
return false;
82+
}
83+
} catch (Exception e) {
84+
logger.warn("uncaught exception when retrieving whether plugin is ready for node shutdown", e);
85+
}
86+
}
87+
return true;
88+
}
89+
90+
/**
91+
* Signal to plugins the nodes that are currently shutting down
92+
*/
93+
public void signalShutdown(final ClusterState state) {
94+
Set<String> shutdownNodes = shutdownNodes(state);
95+
for (ShutdownAwarePlugin plugin : plugins) {
96+
try {
97+
plugin.signalShutdown(shutdownNodes);
98+
} catch (Exception e) {
99+
logger.warn(new ParameterizedMessage("uncaught exception when notifying plugins of nodes {} shutdown", shutdownNodes), e);
100+
}
101+
}
102+
}
103+
104+
@Override
105+
public void clusterChanged(ClusterChangedEvent event) {
106+
signalShutdown(event.state());
107+
}
108+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.metadata;
10+
11+
import org.elasticsearch.test.ESTestCase;
12+
13+
public class SingleNodeShutdownMetadataTests extends ESTestCase {
14+
public void testStatusComination() {
15+
SingleNodeShutdownMetadata.Status status;
16+
17+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
18+
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
19+
SingleNodeShutdownMetadata.Status.STALLED);
20+
assertEquals(status, SingleNodeShutdownMetadata.Status.STALLED);
21+
22+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
23+
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
24+
SingleNodeShutdownMetadata.Status.NOT_STARTED);
25+
assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS);
26+
27+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
28+
SingleNodeShutdownMetadata.Status.NOT_STARTED,
29+
SingleNodeShutdownMetadata.Status.NOT_STARTED);
30+
assertEquals(status, SingleNodeShutdownMetadata.Status.NOT_STARTED);
31+
32+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.IN_PROGRESS,
33+
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
34+
SingleNodeShutdownMetadata.Status.COMPLETE);
35+
assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS);
36+
37+
status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.COMPLETE,
38+
SingleNodeShutdownMetadata.Status.COMPLETE,
39+
SingleNodeShutdownMetadata.Status.COMPLETE);
40+
assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE);
41+
42+
status = SingleNodeShutdownMetadata.Status.combine();
43+
assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE);
44+
}
45+
}

0 commit comments

Comments
 (0)