Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
Expand All @@ -71,6 +72,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class ShardStateAction {

Expand All @@ -79,10 +81,34 @@ public class ShardStateAction {
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";

/**
* Adjusts the priority of the followup reroute task that is scheduled once a shard-started or shard-failed cluster state update has
* been published. NORMAL (the default) is right for reasonable clusters, but in a badly configured cluster it may be necessary to raise
* this to recover the older behaviour of rerouting after processing every shard-started task. Only NORMAL, HIGH and URGENT are
* accepted (matched case-insensitively); any other priority is rejected by {@code parseReroutePriority}.
* <p>
* Deliberately undocumented, since this is a last-resort escape hatch for experts rather than something we want to expose to anyone,
* and deprecated since we will remove it once we have confirmed from experience that this priority is appropriate in all cases.
*/
public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING
= new Setting<>("cluster.routing.allocation.shard_state.reroute.priority", Priority.NORMAL.toString(),
ShardStateAction::parseReroutePriority, Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated);

/**
 * Parses the value of the follow-up reroute priority setting.
 *
 * @param priorityString the configured priority name; it is upper-cased with {@link Locale#ROOT} before lookup, so matching is
 *                       case-insensitive
 * @return the parsed priority, which is one of NORMAL, HIGH or URGENT
 * @throws IllegalArgumentException if the name is not a {@code Priority} constant, or if it names a priority other than NORMAL,
 *                                  HIGH or URGENT
 */
private static Priority parseReroutePriority(String priorityString) {
    final Priority parsed = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT));
    // Guard clause: only these three priorities are permitted for the follow-up reroute.
    final boolean supported = parsed == Priority.NORMAL || parsed == Priority.HIGH || parsed == Priority.URGENT;
    if (supported == false) {
        throw new IllegalArgumentException(
            "priority [" + parsed + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]");
    }
    return parsed;
}

private final TransportService transportService;
private final ClusterService clusterService;
private final ThreadPool threadPool;

private volatile Priority followUpRerouteTaskPriority;

// a list of shards that failed during replication
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
Expand All @@ -94,11 +120,18 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor
this.clusterService = clusterService;
this.threadPool = threadPool;

followUpRerouteTaskPriority = FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING,
this::setFollowUpRerouteTaskPriority);

transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ThreadPool.Names.SAME, StartedShardEntry::new,
new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
new ShardStartedTransportHandler(clusterService,
new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger),
logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new,
new ShardFailedTransportHandler(clusterService,
new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, logger), logger));
new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger),
logger));
}

private void sendShardAction(final String actionName, final ClusterState currentState,
Expand Down Expand Up @@ -215,6 +248,10 @@ public void onTimeout(TimeValue timeout) {
}, changePredicate);
}

/**
 * Stores a new priority for follow-up reroute tasks. Registered in the constructor as the update consumer for
 * {@code FOLLOW_UP_REROUTE_PRIORITY_SETTING}.
 *
 * @param priority the priority to use for subsequently scheduled follow-up reroutes
 */
private void setFollowUpRerouteTaskPriority(Priority priority) {
    followUpRerouteTaskPriority = priority;
}

private static class ShardFailedTransportHandler implements TransportRequestHandler<FailedShardEntry> {
private final ClusterService clusterService;
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
Expand Down Expand Up @@ -282,11 +319,14 @@ public static class ShardFailedClusterStateTaskExecutor implements ClusterStateT
private final AllocationService allocationService;
private final RerouteService rerouteService;
private final Logger logger;
private final Supplier<Priority> prioritySupplier;

public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) {
/**
 * Creates the executor for shard-failed tasks.
 *
 * @param allocationService the allocation service kept for use by this executor
 * @param rerouteService    the service through which the follow-up reroute is scheduled
 * @param prioritySupplier  supplies the priority of the follow-up reroute; it is queried each time a reroute is scheduled
 * @param logger            the logger this executor writes to
 */
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService,
                                           Supplier<Priority> prioritySupplier, Logger logger) {
    // Fields are assigned in parameter order.
    this.allocationService = allocationService;
    this.rerouteService = rerouteService;
    this.prioritySupplier = prioritySupplier;
    this.logger = logger;
}

@Override
Expand Down Expand Up @@ -380,7 +420,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
// assign it again, even if that means putting it back on the node on which it previously failed:
final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
logger.trace("{}, scheduling a reroute", reason);
rerouteService.reroute(reason, Priority.HIGH, ActionListener.wrap(
rerouteService.reroute(reason, prioritySupplier.get(), ActionListener.wrap(
r -> logger.trace("{}, reroute completed", reason),
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
}
Expand Down Expand Up @@ -511,10 +551,15 @@ public static class ShardStartedClusterStateTaskExecutor
implements ClusterStateTaskExecutor<StartedShardEntry>, ClusterStateTaskListener {
private final AllocationService allocationService;
private final Logger logger;
private final RerouteService rerouteService;
private final Supplier<Priority> prioritySupplier;

public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, Logger logger) {
/**
 * Creates the executor for shard-started tasks.
 *
 * @param allocationService the allocation service kept for use by this executor
 * @param rerouteService    the service through which the follow-up reroute is scheduled
 * @param prioritySupplier  supplies the priority of the follow-up reroute; it is queried each time a reroute is scheduled
 * @param logger            the logger this executor writes to
 */
public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService,
                                            Supplier<Priority> prioritySupplier, Logger logger) {
    // Fields are assigned in parameter order.
    this.allocationService = allocationService;
    this.rerouteService = rerouteService;
    this.prioritySupplier = prioritySupplier;
    this.logger = logger;
}

@Override
Expand Down Expand Up @@ -589,6 +634,13 @@ public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
}
}

/**
 * Schedules a follow-up reroute once the cluster state has been published. The priority is read from the supplier at the
 * time of the call, and the outcome is only logged (trace on success, debug on failure).
 *
 * @param clusterChangedEvent the published change; not inspected here
 */
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
    final String reason = "reroute after starting shards";
    final Priority priority = prioritySupplier.get();
    rerouteService.reroute(
        reason,
        priority,
        ActionListener.wrap(
            r -> logger.trace("reroute after starting shards succeeded"),
            e -> logger.debug("reroute after starting shards failed", e)));
}
}

public static class StartedShardEntry extends TransportRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
Collections.sort(startedShards, Comparator.comparing(ShardRouting::primary));
applyStartedShards(allocation, startedShards);
gatewayAllocator.applyStartedShards(allocation, startedShards);
reroute(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes());
String startedShardsAsString
= firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString(), logger.isDebugEnabled());
return buildResultAndLogHealthChange(clusterState, allocation, "shards started [" + startedShardsAsString + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper;
import org.elasticsearch.cluster.coordination.Coordinator;
Expand Down Expand Up @@ -213,6 +214,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,
DestructiveOperations.REQUIRES_NAME_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -33,7 +34,6 @@
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
Expand Down Expand Up @@ -114,8 +114,7 @@ public void testErrorCondition() {
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
routingTable = service.applyStartedShards(clusterState,
routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), clusterState,
Expand All @@ -133,8 +132,7 @@ public void testPassNumRoutingShards() {
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
routingTable = service.applyStartedShards(clusterState,
routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

ResizeRequest resizeRequest = new ResizeRequest("target", "source");
Expand Down Expand Up @@ -163,8 +161,7 @@ public void testPassNumRoutingShardsAndFail() {
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
routingTable = service.applyStartedShards(clusterState,
routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

ResizeRequest resizeRequest = new ResizeRequest("target", "source");
Expand Down Expand Up @@ -198,8 +195,7 @@ public void testShrinkIndexSettings() {
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
routingTable = service.applyStartedShards(clusterState,
routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, indexName).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards();
DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / numSourceShards), between(1, 1000), between(1, 10000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public void testNoRerouteOnStaleClusterState() {
ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId)
.shardsWithState(ShardRoutingState.INITIALIZING).get(0);
AllocationService allocationService = ESAllocationTestCase.createAllocationService();
ClusterState updatedState = allocationService.applyStartedShards(state, Collections.singletonList(relocationTarget));
ClusterState updatedState = ESAllocationTestCase.startShardsAndReroute(allocationService, state, relocationTarget);

setState(clusterService, updatedState);
logger.debug("--> relocation complete state:\n{}", clusterService.state());
Expand Down
Loading