Skip to content

Commit c85d7dd

Browse files
committed
Add an allocation decider to prevent allocating shards to nodes which are preparing for shutdown (elastic#71658)
This PR adds an allocation decider which uses the metadata managed by the Node Shutdown API to prevent shards from being allocated to nodes which are preparing to be removed from the cluster. Additionally, shards will not be auto-expanded to nodes which are preparing to restart, instead waiting until after the restart is complete to expand the shard replication.
1 parent 8d12eaf commit c85d7dd

File tree

5 files changed

+408
-0
lines changed

5 files changed

+408
-0
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
3939
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
4040
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
41+
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
4142
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
4243
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
4344
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
@@ -232,6 +233,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
232233
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
233234
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
234235
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
236+
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
235237
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
236238
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
237239
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.routing.allocation.decider;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.Metadata;
15+
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
16+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
17+
import org.elasticsearch.cluster.node.DiscoveryNode;
18+
import org.elasticsearch.cluster.routing.RoutingNode;
19+
import org.elasticsearch.cluster.routing.ShardRouting;
20+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
21+
import org.elasticsearch.common.Nullable;
22+
23+
/**
24+
* An allocation decider that prevents shards from being allocated to a
25+
* node that is in the process of shutting down.
26+
*
27+
* In short: No shards can be allocated to, or remain on, a node which is
28+
* shutting down for removal. Primary shards cannot be allocated to, or remain
29+
* on, a node which is shutting down for restart.
30+
*/
31+
public class NodeShutdownAllocationDecider extends AllocationDecider {
32+
private static final Logger logger = LogManager.getLogger(NodeShutdownAllocationDecider.class);
33+
34+
private static final String NAME = "node_shutdown";
35+
36+
/**
37+
* Determines if a shard can be allocated to a particular node, based on whether that node is shutting down or not.
38+
*/
39+
@Override
40+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
41+
final SingleNodeShutdownMetadata thisNodeShutdownMetadata = getNodeShutdownMetadata(allocation.metadata(), node.nodeId());
42+
43+
if (thisNodeShutdownMetadata == null) {
44+
// There's no shutdown metadata for this node, return yes.
45+
return allocation.decision(Decision.YES, NAME, "this node is not currently shutting down");
46+
}
47+
48+
switch (thisNodeShutdownMetadata.getType()) {
49+
case REMOVE:
50+
return allocation.decision(Decision.NO, NAME, "node [%s] is preparing to be removed from the cluster", node.nodeId());
51+
case RESTART:
52+
return allocation.decision(
53+
Decision.YES,
54+
NAME,
55+
"node [%s] is preparing to restart, but will remain in the cluster",
56+
node.nodeId()
57+
);
58+
default:
59+
logger.debug(
60+
"found unrecognized node shutdown type [{}] while deciding allocation for [{}] shard [{}][{}] on node [{}]",
61+
thisNodeShutdownMetadata.getType(),
62+
shardRouting.primary() ? "primary" : "replica",
63+
shardRouting.getIndexName(),
64+
shardRouting.getId(),
65+
node.nodeId()
66+
);
67+
assert false : "node shutdown type not recognized: " + thisNodeShutdownMetadata.getType();
68+
return Decision.YES;
69+
}
70+
}
71+
72+
/**
73+
* Applies the same rules as {@link NodeShutdownAllocationDecider#canAllocate(ShardRouting, RoutingNode, RoutingAllocation)} to
74+
* determine if shards can remain on their current node.
75+
*/
76+
@Override
77+
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
78+
return this.canAllocate(shardRouting, node, allocation);
79+
}
80+
81+
/**
82+
* Prevents indices from being auto-expanded to nodes which are in the process of shutting down, regardless of whether they're shutting
83+
* down for restart or removal.
84+
*/
85+
@Override
86+
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
87+
SingleNodeShutdownMetadata thisNodeShutdownMetadata = getNodeShutdownMetadata(allocation.metadata(), node.getId());
88+
89+
if (thisNodeShutdownMetadata == null) {
90+
return allocation.decision(Decision.YES, NAME, "node [%s] is not preparing for removal from the cluster");
91+
}
92+
93+
switch (thisNodeShutdownMetadata.getType()) {
94+
case RESTART:
95+
return allocation.decision(
96+
Decision.NO,
97+
NAME,
98+
"node [%s] is preparing to restart, auto-expansion waiting until it is complete",
99+
node.getId()
100+
);
101+
case REMOVE:
102+
return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId());
103+
default:
104+
logger.debug(
105+
"found unrecognized node shutdown type [{}] while deciding auto-expansion for index [{}] on node [{}]",
106+
thisNodeShutdownMetadata.getType(),
107+
indexMetadata.getIndex().getName(),
108+
node.getId()
109+
);
110+
assert false : "node shutdown type not recognized: " + thisNodeShutdownMetadata.getType();
111+
return Decision.YES;
112+
}
113+
}
114+
115+
@Nullable
116+
private static SingleNodeShutdownMetadata getNodeShutdownMetadata(Metadata metadata, String nodeId) {
117+
NodesShutdownMetadata nodesShutdownMetadata = metadata.custom(NodesShutdownMetadata.TYPE);
118+
if (nodesShutdownMetadata == null || nodesShutdownMetadata.getAllNodeMetadataMap() == null) {
119+
// There are no nodes in the process of shutting down, return null.
120+
return null;
121+
}
122+
123+
return nodesShutdownMetadata.getAllNodeMetadataMap().get(nodeId);
124+
}
125+
}

server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
2424
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
2525
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
26+
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
2627
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
2728
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
2829
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
@@ -197,6 +198,7 @@ public void testAllocationDeciderOrder() {
197198
NodeVersionAllocationDecider.class,
198199
SnapshotInProgressAllocationDecider.class,
199200
RestoreInProgressAllocationDecider.class,
201+
NodeShutdownAllocationDecider.class,
200202
FilterAllocationDecider.class,
201203
SameShardAllocationDecider.class,
202204
DiskThresholdDecider.class,
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.routing.allocation.decider;
10+
11+
import org.elasticsearch.Version;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.ESAllocationTestCase;
14+
import org.elasticsearch.cluster.EmptyClusterInfoService;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
18+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
19+
import org.elasticsearch.cluster.node.DiscoveryNode;
20+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
21+
import org.elasticsearch.cluster.node.DiscoveryNodes;
22+
import org.elasticsearch.cluster.routing.RecoverySource;
23+
import org.elasticsearch.cluster.routing.RoutingNode;
24+
import org.elasticsearch.cluster.routing.ShardRouting;
25+
import org.elasticsearch.cluster.routing.UnassignedInfo;
26+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
27+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
28+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
29+
import org.elasticsearch.common.settings.ClusterSettings;
30+
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.index.shard.ShardId;
32+
import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
33+
import org.elasticsearch.test.gateway.TestGatewayAllocator;
34+
35+
import java.util.Arrays;
36+
import java.util.Collections;
37+
import java.util.HashMap;
38+
import java.util.Map;
39+
40+
import static org.hamcrest.Matchers.equalTo;
41+
42+
public class NodeShutdownAllocationDeciderTests extends ESAllocationTestCase {
43+
private static final DiscoveryNode DATA_NODE = newNode("node-data", Collections.singleton(DiscoveryNodeRole.DATA_ROLE));
44+
private final ShardRouting shard = ShardRouting.newUnassigned(
45+
new ShardId("myindex", "myindex", 0),
46+
true,
47+
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
48+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "index created")
49+
);
50+
private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
51+
private NodeShutdownAllocationDecider decider = new NodeShutdownAllocationDecider();
52+
private final AllocationDeciders allocationDeciders = new AllocationDeciders(
53+
Arrays.asList(
54+
decider,
55+
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
56+
new ReplicaAfterPrimaryActiveAllocationDecider()
57+
)
58+
);
59+
private final AllocationService service = new AllocationService(
60+
allocationDeciders,
61+
new TestGatewayAllocator(),
62+
new BalancedShardsAllocator(Settings.EMPTY),
63+
EmptyClusterInfoService.INSTANCE,
64+
EmptySnapshotsInfoService.INSTANCE
65+
);
66+
67+
private final String idxName = "test-idx";
68+
private final String idxUuid = "test-idx-uuid";
69+
private final IndexMetadata indexMetadata = IndexMetadata.builder(idxName)
70+
.settings(
71+
Settings.builder()
72+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
73+
.put(IndexMetadata.SETTING_INDEX_UUID, idxUuid)
74+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
75+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
76+
.build()
77+
)
78+
.build();
79+
80+
public void testCanAllocateShardsToRestartingNode() {
81+
ClusterState state = prepareState(
82+
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
83+
SingleNodeShutdownMetadata.Type.RESTART
84+
);
85+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
86+
RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
87+
allocation.debugDecision(true);
88+
89+
Decision decision = decider.canAllocate(shard, routingNode, allocation);
90+
assertThat(decision.type(), equalTo(Decision.Type.YES));
91+
assertThat(
92+
decision.getExplanation(),
93+
equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, but will remain in the cluster")
94+
);
95+
}
96+
97+
98+
public void testCannotAllocateShardsToRemovingNode() {
99+
ClusterState state = prepareState(
100+
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
101+
SingleNodeShutdownMetadata.Type.REMOVE
102+
);
103+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
104+
RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
105+
allocation.debugDecision(true);
106+
107+
Decision decision = decider.canAllocate(shard, routingNode, allocation);
108+
assertThat(decision.type(), equalTo(Decision.Type.NO));
109+
assertThat(
110+
decision.getExplanation(),
111+
equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster")
112+
);
113+
}
114+
115+
public void testShardsCanRemainOnRestartingNode() {
116+
ClusterState state = prepareState(
117+
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
118+
SingleNodeShutdownMetadata.Type.RESTART
119+
);
120+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
121+
RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
122+
allocation.debugDecision(true);
123+
124+
Decision decision = decider.canRemain(shard, routingNode, allocation);
125+
assertThat(decision.type(), equalTo(Decision.Type.YES));
126+
assertThat(
127+
decision.getExplanation(),
128+
equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, but will remain in the cluster")
129+
);
130+
}
131+
132+
public void testShardsCannotRemainOnRemovingNode() {
133+
ClusterState state = prepareState(
134+
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
135+
SingleNodeShutdownMetadata.Type.REMOVE
136+
);
137+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
138+
RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
139+
allocation.debugDecision(true);
140+
141+
Decision decision = decider.canRemain(shard, routingNode, allocation);
142+
assertThat(decision.type(), equalTo(Decision.Type.NO));
143+
assertThat(
144+
decision.getExplanation(),
145+
equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster")
146+
);
147+
}
148+
149+
public void testCannotAutoExpandToRestartingNode() {
150+
ClusterState state = prepareState(
151+
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
152+
SingleNodeShutdownMetadata.Type.RESTART
153+
);
154+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
155+
allocation.debugDecision(true);
156+
157+
Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation);
158+
assertThat(decision.type(), equalTo(Decision.Type.NO));
159+
assertThat(
160+
decision.getExplanation(),
161+
equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, auto-expansion waiting until it is complete")
162+
);
163+
}
164+
165+
public void testCannotAutoExpandToRemovingNode() {
166+
ClusterState state = prepareState(
167+
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
168+
SingleNodeShutdownMetadata.Type.REMOVE
169+
);
170+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
171+
allocation.debugDecision(true);
172+
173+
Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation);
174+
assertThat(decision.type(), equalTo(Decision.Type.NO));
175+
assertThat(decision.getExplanation(), equalTo("node [" + DATA_NODE.getId() + "] is preparing for removal from the cluster"));
176+
}
177+
178+
private ClusterState prepareState(ClusterState initialState, SingleNodeShutdownMetadata.Type shutdownType) {
179+
Map<String, SingleNodeShutdownMetadata> nodesShutdownInfo = new HashMap<>();
180+
181+
final SingleNodeShutdownMetadata nodeShutdownMetadata = SingleNodeShutdownMetadata.builder()
182+
.setNodeId(DATA_NODE.getId())
183+
.setType(shutdownType)
184+
.setReason(this.getTestName())
185+
.setStartedAtMillis(1L)
186+
.build();
187+
NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(new HashMap<>()).putSingleNodeMetadata(
188+
nodeShutdownMetadata
189+
);
190+
return ClusterState.builder(initialState)
191+
.nodes(DiscoveryNodes.builder().add(DATA_NODE).build())
192+
.metadata(
193+
Metadata.builder().put(IndexMetadata.builder(indexMetadata)).putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata)
194+
)
195+
.build();
196+
}
197+
}

0 commit comments

Comments
 (0)