diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 3584e39347108..59e908179fbe9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -47,7 +47,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; @@ -72,7 +71,6 @@ import java.util.Objects; import java.util.Set; import java.util.function.Predicate; -import java.util.function.Supplier; public class ShardStateAction { @@ -81,34 +79,10 @@ public class ShardStateAction { public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started"; public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure"; - /** - * Adjusts the priority of the followup reroute task. NORMAL is right for reasonable clusters, but in a badly configured cluster it may - * be necessary to raise this higher to recover the older behaviour of rerouting after processing every shard-started task. Deliberately - * undocumented, since this is a last-resort escape hatch for experts rather than something we want to expose to anyone, and deprecated - * since we will remove it once we have confirmed from experience that this priority is appropriate in all cases. - */ - public static final Setting FOLLOW_UP_REROUTE_PRIORITY_SETTING - = new Setting<>("cluster.routing.allocation.shard_state.reroute.priority", Priority.NORMAL.toString(), - ShardStateAction::parseReroutePriority, Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated); - - private static Priority parseReroutePriority(String priorityString) { - final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); - switch (priority) { - case NORMAL: - case HIGH: - case URGENT: - return priority; - } - throw new IllegalArgumentException( - "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]"); - } - private final TransportService transportService; private final ClusterService clusterService; private final ThreadPool threadPool; - private volatile Priority followUpRerouteTaskPriority; - // a list of shards that failed during replication // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. private final TransportRequestDeduplicator remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>(); @@ -120,17 +94,13 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor this.clusterService = clusterService; this.threadPool = threadPool; - followUpRerouteTaskPriority = FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(clusterService.getSettings()); - clusterService.getClusterSettings().addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, - this::setFollowUpRerouteTaskPriority); - transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ThreadPool.Names.SAME, StartedShardEntry::new, new ShardStartedTransportHandler(clusterService, - new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger), + new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService, logger), logger)); transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new, new ShardFailedTransportHandler(clusterService, - new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger), + new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, logger), logger)); } @@ -248,10 +218,6 @@ public void onTimeout(TimeValue timeout) { }, changePredicate); } - private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { - this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; - } - private static class ShardFailedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor; @@ -319,14 +285,11 @@ public static class ShardFailedClusterStateTaskExecutor implements ClusterStateT private final AllocationService allocationService; private final RerouteService rerouteService; private final Logger logger; - private final Supplier prioritySupplier; - public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, - Supplier prioritySupplier, Logger logger) { + public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) { this.allocationService = allocationService; this.rerouteService = rerouteService; this.logger = logger; - this.prioritySupplier = prioritySupplier; } @Override @@ -420,7 +383,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { // assign it again, even if that means putting it back on the node on which it previously failed: final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards); logger.trace("{}, scheduling a reroute", reason); - rerouteService.reroute(reason, prioritySupplier.get(), ActionListener.wrap( + rerouteService.reroute(reason, Priority.NORMAL, ActionListener.wrap( r -> logger.trace("{}, reroute completed", reason), e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e))); } @@ -552,14 +515,11 @@ public static class ShardStartedClusterStateTaskExecutor private final AllocationService allocationService; private final Logger logger; private final RerouteService rerouteService; - private final Supplier prioritySupplier; - public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, - Supplier prioritySupplier, Logger logger) { + public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) { this.allocationService = allocationService; this.logger = logger; this.rerouteService = rerouteService; - this.prioritySupplier = prioritySupplier; } @Override @@ -637,7 +597,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { - rerouteService.reroute("reroute after starting shards", prioritySupplier.get(), ActionListener.wrap( + rerouteService.reroute("reroute after starting shards", Priority.NORMAL, ActionListener.wrap( r -> logger.trace("reroute after starting shards succeeded"), e -> logger.debug("reroute after starting shards failed", e))); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 88b309f7026c6..867b628a5f97c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -31,7 +31,6 @@ import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper; import org.elasticsearch.cluster.coordination.Coordinator; @@ -214,7 +213,6 @@ public void apply(Settings value, Settings current, Settings previous) { DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING, - ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, DestructiveOperations.REQUIRES_NAME_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index da31791413b57..54f080aeef2b8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -41,7 +41,6 @@ import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.StaleShard; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -87,7 +86,7 @@ public void setUp() throws Exception { .build(); clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData).routingTable(routingTable).build(); - executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, () -> Priority.NORMAL, logger); + executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger); } public void testEmptyTaskListProducesSameClusterState() throws Exception { @@ -119,7 +118,7 @@ public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Excepti List failingTasks = createExistingShards(currentState, reason); List nonExistentTasks = createNonExistentShards(currentState, reason); ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = - new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, () -> Priority.NORMAL, logger) { + new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) { @Override ClusterState applyFailedShards(ClusterState currentState, List failedShards, List staleShards) { throw new RuntimeException("simulated applyFailedShards failure"); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index 8b68516f61a96..51ee06b0f3e0e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -66,7 +66,7 @@ public void setUp() throws Exception { .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE) .build()); executor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, - ShardStartedClusterStateTaskExecutorTests::neverReroutes, () -> Priority.NORMAL, logger); + ShardStartedClusterStateTaskExecutorTests::neverReroutes, logger); } public void testEmptyTaskListProducesSameClusterState() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionIT.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionIT.java deleted file mode 100644 index 02bea81d484e1..0000000000000 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionIT.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.cluster.action.shard; - -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.transport.MockTransportService; - -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; - -public class ShardStateActionIT extends ESIntegTestCase { - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - final Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); - if (randomBoolean()) { - builder.put(ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey(), randomPriority()); - } - return builder.build(); - } - - @Override - protected Collection> nodePlugins() { - return Collections.singletonList(MockTransportService.TestPlugin.class); - } - - public void testFollowupRerouteAlwaysOccursEventually() { - // Shows that no matter how cluster.routing.allocation.shard_state.reroute.priority is set, a follow-up reroute eventually occurs. - // Can be removed when this setting is removed, as we copiously test the default case. - - internalCluster().ensureAtLeastNumDataNodes(2); - - if (randomBoolean()) { - assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() - .put(ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey(), randomPriority()))); - } - - createIndex("test"); - final ClusterHealthResponse clusterHealthResponse - = client().admin().cluster().prepareHealth().setWaitForNoInitializingShards(true).setWaitForEvents(Priority.LANGUID).get(); - assertFalse(clusterHealthResponse.isTimedOut()); - assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - - assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() - .putNull(ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey()))); - } - - public void testFollowupRerouteCanBeSetToHigherPriority() { - // Shows that in a cluster under unbearable pressure we can still assign replicas (for now at least) by setting - // cluster.routing.allocation.shard_state.reroute.priority to a higher priority. Can be removed when this setting is removed, as - // we should at that point be confident that the default priority is appropriate for all clusters. - - internalCluster().ensureAtLeastNumDataNodes(2); - - assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() - .put(ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey(), "urgent"))); - - // ensure that the master always has a HIGH priority pending task - final AtomicBoolean stopSpammingMaster = new AtomicBoolean(); - final ClusterService masterClusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()); - masterClusterService.submitStateUpdateTask("spam", - new ClusterStateUpdateTask(Priority.HIGH) { - @Override - public ClusterState execute(ClusterState currentState) { - return currentState; - } - - @Override - public void onFailure(String source, Exception e) { - throw new AssertionError(source, e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (stopSpammingMaster.get() == false) { - masterClusterService.submitStateUpdateTask("spam", this); - } - } - }); - - // even with the master under such pressure, all shards of the index can be assigned; in particular, after the primaries have - // started there's a follow-up reroute at a higher priority than the spam - createIndex("test"); - assertFalse(client().admin().cluster().prepareHealth().setWaitForGreenStatus().get().isTimedOut()); - - stopSpammingMaster.set(true); - assertFalse(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get().isTimedOut()); - - assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() - .putNull(ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey()))); - } - - public void testFollowupRerouteRejectsInvalidPriorities() { - final String invalidPriority = randomFrom("IMMEDIATE", "LOW", "LANGUID"); - final ActionFuture responseFuture = client().admin().cluster().prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey(), invalidPriority)) - .execute(); - assertThat(expectThrows(IllegalArgumentException.class, responseFuture::actionGet).getMessage(), - allOf(containsString(invalidPriority), containsString(ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey()))); - } - - private String randomPriority() { - return randomFrom("normal", "high", "urgent", "NORMAL", "HIGH", "URGENT"); - // not "languid" (because we use that to wait for no pending tasks) nor "low" or "immediate" (because these are unreasonable) - } - -} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java index e5af47119cc87..8ab5d7bcbade0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.junit.Before; @@ -59,7 +58,7 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase { public void setupAllocationService() { allocation = createAllocationService(); failedClusterStateTaskExecutor - = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, () -> Priority.NORMAL, logger); + = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger); } public void testInSyncAllocationIdsUpdated() { @@ -165,7 +164,7 @@ public void testDeadNodesBeforeReplicaFailed() throws Exception { logger.info("fail replica (for which there is no shard routing in the CS anymore)"); assertNull(clusterState.getRoutingNodes().getByAllocationId(replicaShard.shardId(), replicaShard.allocationId().getId())); ShardStateAction.ShardFailedClusterStateTaskExecutor failedClusterStateTaskExecutor = - new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, () -> Priority.NORMAL, logger); + new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger); long primaryTerm = clusterState.metaData().index("test").primaryTerm(0); clusterState = failedClusterStateTaskExecutor.execute(clusterState, Arrays.asList( new FailedShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null, true)) diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 6e04f03e45641..273a956170354 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -77,7 +77,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -148,9 +147,9 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th new TestGatewayAllocator(), new BalancedShardsAllocator(SETTINGS), EmptyClusterInfoService.INSTANCE); shardFailedClusterStateTaskExecutor - = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, () -> Priority.NORMAL, logger); + = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger); shardStartedClusterStateTaskExecutor - = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, null, () -> Priority.NORMAL, logger); + = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, null, logger); ActionFilters actionFilters = new ActionFilters(Collections.emptySet()); IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); DestructiveOperations destructiveOperations = new DestructiveOperations(SETTINGS, clusterSettings);