Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
Expand Down Expand Up @@ -200,6 +201,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;

/**
* An allocation decider that prevents shards from being allocated to a
* node that is in the process of shutting down.
*
* In short: No shards can be allocated to, or remain on, a node which is
* shutting down for removal. Primary shards cannot be allocated to, or remain
* on, a node which is shutting down for restart.
*/
public class NodeShutdownAllocationDecider extends AllocationDecider {
private static final Logger logger = LogManager.getLogger(NodeShutdownAllocationDecider.class);

private static final String NAME = "node_shutdown";

/**
* Determines if a shard can be allocated to a particular node, based on whether that node is shutting down or not.
*/
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
final SingleNodeShutdownMetadata thisNodeShutdownMetadata = getNodeShutdownMetadata(allocation.metadata(), node.nodeId());

if (thisNodeShutdownMetadata == null) {
// There's no shutdown metadata for this node, return yes.
return allocation.decision(Decision.YES, NAME, "this node is not currently shutting down");
}

switch (thisNodeShutdownMetadata.getType()) {
case REMOVE:
return allocation.decision(Decision.NO, NAME, "node [%s] is preparing to be removed from the cluster", node.nodeId());
case RESTART:
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is preparing to restart, but will remain in the cluster",
node.nodeId()
);
default:
logger.debug(
"found unrecognized node shutdown type [{}] while deciding allocation for [{}] shard [{}][{}] on node [{}]",
thisNodeShutdownMetadata.getType(),
shardRouting.primary() ? "primary" : "replica",
shardRouting.getIndexName(),
shardRouting.getId(),
node.nodeId()
);
assert false : "node shutdown type not recognized: " + thisNodeShutdownMetadata.getType();
return Decision.YES;
}
}

/**
* Applies the same rules as {@link NodeShutdownAllocationDecider#canAllocate(ShardRouting, RoutingNode, RoutingAllocation)} to
* determine if shards can remain on their current node.
*/
@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return this.canAllocate(shardRouting, node, allocation);
}

/**
* Prevents indices from being auto-expanded to nodes which are in the process of shutting down, regardless of whether they're shutting
* down for restart or removal.
*/
@Override
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
SingleNodeShutdownMetadata thisNodeShutdownMetadata = getNodeShutdownMetadata(allocation.metadata(), node.getId());

if (thisNodeShutdownMetadata == null) {
return allocation.decision(Decision.YES, NAME, "node [%s] is not preparing for removal from the cluster");
}

switch (thisNodeShutdownMetadata.getType()) {
case RESTART:
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is preparing to restart, auto-expansion waiting until it is complete",
node.getId()
);
case REMOVE:
return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId());
default:
logger.debug(
"found unrecognized node shutdown type [{}] while deciding auto-expansion for index [{}] on node [{}]",
thisNodeShutdownMetadata.getType(),
indexMetadata.getIndex().getName(),
node.getId()
);
assert false : "node shutdown type not recognized: " + thisNodeShutdownMetadata.getType();
return Decision.YES;
}
}

@Nullable
private static SingleNodeShutdownMetadata getNodeShutdownMetadata(Metadata metadata, String nodeId) {
NodesShutdownMetadata nodesShutdownMetadata = metadata.custom(NodesShutdownMetadata.TYPE);
if (nodesShutdownMetadata == null || nodesShutdownMetadata.getAllNodeMetadataMap() == null) {
// There are no nodes in the process of shutting down, return null.
return null;
}

return nodesShutdownMetadata.getAllNodeMetadataMap().get(nodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
Expand Down Expand Up @@ -193,6 +194,7 @@ public void testAllocationDeciderOrder() {
NodeVersionAllocationDecider.class,
SnapshotInProgressAllocationDecider.class,
RestoreInProgressAllocationDecider.class,
NodeShutdownAllocationDecider.class,
FilterAllocationDecider.class,
SameShardAllocationDecider.class,
DiskThresholdDecider.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
import org.elasticsearch.test.gateway.TestGatewayAllocator;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class NodeShutdownAllocationDeciderTests extends ESAllocationTestCase {
private static final DiscoveryNode DATA_NODE = newNode("node-data", Collections.singleton(DiscoveryNodeRole.DATA_ROLE));
private final ShardRouting shard = ShardRouting.newUnassigned(
new ShardId("myindex", "myindex", 0),
true,
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "index created")
);
private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private NodeShutdownAllocationDecider decider = new NodeShutdownAllocationDecider();
private final AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(
decider,
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider()
)
);
private final AllocationService service = new AllocationService(
allocationDeciders,
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);

private final String idxName = "test-idx";
private final String idxUuid = "test-idx-uuid";
private final IndexMetadata indexMetadata = IndexMetadata.builder(idxName)
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, idxUuid)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
)
.build();

public void testCanAllocateShardsToRestartingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
SingleNodeShutdownMetadata.Type.RESTART
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
allocation.debugDecision(true);

Decision decision = decider.canAllocate(shard, routingNode, allocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));
assertThat(
decision.getExplanation(),
equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, but will remain in the cluster")
);
}


public void testCannotAllocateShardsToRemovingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
SingleNodeShutdownMetadata.Type.REMOVE
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
allocation.debugDecision(true);

Decision decision = decider.canAllocate(shard, routingNode, allocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(
decision.getExplanation(),
equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster")
);
}

public void testShardsCanRemainOnRestartingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
SingleNodeShutdownMetadata.Type.RESTART
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
allocation.debugDecision(true);

Decision decision = decider.canRemain(shard, routingNode, allocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));
assertThat(
decision.getExplanation(),
equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, but will remain in the cluster")
);
}

public void testShardsCannotRemainOnRemovingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
SingleNodeShutdownMetadata.Type.REMOVE
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
allocation.debugDecision(true);

Decision decision = decider.canRemain(shard, routingNode, allocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(
decision.getExplanation(),
equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster")
);
}

public void testCannotAutoExpandToRestartingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
SingleNodeShutdownMetadata.Type.RESTART
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
allocation.debugDecision(true);

Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(
decision.getExplanation(),
equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, auto-expansion waiting until it is complete")
);
}

public void testCannotAutoExpandToRemovingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state"),
SingleNodeShutdownMetadata.Type.REMOVE
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
allocation.debugDecision(true);

Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(decision.getExplanation(), equalTo("node [" + DATA_NODE.getId() + "] is preparing for removal from the cluster"));
}

private ClusterState prepareState(ClusterState initialState, SingleNodeShutdownMetadata.Type shutdownType) {
Map<String, SingleNodeShutdownMetadata> nodesShutdownInfo = new HashMap<>();

final SingleNodeShutdownMetadata nodeShutdownMetadata = SingleNodeShutdownMetadata.builder()
.setNodeId(DATA_NODE.getId())
.setType(shutdownType)
.setReason(this.getTestName())
.setStartedAtMillis(1L)
.build();
NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(new HashMap<>()).putSingleNodeMetadata(
nodeShutdownMetadata
);
return ClusterState.builder(initialState)
.nodes(DiscoveryNodes.builder().add(DATA_NODE).build())
.metadata(
Metadata.builder().put(IndexMetadata.builder(indexMetadata)).putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata)
)
.build();
}
}
Loading