Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
Expand All @@ -72,7 +71,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class ShardStateAction {

Expand All @@ -81,34 +79,10 @@ public class ShardStateAction {
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";

/**
* Adjusts the priority of the followup reroute task. NORMAL is right for reasonable clusters, but in a badly configured cluster it may
* be necessary to raise this higher to recover the older behaviour of rerouting after processing every shard-started task. Deliberately
* undocumented, since this is a last-resort escape hatch for experts rather than something we want to expose to anyone, and deprecated
* since we will remove it once we have confirmed from experience that this priority is appropriate in all cases.
*/
public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING
= new Setting<>("cluster.routing.allocation.shard_state.reroute.priority", Priority.NORMAL.toString(),
ShardStateAction::parseReroutePriority, Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated);

private static Priority parseReroutePriority(String priorityString) {
final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT));
switch (priority) {
case NORMAL:
case HIGH:
case URGENT:
return priority;
}
throw new IllegalArgumentException(
"priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]");
}

private final TransportService transportService;
private final ClusterService clusterService;
private final ThreadPool threadPool;

private volatile Priority followUpRerouteTaskPriority;

// a list of shards that failed during replication
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
Expand All @@ -120,17 +94,13 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor
this.clusterService = clusterService;
this.threadPool = threadPool;

followUpRerouteTaskPriority = FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING,
this::setFollowUpRerouteTaskPriority);

transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ThreadPool.Names.SAME, StartedShardEntry::new,
new ShardStartedTransportHandler(clusterService,
new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger),
new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService, logger),
logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new,
new ShardFailedTransportHandler(clusterService,
new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger),
new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, logger),
logger));
}

Expand Down Expand Up @@ -248,10 +218,6 @@ public void onTimeout(TimeValue timeout) {
}, changePredicate);
}

private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
this.followUpRerouteTaskPriority = followUpRerouteTaskPriority;
}

private static class ShardFailedTransportHandler implements TransportRequestHandler<FailedShardEntry> {
private final ClusterService clusterService;
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
Expand Down Expand Up @@ -319,14 +285,11 @@ public static class ShardFailedClusterStateTaskExecutor implements ClusterStateT
private final AllocationService allocationService;
private final RerouteService rerouteService;
private final Logger logger;
private final Supplier<Priority> prioritySupplier;

public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService,
Supplier<Priority> prioritySupplier, Logger logger) {
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) {
this.allocationService = allocationService;
this.rerouteService = rerouteService;
this.logger = logger;
this.prioritySupplier = prioritySupplier;
}

@Override
Expand Down Expand Up @@ -420,7 +383,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
// assign it again, even if that means putting it back on the node on which it previously failed:
final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
logger.trace("{}, scheduling a reroute", reason);
rerouteService.reroute(reason, prioritySupplier.get(), ActionListener.wrap(
rerouteService.reroute(reason, Priority.NORMAL, ActionListener.wrap(
r -> logger.trace("{}, reroute completed", reason),
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
}
Expand Down Expand Up @@ -552,14 +515,11 @@ public static class ShardStartedClusterStateTaskExecutor
private final AllocationService allocationService;
private final Logger logger;
private final RerouteService rerouteService;
private final Supplier<Priority> prioritySupplier;

public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService,
Supplier<Priority> prioritySupplier, Logger logger) {
public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) {
this.allocationService = allocationService;
this.logger = logger;
this.rerouteService = rerouteService;
this.prioritySupplier = prioritySupplier;
}

@Override
Expand Down Expand Up @@ -637,7 +597,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
rerouteService.reroute("reroute after starting shards", prioritySupplier.get(), ActionListener.wrap(
rerouteService.reroute("reroute after starting shards", Priority.NORMAL, ActionListener.wrap(
r -> logger.trace("reroute after starting shards succeeded"),
e -> logger.debug("reroute after starting shards failed", e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper;
import org.elasticsearch.cluster.coordination.Coordinator;
Expand Down Expand Up @@ -214,7 +213,6 @@ public void apply(Settings value, Settings current, Settings previous) {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,
DestructiveOperations.REQUIRES_NAME_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.StaleShard;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -87,7 +86,7 @@ public void setUp() throws Exception {
.build();
clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(routingTable).build();
executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, () -> Priority.NORMAL, logger);
executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger);
}

public void testEmptyTaskListProducesSameClusterState() throws Exception {
Expand Down Expand Up @@ -119,7 +118,7 @@ public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Excepti
List<FailedShardEntry> failingTasks = createExistingShards(currentState, reason);
List<FailedShardEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor =
new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, () -> Priority.NORMAL, logger) {
new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) {
@Override
ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> failedShards, List<StaleShard> staleShards) {
throw new RuntimeException("simulated applyFailedShards failure");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setUp() throws Exception {
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
.build());
executor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService,
ShardStartedClusterStateTaskExecutorTests::neverReroutes, () -> Priority.NORMAL, logger);
ShardStartedClusterStateTaskExecutorTests::neverReroutes, logger);
}

public void testEmptyTaskListProducesSameClusterState() throws Exception {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.junit.Before;
Expand All @@ -59,7 +58,7 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
public void setupAllocationService() {
allocation = createAllocationService();
failedClusterStateTaskExecutor
= new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, () -> Priority.NORMAL, logger);
= new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger);
}

public void testInSyncAllocationIdsUpdated() {
Expand Down Expand Up @@ -165,7 +164,7 @@ public void testDeadNodesBeforeReplicaFailed() throws Exception {
logger.info("fail replica (for which there is no shard routing in the CS anymore)");
assertNull(clusterState.getRoutingNodes().getByAllocationId(replicaShard.shardId(), replicaShard.allocationId().getId()));
ShardStateAction.ShardFailedClusterStateTaskExecutor failedClusterStateTaskExecutor =
new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, () -> Priority.NORMAL, logger);
new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger);
long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
clusterState = failedClusterStateTaskExecutor.execute(clusterState, Arrays.asList(
new FailedShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null, true))
Expand Down
Loading