diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
index 27fad07235e9f..915ca94fbac4e 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -38,6 +38,7 @@
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
@@ -200,6 +201,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
         addAllocationDecider(deciders, new NodeVersionAllocationDecider());
         addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
         addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
+        addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
         addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
         addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
         addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
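Reviewer note: the new decider is registered after RestoreInProgressAllocationDecider and before the filter/same-shard/disk deciders, and testAllocationDeciderOrder below pins that position. As a rough mental model of why placement matters (an illustrative sketch only, not the real org.elasticsearch AllocationDeciders class), a composite decider short-circuits on the first NO, so a cheap metadata lookup like this one saves the more expensive checks behind it:

```java
// Illustrative sketch, NOT the Elasticsearch implementation: shows the
// short-circuit behavior that makes decider ordering matter.
import java.util.List;
import java.util.function.Supplier;

public class DeciderOrderSketch {
    enum Decision { YES, THROTTLE, NO }

    static Decision combine(List<Supplier<Decision>> deciders) {
        Decision result = Decision.YES;
        for (Supplier<Decision> decider : deciders) {
            Decision d = decider.get();
            if (d == Decision.NO) {
                return Decision.NO; // short-circuit: a single veto ends the evaluation
            } else if (d == Decision.THROTTLE) {
                result = Decision.THROTTLE; // THROTTLE outranks YES, but later deciders still run
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The expensive decider is never invoked because the first check vetoes.
        Decision decision = combine(List.of(() -> Decision.NO, () -> {
            throw new AssertionError("expensive decider should not run");
        }));
        System.out.println(decision); // prints NO
    }
}
```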
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDecider.java
new file mode 100644
index 0000000000000..feaff722e1aa8
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDecider.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.decider;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.Nullable;
+
+/**
+ * An allocation decider that prevents shards from being allocated to a
+ * node that is in the process of shutting down.
+ *
+ * In short: no shards can be allocated to, or remain on, a node which is
+ * shutting down for removal. Shards can be allocated to, and remain on, a
+ * node which is shutting down for restart, but indices cannot auto-expand
+ * onto any node that is shutting down, regardless of shutdown type.
+ */
+public class NodeShutdownAllocationDecider extends AllocationDecider {
+    private static final Logger logger = LogManager.getLogger(NodeShutdownAllocationDecider.class);
+
+    private static final String NAME = "node_shutdown";
+
+    /**
+     * Determines if a shard can be allocated to a particular node, based on whether that node is shutting down or not.
+     */
+    @Override
+    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        final SingleNodeShutdownMetadata thisNodeShutdownMetadata = getNodeShutdownMetadata(allocation.metadata(), node.nodeId());
+
+        if (thisNodeShutdownMetadata == null) {
+            // There's no shutdown metadata for this node, return yes.
+            return allocation.decision(Decision.YES, NAME, "this node is not currently shutting down");
+        }
+
+        switch (thisNodeShutdownMetadata.getType()) {
+            case REMOVE:
+                return allocation.decision(Decision.NO, NAME, "node [%s] is preparing to be removed from the cluster", node.nodeId());
+            case RESTART:
+                return allocation.decision(
+                    Decision.YES,
+                    NAME,
+                    "node [%s] is preparing to restart, but will remain in the cluster",
+                    node.nodeId()
+                );
+            default:
+                logger.debug(
+                    "found unrecognized node shutdown type [{}] while deciding allocation for [{}] shard [{}][{}] on node [{}]",
+                    thisNodeShutdownMetadata.getType(),
+                    shardRouting.primary() ? "primary" : "replica",
+                    shardRouting.getIndexName(),
+                    shardRouting.getId(),
+                    node.nodeId()
+                );
+                assert false : "node shutdown type not recognized: " + thisNodeShutdownMetadata.getType();
+                return Decision.YES;
+        }
+    }
+
+    /**
+     * Applies the same rules as {@link NodeShutdownAllocationDecider#canAllocate(ShardRouting, RoutingNode, RoutingAllocation)} to
+     * determine if shards can remain on their current node.
+     */
+    @Override
+    public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        return this.canAllocate(shardRouting, node, allocation);
+    }
+
+    /**
+     * Prevents indices from being auto-expanded to nodes which are in the process of shutting down, regardless of whether they're shutting
+     * down for restart or removal.
+     */
+    @Override
+    public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
+        SingleNodeShutdownMetadata thisNodeShutdownMetadata = getNodeShutdownMetadata(allocation.metadata(), node.getId());
+
+        if (thisNodeShutdownMetadata == null) {
+            return allocation.decision(Decision.YES, NAME, "node [%s] is not preparing for removal from the cluster", node.getId());
+        }
+
+        switch (thisNodeShutdownMetadata.getType()) {
+            case RESTART:
+                return allocation.decision(
+                    Decision.NO,
+                    NAME,
+                    "node [%s] is preparing to restart, auto-expansion waiting until it is complete",
+                    node.getId()
+                );
+            case REMOVE:
+                return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId());
+            default:
+                logger.debug(
+                    "found unrecognized node shutdown type [{}] while deciding auto-expansion for index [{}] on node [{}]",
+                    thisNodeShutdownMetadata.getType(),
+                    indexMetadata.getIndex().getName(),
+                    node.getId()
+                );
+                assert false : "node shutdown type not recognized: " + thisNodeShutdownMetadata.getType();
+                return Decision.YES;
+        }
+    }
+
+    @Nullable
+    private static SingleNodeShutdownMetadata getNodeShutdownMetadata(Metadata metadata, String nodeId) {
+        NodesShutdownMetadata nodesShutdownMetadata = metadata.custom(NodesShutdownMetadata.TYPE);
+        if (nodesShutdownMetadata == null || nodesShutdownMetadata.getAllNodeMetadataMap() == null) {
+            // There are no nodes in the process of shutting down, return null.
+            return null;
+        }
+
+        return nodesShutdownMetadata.getAllNodeMetadataMap().get(nodeId);
+    }
+}
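To summarize the matrix the decider implements, here is a condensed, standalone model of the three overrides above (the enum and method names are stand-ins for illustration, not the Elasticsearch types):

```java
// Condensed model of NodeShutdownAllocationDecider's decision table. Stand-in types only.
public class ShutdownDecisionTable {
    enum ShutdownType { REMOVE, RESTART }

    /** canAllocate and canRemain: only a REMOVE-type shutdown blocks shards. */
    static boolean canAllocateOrRemain(ShutdownType type) {
        return type != ShutdownType.REMOVE; // null (no shutdown) and RESTART both allow
    }

    /** shouldAutoExpandToNode: any in-progress shutdown blocks auto-expansion. */
    static boolean shouldAutoExpand(ShutdownType type) {
        return type == null; // only nodes with no shutdown record accept auto-expanded copies
    }

    public static void main(String[] args) {
        System.out.println(canAllocateOrRemain(ShutdownType.RESTART)); // true: shards stay put across a restart
        System.out.println(shouldAutoExpand(ShutdownType.RESTART));    // false: don't expand onto a restarting node
    }
}
```

The asymmetry appears deliberate: moving shards off a restarting node would churn data needlessly, while the "auto-expansion waiting until it is complete" message suggests auto-expanded replicas are simply deferred until the node is back.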
diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
index 9a219755f4848..b966d428ce0c6 100644
--- a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
@@ -21,6 +21,7 @@
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
@@ -193,6 +194,7 @@ public void testAllocationDeciderOrder() {
             NodeVersionAllocationDecider.class,
             SnapshotInProgressAllocationDecider.class,
             RestoreInProgressAllocationDecider.class,
+            NodeShutdownAllocationDecider.class,
             FilterAllocationDecider.class,
             SameShardAllocationDecider.class,
             DiskThresholdDecider.class,
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDeciderTests.java
new file mode 100644
index 0000000000000..c74d5ee68e4ea
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDeciderTests.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.decider;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.EmptyClusterInfoService;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
+import org.elasticsearch.test.gateway.TestGatewayAllocator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class NodeShutdownAllocationDeciderTests extends ESAllocationTestCase {
+    private static final DiscoveryNode DATA_NODE = newNode("node-data", Collections.singleton(DiscoveryNodeRole.DATA_ROLE));
+    private final ShardRouting shard = ShardRouting.newUnassigned(
+        new ShardId("myindex", "myindex", 0),
+        true,
+        RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "index created")
+    );
+    private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+    private NodeShutdownAllocationDecider decider = new NodeShutdownAllocationDecider();
+    private final AllocationDeciders allocationDeciders = new AllocationDeciders(
+        Arrays.asList(
+            decider,
+            new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
+            new ReplicaAfterPrimaryActiveAllocationDecider()
+        )
+    );
+    private final AllocationService service = new AllocationService(
+        allocationDeciders,
+        new TestGatewayAllocator(),
+        new BalancedShardsAllocator(Settings.EMPTY),
+        EmptyClusterInfoService.INSTANCE,
+        EmptySnapshotsInfoService.INSTANCE
+    );
+
+    private final String idxName = "test-idx";
+    private final String idxUuid = "test-idx-uuid";
+    private final IndexMetadata indexMetadata = IndexMetadata.builder(idxName)
+        .settings(
+            Settings.builder()
+                .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetadata.SETTING_INDEX_UUID, idxUuid)
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .build()
+        )
+        .build();
+
+    public void testCanAllocateShardsToRestartingNode() {
+        ClusterState state = prepareState(
+            service.reroute(ClusterState.EMPTY_STATE, "initial state"),
+            SingleNodeShutdownMetadata.Type.RESTART
+        );
+        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
+        RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
+        allocation.debugDecision(true);
+
+        Decision decision = decider.canAllocate(shard, routingNode, allocation);
+        assertThat(decision.type(), equalTo(Decision.Type.YES));
+        assertThat(
+            decision.getExplanation(),
+            equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, but will remain in the cluster")
+        );
+    }
+
+    public void testCannotAllocateShardsToRemovingNode() {
+        ClusterState state = prepareState(
+            service.reroute(ClusterState.EMPTY_STATE, "initial state"),
+            SingleNodeShutdownMetadata.Type.REMOVE
+        );
+        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
+        RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard);
+        allocation.debugDecision(true);
+
+        Decision decision = decider.canAllocate(shard, routingNode, allocation);
+        assertThat(decision.type(), equalTo(Decision.Type.NO));
+        assertThat(decision.getExplanation(), equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster"));
+    }
state"), + SingleNodeShutdownMetadata.Type.RESTART + ); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard); + allocation.debugDecision(true); + + Decision decision = decider.canAllocate(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat( + decision.getExplanation(), + equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, but will remain in the cluster") + ); + } + + + public void testCannotAllocateShardsToRemovingNode() { + ClusterState state = prepareState( + service.reroute(ClusterState.EMPTY_STATE, "initial state"), + SingleNodeShutdownMetadata.Type.REMOVE + ); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard); + allocation.debugDecision(true); + + Decision decision = decider.canAllocate(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + assertThat( + decision.getExplanation(), + equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster") + ); + } + + public void testShardsCanRemainOnRestartingNode() { + ClusterState state = prepareState( + service.reroute(ClusterState.EMPTY_STATE, "initial state"), + SingleNodeShutdownMetadata.Type.RESTART + ); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard); + allocation.debugDecision(true); + + Decision decision = decider.canRemain(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat( + decision.getExplanation(), + equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, but will remain in the cluster") + ); + } + + public void testShardsCannotRemainOnRemovingNode() { + ClusterState state = prepareState( + service.reroute(ClusterState.EMPTY_STATE, "initial state"), + SingleNodeShutdownMetadata.Type.REMOVE + ); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard); + allocation.debugDecision(true); + + Decision decision = decider.canRemain(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + assertThat( + decision.getExplanation(), + equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster") + ); + } + + public void testCannotAutoExpandToRestartingNode() { + ClusterState state = prepareState( + service.reroute(ClusterState.EMPTY_STATE, "initial state"), + SingleNodeShutdownMetadata.Type.RESTART + ); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + allocation.debugDecision(true); + + Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + assertThat( + decision.getExplanation(), + equalTo("node [" + DATA_NODE.getId() + "] is preparing to restart, auto-expansion waiting until it is complete") + ); + } + + public void testCannotAutoExpandToRemovingNode() { + ClusterState state = prepareState( + 
diff --git a/x-pack/plugin/shutdown/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownIT.java b/x-pack/plugin/shutdown/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownIT.java
index b99571e69db75..9af10e692ecde 100644
--- a/x-pack/plugin/shutdown/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownIT.java
+++ b/x-pack/plugin/shutdown/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownIT.java
@@ -16,10 +16,14 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
 
 public class NodeShutdownIT extends ESRestTestCase {
 
@@ -58,6 +62,84 @@ public void testCRUD() throws Exception {
         assertNoShuttingDownNodes(nodeIdToShutdown);
     }
 
+    /**
+     * A very basic smoke test to make sure the allocation decider is working.
+     */
+    @SuppressWarnings("unchecked")
+    public void testAllocationPreventedForRemoval() throws Exception {
+        Request nodesRequest = new Request("GET", "_nodes");
+        Map<String, Object> nodesResponse = responseAsMap(client().performRequest(nodesRequest));
+        Map<String, Object> nodesObject = (Map<String, Object>) nodesResponse.get("nodes");
+
+        String nodeIdToShutdown = randomFrom(nodesObject.keySet());
+        String reason = "testing node shutdown allocation rules";
+        String type = "REMOVE";
+
+        // Put a shutdown request
+        Request putShutdown = new Request("PUT", "_nodes/" + nodeIdToShutdown + "/shutdown");
+        putShutdown.setJsonEntity("{\"type\": \"" + type + "\", \"reason\": \"" + reason + "\"}");
+        assertOK(client().performRequest(putShutdown));
+
+        // Create an index with 1 primary and 3 replicas, so every node gets a copy unless a decider refuses
+        final String indexName = "test-idx";
+        Request createIndexRequest = new Request("PUT", indexName);
+        createIndexRequest.setJsonEntity("{\"settings\": {\"number_of_shards\": 1, \"number_of_replicas\": 3}}");
+        assertOK(client().performRequest(createIndexRequest));
+
+        // Watch to ensure no shards get allocated to the node that's shutting down
+        Request checkShardsRequest = new Request("GET", "_cat/shards/" + indexName);
+        checkShardsRequest.addParameter("format", "json");
+        checkShardsRequest.addParameter("h", "index,shard,prirep,id,state");
+
+        assertBusy(() -> {
+            List<Object> shardsResponse = entityAsList(client().performRequest(checkShardsRequest));
+            int startedShards = 0;
+            int unassignedShards = 0;
+            for (Object shard : shardsResponse) {
+                Map<String, Object> shardMap = (Map<String, Object>) shard;
+                assertThat(
+                    "no shards should be assigned to a node shutting down for removal",
+                    shardMap.get("id"),
+                    not(equalTo(nodeIdToShutdown))
+                );
+
+                if (shardMap.get("id") == null) {
+                    unassignedShards++;
+                } else if (nodeIdToShutdown.equals(shardMap.get("id")) == false) {
+                    assertThat("all other shards should be started", shardMap.get("state"), equalTo("STARTED"));
+                    startedShards++;
+                }
+            }
+            assertThat(unassignedShards, equalTo(1));
+            assertThat(startedShards, equalTo(3));
+        });
+
+        // Now that we know all shards of the test index are assigned except one,
+        // make sure it's unassigned because of the allocation decider.
+        Request allocationExplainRequest = new Request("GET", "_cluster/allocation/explain");
+        allocationExplainRequest.setJsonEntity("{\"index\": \"" + indexName + "\", \"shard\": 0, \"primary\": false}");
+        Map<String, Object> allocationExplainMap = entityAsMap(client().performRequest(allocationExplainRequest));
+        List<Map<String, Object>> decisions = (List<Map<String, Object>>) allocationExplainMap.get("node_allocation_decisions");
+        assertThat(decisions, notNullValue());
+
+        Optional<Map<String, Object>> maybeDecision = decisions.stream()
+            .filter(decision -> nodeIdToShutdown.equals(decision.get("node_id")))
+            .findFirst();
+        assertThat("expected decisions for node, but not found", maybeDecision.isPresent(), is(true));
+
+        Map<String, Object> decision = maybeDecision.get();
+        assertThat("node should have deciders", decision.containsKey("deciders"), is(true));
+
+        List<Map<String, Object>> deciders = (List<Map<String, Object>>) decision.get("deciders");
+        assertThat(
+            "the node_shutdown allocation decider should have decided NO",
+            deciders.stream()
+                .filter(decider -> "node_shutdown".equals(decider.get("decider")))
+                .anyMatch(decider -> "NO".equals(decider.get("decision"))),
+            is(true)
+        );
+    }
+
     @SuppressWarnings("unchecked")
     private void assertNoShuttingDownNodes(String nodeIdToShutdown) throws IOException {
         Request getShutdownStatus = new Request("GET", "_nodes/" + nodeIdToShutdown + "/shutdown");
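One note on the REST test: it registers a REMOVE shutdown but never clears it, so the record outlives the test. If the suite shares cluster state across tests, a cleanup step along these lines may be worth adding (a sketch assuming the DELETE variant of the shutdown API exercised by testCRUD above):

```java
// Sketch: clear the shutdown record so later tests start from a clean cluster.
// Assumes the DELETE variant of the shutdown API covered by testCRUD.
Request deleteShutdown = new Request("DELETE", "_nodes/" + nodeIdToShutdown + "/shutdown");
assertOK(client().performRequest(deleteShutdown));
assertNoShuttingDownNodes(nodeIdToShutdown);
```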