Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,14 +387,14 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex

logger.warn((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}

@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null,
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, Rout
}

// Used for testing
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList());
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard, boolean markAsStale) {
return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null, markAsStale)), emptyList());
}

// Used for testing
Expand Down Expand Up @@ -185,6 +185,9 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message,
failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
if (failedShardEntry.markAsStale()) {
allocation.removeAllocationId(failedShard);
}
routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData, allocation.changes());
} else {
logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,20 @@ public class FailedShard {
private final ShardRouting routingEntry;
private final String message;
private final Exception failure;
private final boolean markAsStale;

public FailedShard(ShardRouting routingEntry, String message, Exception failure) {
public FailedShard(ShardRouting routingEntry, String message, Exception failure, boolean markAsStale) {
assert routingEntry.assignedToNode() : "only assigned shards can be failed " + routingEntry;
this.routingEntry = routingEntry;
this.message = message;
this.failure = failure;
this.markAsStale = markAsStale;
}

@Override
public String toString() {
return "failed shard, shard " + routingEntry + ", message [" + message + "], failure [" +
ExceptionsHelper.detailedMessage(failure) + "]";
ExceptionsHelper.detailedMessage(failure) + "], markAsStale [" + markAsStale + "]";
}

/**
Expand All @@ -66,4 +68,11 @@ public String getMessage() {
public Exception getFailure() {
return failure;
}

/**
* Whether or not to mark the shard as stale (eg. removing from in-sync set) when failing the shard.
*/
public boolean markAsStale() {
return markAsStale;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,12 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha

@Override
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) {
removeAllocationId(failedShard);

if (failedShard.primary()) {
Updates updates = changes(failedShard.shardId());
if (updates.firstFailedPrimary == null) {
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
updates.firstFailedPrimary = failedShard;
}
}
}

if (failedShard.active() && failedShard.primary()) {
Updates updates = changes(failedShard.shardId());
if (updates.firstFailedPrimary == null) {
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
updates.firstFailedPrimary = failedShard;
}
increasePrimaryTerm(failedShard.shardId());
}
}
Expand Down Expand Up @@ -286,8 +279,10 @@ private Updates changes(ShardId shardId) {
/**
* Remove allocation id of this shard from the set of in-sync shard copies
*/
private void removeAllocationId(ShardRouting shardRouting) {
changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId());
void removeAllocationId(ShardRouting shardRouting) {
if (shardRouting.active()) {
changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -222,6 +223,13 @@ public Set<String> getIgnoreNodes(ShardId shardId) {
return unmodifiableSet(new HashSet<>(ignore));
}

/**
* Remove the allocation id of the provided shard from the set of in-sync shard copies
*/
public void removeAllocationId(ShardRouting shardRouting) {
indexMetaDataUpdater.removeAllocationId(shardRouting);
}

/**
* Returns observer to use for changes made to the routing nodes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
}
routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting,
new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData, allocation.changes());
// TODO: We don't have to remove a cancelled shard from in-sync set once we have a strict resync implementation.
allocation.removeAllocationId(shardRouting);
return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command",
"shard " + shardId + " on node " + discoNode + " can be cancelled"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void onFailure(Exception e) {
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i);
List<FailedShard> failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
newState = allocationService.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
// A write replication action proxy should fail the shard
assertEquals(1, shardFailedRequests.length);
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request;
ShardStateAction.FailedShardEntry shardEntry = (ShardStateAction.FailedShardEntry) shardFailedRequest.request;
// the shard the request was sent to and the shard to be failed should be the same
assertEquals(shardEntry.getShardId(), replica.shardId());
assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId());
Expand Down
Loading