Skip to content

Commit 965efa5

Browse files
authored
Allows failing shards without marking as stale (#28054)
Currently when failing a shard we also mark it as stale (eg. remove its allocationId from from the InSync set). However in some cases, we need to be able to fail shards but keep them InSync set. This commit adds such capacity. This is a preparatory change to make the primary-replica resync less lenient. Relates #24841
1 parent 13083e2 commit 965efa5

21 files changed

+328
-182
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,14 +387,14 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex
387387

388388
logger.warn((org.apache.logging.log4j.util.Supplier<?>)
389389
() -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
390-
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
390+
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
391391
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
392392
}
393393

394394
@Override
395395
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
396396
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
397-
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null,
397+
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
398398
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
399399
}
400400

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 128 additions & 78 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, Rout
138138
}
139139

140140
// Used for testing
141-
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
142-
return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList());
141+
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard, boolean markAsStale) {
142+
return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null, markAsStale)), emptyList());
143143
}
144144

145145
// Used for testing
@@ -185,6 +185,9 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
185185
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message,
186186
failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
187187
AllocationStatus.NO_ATTEMPT);
188+
if (failedShardEntry.markAsStale()) {
189+
allocation.removeAllocationId(failedShard);
190+
}
188191
routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData, allocation.changes());
189192
} else {
190193
logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,20 @@ public class FailedShard {
3030
private final ShardRouting routingEntry;
3131
private final String message;
3232
private final Exception failure;
33+
private final boolean markAsStale;
3334

34-
public FailedShard(ShardRouting routingEntry, String message, Exception failure) {
35+
public FailedShard(ShardRouting routingEntry, String message, Exception failure, boolean markAsStale) {
3536
assert routingEntry.assignedToNode() : "only assigned shards can be failed " + routingEntry;
3637
this.routingEntry = routingEntry;
3738
this.message = message;
3839
this.failure = failure;
40+
this.markAsStale = markAsStale;
3941
}
4042

4143
@Override
4244
public String toString() {
4345
return "failed shard, shard " + routingEntry + ", message [" + message + "], failure [" +
44-
ExceptionsHelper.detailedMessage(failure) + "]";
46+
ExceptionsHelper.detailedMessage(failure) + "], markAsStale [" + markAsStale + "]";
4547
}
4648

4749
/**
@@ -66,4 +68,11 @@ public String getMessage() {
6668
public Exception getFailure() {
6769
return failure;
6870
}
71+
72+
/**
73+
* Whether or not to mark the shard as stale (eg. removing from in-sync set) when failing the shard.
74+
*/
75+
public boolean markAsStale() {
76+
return markAsStale;
77+
}
6978
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,12 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha
7272

7373
@Override
7474
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
75-
if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) {
76-
removeAllocationId(failedShard);
77-
78-
if (failedShard.primary()) {
79-
Updates updates = changes(failedShard.shardId());
80-
if (updates.firstFailedPrimary == null) {
81-
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
82-
updates.firstFailedPrimary = failedShard;
83-
}
84-
}
85-
}
86-
8775
if (failedShard.active() && failedShard.primary()) {
76+
Updates updates = changes(failedShard.shardId());
77+
if (updates.firstFailedPrimary == null) {
78+
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
79+
updates.firstFailedPrimary = failedShard;
80+
}
8881
increasePrimaryTerm(failedShard.shardId());
8982
}
9083
}
@@ -286,8 +279,10 @@ private Updates changes(ShardId shardId) {
286279
/**
287280
* Remove allocation id of this shard from the set of in-sync shard copies
288281
*/
289-
private void removeAllocationId(ShardRouting shardRouting) {
290-
changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId());
282+
void removeAllocationId(ShardRouting shardRouting) {
283+
if (shardRouting.active()) {
284+
changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId());
285+
}
291286
}
292287

293288
/**

server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
2828
import org.elasticsearch.cluster.routing.RoutingNodes;
2929
import org.elasticsearch.cluster.routing.RoutingTable;
30+
import org.elasticsearch.cluster.routing.ShardRouting;
3031
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
3132
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
3233
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -222,6 +223,13 @@ public Set<String> getIgnoreNodes(ShardId shardId) {
222223
return unmodifiableSet(new HashSet<>(ignore));
223224
}
224225

226+
/**
227+
* Remove the allocation id of the provided shard from the set of in-sync shard copies
228+
*/
229+
public void removeAllocationId(ShardRouting shardRouting) {
230+
indexMetaDataUpdater.removeAllocationId(shardRouting);
231+
}
232+
225233
/**
226234
* Returns observer to use for changes made to the routing nodes
227235
*/

server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
156156
}
157157
routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting,
158158
new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData, allocation.changes());
159+
// TODO: We don't have to remove a cancelled shard from in-sync set once we have a strict resync implementation.
160+
allocation.removeAllocationId(shardRouting);
159161
return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command",
160162
"shard " + shardId + " on node " + discoNode + " can be cancelled"));
161163
}

server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void onFailure(Exception e) {
120120
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i);
121121
List<FailedShard> failedShards = Collections.singletonList(
122122
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
123-
new UnsupportedOperationException()));
123+
new UnsupportedOperationException(), randomBoolean()));
124124
newState = allocationService.applyFailedShards(clusterState, failedShards);
125125
assertThat(newState, not(equalTo(clusterState)));
126126
clusterState = newState;

server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
315315
// A write replication action proxy should fail the shard
316316
assertEquals(1, shardFailedRequests.length);
317317
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
318-
ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request;
318+
ShardStateAction.FailedShardEntry shardEntry = (ShardStateAction.FailedShardEntry) shardFailedRequest.request;
319319
// the shard the request was sent to and the shard to be failed should be the same
320320
assertEquals(shardEntry.getShardId(), replica.shardId());
321321
assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId());

0 commit comments

Comments
 (0)