From 4026c6fb8e697c88537c859f3a3c1bcc53edf00c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 18 Oct 2019 13:59:17 -0400 Subject: [PATCH 01/11] Do not cancel ongoing recovery for noop copy on failed node --- .../cluster/routing/RoutingNodes.java | 4 +- .../cluster/routing/UnassignedInfo.java | 49 ++++++++++++++----- .../routing/allocation/AllocationService.java | 15 ++++-- .../allocator/BalancedShardsAllocator.java | 2 +- ...AllocateEmptyPrimaryAllocationCommand.java | 2 +- .../gateway/ReplicaShardAllocator.java | 8 ++- .../cluster/routing/UnassignedInfoTests.java | 6 ++- ...storeInProgressAllocationDeciderTests.java | 2 +- .../gateway/ReplicaShardAllocatorIT.java | 36 ++++++++++++++ .../gateway/ReplicaShardAllocatorTests.java | 27 ++++++++-- .../indices/recovery/IndexRecoveryIT.java | 1 - 11 files changed, 124 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 4750476805d88..5cf491bb378b1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -547,7 +547,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas"; UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT); + unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, false); failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver); } } @@ -873,7 +873,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(), currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(), - allocationStatus); + allocationStatus, false); ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource()); changes.unassignedInfoUpdated(shard, newInfo); shard = updatedShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index eaeb6e0402076..ea941ce3f056d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -214,6 +215,7 @@ public String value() { private final Exception failure; private final int failedAllocations; private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard + private final boolean wasCancelledForNoopRecovery; /** * creates an UnassignedInfo object based on **current** time @@ -223,20 +225,22 @@ public String value() { **/ public UnassignedInfo(Reason reason, String 
message) { this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT); + AllocationStatus.NO_ATTEMPT, false); } /** - * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. - * @param message more information about cause. - * @param failure the shard level failure that caused this shard to be unassigned, if exists. - * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation - * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. - * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. - * @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + * @param failure the shard level failure that caused this shard to be unassigned, if exists. + * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation + * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. + * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. + * @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param wasCancelledForNoopRecovery if allocation of this shard was cancelled in favour of a noop recovery */ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations, - long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus) { + long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus, + boolean wasCancelledForNoopRecovery) { this.reason = Objects.requireNonNull(reason); this.unassignedTimeMillis = unassignedTimeMillis; this.unassignedTimeNanos = unassignedTimeNanos; @@ -245,10 +249,13 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti this.failure = failure; this.failedAllocations = failedAllocations; this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus); + this.wasCancelledForNoopRecovery = wasCancelledForNoopRecovery; assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : "failedAllocations: " + failedAllocations + " for reason " + reason; assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; assert !(delayed && reason != Reason.NODE_LEFT) : "shard can only be delayed if it is unassigned due to a node leaving"; + assert wasCancelledForNoopRecovery == false || reason == Reason.ALLOCATION_FAILED : + "cancelled status was not reset for reason [" + reason + "]"; } public UnassignedInfo(StreamInput in) throws IOException { @@ -262,6 +269,11 @@ public UnassignedInfo(StreamInput in) throws IOException { this.failure = in.readException(); this.failedAllocations = in.readVInt(); this.lastAllocationStatus = AllocationStatus.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.wasCancelledForNoopRecovery = in.readBoolean(); + } else { + this.wasCancelledForNoopRecovery = false; + } } public void writeTo(StreamOutput out) throws IOException { @@ -273,6 +285,9 @@ public void writeTo(StreamOutput out) throws IOException { 
out.writeException(failure); out.writeVInt(failedAllocations); lastAllocationStatus.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(wasCancelledForNoopRecovery); + } } /** @@ -347,6 +362,13 @@ public AllocationStatus getLastAllocationStatus() { return lastAllocationStatus; } + /** + * @return true if the allocation of this shard was cancelled in favour of a noop recovery + */ + public boolean wasCancelledForNoopRecovery() { + return wasCancelledForNoopRecovery; + } + /** * Calculates the delay left based on current time (in nanoseconds) and the delay defined by the index settings. * Only relevant if shard is effectively delayed (see {@link #isDelayed()}) @@ -432,6 +454,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("details", details); } builder.field("allocation_status", lastAllocationStatus.value()); + builder.field("cancelled_for_noop", wasCancelledForNoopRecovery); builder.endObject(); return builder; } @@ -459,13 +482,16 @@ public boolean equals(Object o) { if (reason != that.reason) { return false; } - if (message != null ? !message.equals(that.message) : that.message != null) { + if (Objects.equals(message, that.message) == false) { return false; } if (lastAllocationStatus != that.lastAllocationStatus) { return false; } - return !(failure != null ? !failure.equals(that.failure) : that.failure != null); + if (Objects.equals(failure, that.failure) == false) { + return false; + } + return wasCancelledForNoopRecovery == that.wasCancelledForNoopRecovery; } @Override @@ -477,6 +503,7 @@ public int hashCode() { result = 31 * result + (message != null ? message.hashCode() : 0); result = 31 * result + (failure != null ? failure.hashCode() : 0); result = 31 * result + lastAllocationStatus.hashCode(); + result = 31 * result + Boolean.hashCode(wasCancelledForNoopRecovery); return result; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f158103ed226c..6a9e70126fe5b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -194,10 +194,17 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis shardToFail.shardId(), shardToFail, failedShard); } int failedAllocations = failedShard.unassignedInfo() != null ? 
failedShard.unassignedInfo().getNumFailedAllocations() : 0; + boolean wasCancelledForNoopRecovery; + if (failedShard.unassignedInfo() != null) { + wasCancelledForNoopRecovery = failedShard.unassignedInfo().wasCancelledForNoopRecovery() + || failedShard.unassignedInfo().getReason() == UnassignedInfo.Reason.REALLOCATED_REPLICA; + } else { + wasCancelledForNoopRecovery = false; + } String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT); + AllocationStatus.NO_ATTEMPT, wasCancelledForNoopRecovery); if (failedShardEntry.markAsStale()) { allocation.removeAllocationId(failedShard); } @@ -289,7 +296,7 @@ private void removeDelayMarkers(RoutingAllocation allocation) { if (newComputedLeftDelayNanos == 0) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()), + unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), false), shardRouting.recoverySource(), allocation.changes()); } } @@ -308,7 +315,7 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus()), shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getLastAllocationStatus(), false), shardRouting.recoverySource(), allocation.changes()); } } @@ -421,7 +428,7 @@ private void disassociateDeadNodes(RoutingAllocation allocation) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0; UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT); + null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT, false); allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes()); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ba92b7a20455c..a8630db1e077f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -152,7 +152,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == 
AllocationStatus.NO_ATTEMPT) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO), + unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, false), shardRouting.recoverySource(), allocation.changes()); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java index 2e3219e67c7ae..50e45c6e1745f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java @@ -139,7 +139,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) ", " + shardRouting.unassignedInfo().getMessage(); unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage, shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false, - shardRouting.unassignedInfo().getLastAllocationStatus()); + shardRouting.unassignedInfo().getLastAllocationStatus(), false); } initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2eda9da8e1086..0cf79949d98ad 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -77,6 +77,12 @@ public void processExistingRecoveries(RoutingAllocation allocation) { continue; } + // if we already cancelled this shard before, then we should not cancel it again + if (shard.unassignedInfo() != null && shard.unassignedInfo().wasCancelledForNoopRecovery()) { + logger.trace("{} was cancelled for a noop recovery before", shard); + continue; + } + AsyncShardFetch.FetchResult shardStores = fetchData(shard, allocation); if (shardStores.hasData() == false) { logger.trace("{}: fetching new stores for initializing shard", shard); @@ -110,7 +116,7 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, false); // don't cancel shard in the loop as it will cause a ConcurrentModificationException shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metaData.getIndexSafe(shard.index()), allocation.changes())); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 20216af9bf38e..38cf35ee83e2a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -82,7 +82,8 @@ public void testSerialization() throws Exception { UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()); UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, - randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT): + randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT, + randomBoolean()): new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); BytesStreamOutput out = new BytesStreamOutput(); meta.writeTo(out); @@ -94,6 +95,7 @@ public void testSerialization() throws Exception { assertThat(read.getMessage(), equalTo(meta.getMessage())); assertThat(read.getDetails(), equalTo(meta.getDetails())); assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); + assertThat(read.wasCancelledForNoopRecovery(), equalTo(meta.wasCancelledForNoopRecovery())); } public void testIndexCreated() { @@ -296,7 +298,7 @@ public void testFailedShard() { public void testRemainingDelayCalculation() throws Exception { final long baseTime = System.nanoTime(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, - System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT); + System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT, false); final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos(); final Settings indexSettings = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index 8bb1657b5ef3a..5cd547d8ec564 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -120,7 +120,7 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() { UnassignedInfo currentInfo = primary.unassignedInfo(); UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"), currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), - currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), currentInfo.getLastAllocationStatus()); + currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), false); primary = primary.updateUnassigned(newInfo, primary.recoverySource()); IndexRoutingTable indexRoutingTable = routingTable.index("test"); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index d27078b7e796b..4d33172f1399c 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -26,7 +26,10 @@ import 
org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; @@ -282,6 +285,39 @@ public void testPreferCopyWithHighestMatchingOperations() throws Exception { assertThat(internalCluster().nodesInclude(indexName), allOf(hasItem(nodeWithHigherMatching), not(hasItem(nodeWithLowerMatching)))); } + /** + * Make sure that we do not repeatedly cancel an ongoing recovery for a noop copy on a broken node. + */ + public void testDoNotCancelRecoveryForBrokenNode() throws Exception { + internalCluster().startMasterOnlyNode(); + String nodeWithPrimary = internalCluster().startDataOnlyNode(); + String indexName = "test"; + assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"))); + ensureYellow(indexName); + String nodeWithReplica = internalCluster().startDataOnlyNode(); + ensureGreen(indexName); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(1, 200)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + String emptyNode = internalCluster().startDataOnlyNode(); + MockTransportService transportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG) + && connection.getNode().getName().equals(emptyNode) == false) { + throw new CircuitBreakingException("not enough memory to open engine", 100, 50, CircuitBreaker.Durability.TRANSIENT); + } + connection.sendRequest(requestId, action, request, options); + }); + internalCluster().restartNode(nodeWithReplica, new InternalTestCluster.RestartCallback()); + ensureGreen(indexName); + assertThat(internalCluster().nodesInclude(indexName), equalTo(Sets.newHashSet(nodeWithPrimary, emptyNode))); + transportService.clearAllRules(); + } + private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception { assertBusy(() -> { Index index = resolveIndex(indexName); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 821e721a1ac27..3278526e6bb01 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -66,6 +66,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.unmodifiableMap; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; public class ReplicaShardAllocatorTests extends ESAllocationTestCase { @@ -388,6 +389,21 @@ public void 
testNotCancellingRecovery() { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); } + public void testDoNotCancelAlreadyCancelledShardRouting() { + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), + System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, true); + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); + long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); + List retentionLeases = Arrays.asList( + newRetentionLease(node1, retainingSeqNoOnPrimary), newRetentionLease(node3, retainingSeqNoOnPrimary)); + testAllocator + .addData(node1, retentionLeases, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) + .addData(node2, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED), empty()); + } + private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) { return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED); } @@ -410,7 +426,7 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide .addShard(ShardRouting.newUnassigned(shardId, false, RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(reason, null, null, failedAllocations, System.nanoTime(), - System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT) + System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT, false) )) .build()) ) @@ -422,7 +438,7 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime()); } - private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders, UnassignedInfo unassignedInfo) { ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT)) @@ -434,8 +450,7 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid .addIndexShard(new IndexShardRoutingTable.Builder(shardId) .addShard(primaryShard) .addShard(TestShardRouting.newShardRouting(shardId, node2.getId(), null, false, - ShardRoutingState.INITIALIZING, - new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null))) + ShardRoutingState.INITIALIZING, unassignedInfo)) .build()) ) .build(); @@ -446,6 +461,10 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime()); } + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + return onePrimaryOnNode1And1ReplicaRecovering(deciders, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); + } + static RetentionLease newRetentionLease(DiscoveryNode node, long 
retainingSeqNo) { return new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()), retainingSeqNo, randomNonNegativeLong(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 8f07c99a49764..7c0a92675320a 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -998,7 +998,6 @@ public void testHistoryRetention() throws Exception { assertThat(recoveryState.getTranslog().recoveredOperations(), greaterThan(0)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47974") public void testDoNotInfinitelyWaitForMapping() { internalCluster().ensureAtLeastNumDataNodes(3); createIndex("test", Settings.builder() From 600c3dee8905b53243d9c12983b365fbbecf151e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 20 Oct 2019 22:32:21 -0400 Subject: [PATCH 02/11] make test fail more often --- .../gateway/ReplicaShardAllocatorIT.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 4d33172f1399c..d658d42e95f2f 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; @@ -50,6 +49,7 @@ import java.util.Collection; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -293,28 +293,30 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception { String nodeWithPrimary = internalCluster().startDataOnlyNode(); String indexName = "test"; assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"))); - ensureYellow(indexName); - String nodeWithReplica = internalCluster().startDataOnlyNode(); - ensureGreen(indexName); - indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(1, 200)) + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500)) .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); - ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); - String emptyNode = internalCluster().startDataOnlyNode(); + client().admin().indices().prepareFlush(indexName).get(); + AtomicReference brokenNode = new AtomicReference<>(); MockTransportService 
transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); transportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.equals(PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG) - && connection.getNode().getName().equals(emptyNode) == false) { - throw new CircuitBreakingException("not enough memory to open engine", 100, 50, CircuitBreaker.Durability.TRANSIENT); + if (action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) { + String nodeName = connection.getNode().getName(); + brokenNode.compareAndSet(null, nodeName); + if (brokenNode.get().equals(nodeName)) { + throw new CircuitBreakingException("not enough memory for indexing", 100, 50, CircuitBreaker.Durability.TRANSIENT); + } } connection.sendRequest(requestId, action, request, options); }); - internalCluster().restartNode(nodeWithReplica, new InternalTestCluster.RestartCallback()); + internalCluster().startDataOnlyNodes(2); + client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)).get(); ensureGreen(indexName); - assertThat(internalCluster().nodesInclude(indexName), equalTo(Sets.newHashSet(nodeWithPrimary, emptyNode))); + assertThat(internalCluster().nodesInclude(indexName), not(hasItem(brokenNode.get()))); transportService.clearAllRules(); } From 3976586552ed0e4d7698b77bc9b8f00cae96ea1d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 21 Oct 2019 11:42:34 -0400 Subject: [PATCH 03/11] keep track list of failed nodes --- .../cluster/routing/RoutingNodes.java | 4 +- .../cluster/routing/UnassignedInfo.java | 47 ++++++++++--------- .../routing/allocation/AllocationService.java | 22 +++++---- .../allocator/BalancedShardsAllocator.java | 3 +- ...AllocateEmptyPrimaryAllocationCommand.java | 3 +- .../gateway/ReplicaShardAllocator.java | 20 ++++---- .../cluster/reroute/ClusterRerouteTests.java | 8 ++++ .../cluster/routing/UnassignedInfoTests.java | 13 +++-- .../MaxRetryAllocationDeciderTests.java | 46 +++++++++++++----- ...storeInProgressAllocationDeciderTests.java | 4 +- .../gateway/ReplicaShardAllocatorIT.java | 15 +++--- .../gateway/ReplicaShardAllocatorTests.java | 47 +++++++++++++++---- 12 files changed, 149 insertions(+), 83 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 5cf491bb378b1..e286a1f52ae19 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -547,7 +547,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas"; UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, false); + unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet()); failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver); } } @@ -873,7 +873,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), 
currInfo.getMessage(), currInfo.getFailure(), currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(), - allocationStatus, false); + allocationStatus, currInfo.getFailedNodeIds()); ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource()); changes.unassignedInfoUpdated(shard, newInfo); shard = updatedShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index ea941ce3f056d..fef9ccb85625c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -39,8 +39,10 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneOffset; +import java.util.Collections; import java.util.Locale; import java.util.Objects; +import java.util.Set; /** * Holds additional information as to why the shard is in unassigned state. @@ -214,8 +216,8 @@ public String value() { private final String message; private final Exception failure; private final int failedAllocations; + private final Set failedNodeIds; private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard - private final boolean wasCancelledForNoopRecovery; /** * creates an UnassignedInfo object based on **current** time @@ -225,22 +227,22 @@ public String value() { **/ public UnassignedInfo(Reason reason, String message) { this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT, false); + AllocationStatus.NO_ATTEMPT, Collections.emptySet()); } /** - * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. - * @param message more information about cause. - * @param failure the shard level failure that caused this shard to be unassigned, if exists. - * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation - * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. - * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. - * @param lastAllocationStatus the result of the last allocation attempt for this shard - * @param wasCancelledForNoopRecovery if allocation of this shard was cancelled in favour of a noop recovery + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + * @param failure the shard level failure that caused this shard to be unassigned, if exists. + * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation + * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. + * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. 
+ * @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param failedNodeIds a set of nodeIds that previously failed to allocate this shard */ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus, - boolean wasCancelledForNoopRecovery) { + Set failedNodeIds) { this.reason = Objects.requireNonNull(reason); this.unassignedTimeMillis = unassignedTimeMillis; this.unassignedTimeNanos = unassignedTimeNanos; @@ -249,13 +251,12 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti this.failure = failure; this.failedAllocations = failedAllocations; this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus); - this.wasCancelledForNoopRecovery = wasCancelledForNoopRecovery; + this.failedNodeIds = Collections.unmodifiableSet(failedNodeIds); assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : "failedAllocations: " + failedAllocations + " for reason " + reason; assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; assert !(delayed && reason != Reason.NODE_LEFT) : "shard can only be delayed if it is unassigned due to a node leaving"; - assert wasCancelledForNoopRecovery == false || reason == Reason.ALLOCATION_FAILED : - "cancelled status was not reset for reason [" + reason + "]"; + assert failedAllocations >= failedNodeIds.size() : "failedAllocations: " + failedAllocations + " failedNodeIds: " + failedNodeIds; } public UnassignedInfo(StreamInput in) throws IOException { @@ -270,9 +271,9 @@ public UnassignedInfo(StreamInput in) throws IOException { this.failedAllocations = in.readVInt(); this.lastAllocationStatus = AllocationStatus.readFrom(in); if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - this.wasCancelledForNoopRecovery = in.readBoolean(); + this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); } else { - this.wasCancelledForNoopRecovery = false; + this.failedNodeIds = Collections.emptySet(); } } @@ -286,7 +287,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(failedAllocations); lastAllocationStatus.writeTo(out); if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeBoolean(wasCancelledForNoopRecovery); + out.writeCollection(failedNodeIds, StreamOutput::writeString); } } @@ -363,10 +364,10 @@ public AllocationStatus getLastAllocationStatus() { } /** - * @return true if the allocation of this shard was cancelled in favour of a noop recovery + * Returns a set of nodeId that previously failed to allocate this shard */ - public boolean wasCancelledForNoopRecovery() { - return wasCancelledForNoopRecovery; + public Set getFailedNodeIds() { + return failedNodeIds; } /** @@ -454,7 +455,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("details", details); } builder.field("allocation_status", lastAllocationStatus.value()); - builder.field("cancelled_for_noop", wasCancelledForNoopRecovery); + builder.field("failed_nodes", failedNodeIds); builder.endObject(); return builder; } @@ -491,7 +492,7 @@ public boolean equals(Object o) { if (Objects.equals(failure, that.failure) == false) { return false; } - return wasCancelledForNoopRecovery == that.wasCancelledForNoopRecovery; + return failedNodeIds.equals(that.failedNodeIds); } @Override @@ -503,7 +504,7 @@ public int hashCode() 
{ result = 31 * result + (message != null ? message.hashCode() : 0); result = 31 * result + (failure != null ? failure.hashCode() : 0); result = 31 * result + lastAllocationStatus.hashCode(); - result = 31 * result + Boolean.hashCode(wasCancelledForNoopRecovery); + result = 31 * result + failedNodeIds.hashCode(); return result; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 6a9e70126fe5b..0ee294e1664ed 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -46,9 +46,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -194,17 +196,18 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis shardToFail.shardId(), shardToFail, failedShard); } int failedAllocations = failedShard.unassignedInfo() != null ? failedShard.unassignedInfo().getNumFailedAllocations() : 0; - boolean wasCancelledForNoopRecovery; + final Set failedNodeIds; if (failedShard.unassignedInfo() != null) { - wasCancelledForNoopRecovery = failedShard.unassignedInfo().wasCancelledForNoopRecovery() - || failedShard.unassignedInfo().getReason() == UnassignedInfo.Reason.REALLOCATED_REPLICA; + failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNodeIds().size() + 1); + failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNodeIds()); + failedNodeIds.add(failedShard.currentNodeId()); } else { - wasCancelledForNoopRecovery = false; + failedNodeIds = Set.of(failedShard.currentNodeId()); } String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT, wasCancelledForNoopRecovery); + AllocationStatus.NO_ATTEMPT, failedNodeIds); if (failedShardEntry.markAsStale()) { allocation.removeAllocationId(failedShard); } @@ -296,8 +299,8 @@ private void removeDelayMarkers(RoutingAllocation allocation) { if (newComputedLeftDelayNanos == 0) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), false), - shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), + unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } } @@ -315,7 +318,7 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus(), false), 
shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes()); } } @@ -428,7 +431,8 @@ private void disassociateDeadNodes(RoutingAllocation allocation) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0; UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT, false); + null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT, + Collections.emptySet()); allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes()); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a8630db1e077f..9a25c45ea355d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -152,7 +152,8 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, false), + unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, + unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java index 50e45c6e1745f..08f64407f6d09 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.shard.ShardNotFoundException; import java.io.IOException; +import java.util.Collections; import java.util.Optional; /** @@ -139,7 +140,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) ", " + shardRouting.unassignedInfo().getMessage(); unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage, shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false, - shardRouting.unassignedInfo().getLastAllocationStatus(), false); + shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet()); } initializeUnassignedShard(allocation, routingNodes, 
routingNode, shardRouting, unassignedInfoToUpdate, diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 0cf79949d98ad..04bcd16f2a4aa 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -43,6 +43,7 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -77,12 +78,6 @@ public void processExistingRecoveries(RoutingAllocation allocation) { continue; } - // if we already cancelled this shard before, then we should not cancel it again - if (shard.unassignedInfo() != null && shard.unassignedInfo().wasCancelledForNoopRecovery()) { - logger.trace("{} was cancelled for a noop recovery before", shard); - continue; - } - AsyncShardFetch.FetchResult shardStores = fetchData(shard, allocation); if (shardStores.hasData() == false) { logger.trace("{}: fetching new stores for initializing shard", shard); @@ -101,7 +96,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) { continue; } - MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false); + MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardStores, false); if (matchingNodes.getNodeWithHighestMatch() != null) { DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); @@ -116,7 +111,7 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, false); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, Collections.emptySet()); // don't cancel shard in the loop as it will cause a ConcurrentModificationException shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metaData.getIndexSafe(shard.index()), allocation.changes())); @@ -192,7 +187,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas return AllocateUnassignedDecision.NOT_TAKEN; } - MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain); + MatchingNodes matchingNodes = findMatchingNodes( + unassignedShard, allocation, false, primaryNode, primaryStore, shardStores, explain); assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions"; List nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions); @@ -303,7 +299,7 @@ private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore return nodeFilesStore.storeFilesMetaData(); } - private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, + private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean ignorePreviousFailedNodes, DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, 
AsyncShardFetch.FetchResult data, boolean explain) { @@ -311,6 +307,10 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al Map nodeDecisions = explain ? new HashMap<>() : null; for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); + if (ignorePreviousFailedNodes + && shard.unassignedInfo() != null && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { + continue; + } TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData(); // we don't have any files at all, it is an empty index if (storeFilesMetaData.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java index 5cc5f98a49282..80df2a94f647a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java @@ -45,11 +45,14 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -108,6 +111,7 @@ public void onFailure(Exception e) { req.dryRun(false);// now we allocate final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); + final Set failedNodeIds = new HashSet<>(); // now fail it N-1 times for (int i = 0; i < retries; i++) { ClusterState newState = task.execute(clusterState); @@ -120,6 +124,7 @@ public void onFailure(Exception e) { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i, new UnsupportedOperationException(), randomBoolean())); + failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); newState = allocationService.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -131,6 +136,7 @@ public void onFailure(Exception e) { assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING); } assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i+1); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); } @@ -143,6 +149,7 @@ public void onFailure(Exception e) { assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED); assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); req.setRetryFailed(true); // now we manually retry and get the shard back into initializing newState = task.execute(clusterState); @@ -152,6 +159,7 @@ public void onFailure(Exception e) { assertEquals(1, routingTable.index("idx").shards().size()); 
assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state()); assertEquals(0, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty()); } private ClusterState createInitialClusterState(AllocationService service) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 38cf35ee83e2a..eb5b5c10c843c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -44,6 +44,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; @@ -80,10 +83,12 @@ public void testReasonOrdinalOrder() { public void testSerialization() throws Exception { UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()); + int failedAllocations = randomIntBetween(1, 100); + Set failedNodes = IntStream.range(0, between(0, failedAllocations)) + .mapToObj(n -> "failed-node-" + n).collect(Collectors.toSet()); UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, - randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT, - randomBoolean()): + failedAllocations, System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT, failedNodes): new UnassignedInfo(reason, randomBoolean() ? 
randomAlphaOfLength(4) : null); BytesStreamOutput out = new BytesStreamOutput(); meta.writeTo(out); @@ -95,7 +100,7 @@ public void testSerialization() throws Exception { assertThat(read.getMessage(), equalTo(meta.getMessage())); assertThat(read.getDetails(), equalTo(meta.getDetails())); assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); - assertThat(read.wasCancelledForNoopRecovery(), equalTo(meta.wasCancelledForNoopRecovery())); + assertThat(read.getFailedNodeIds(), equalTo(meta.getFailedNodeIds())); } public void testIndexCreated() { @@ -298,7 +303,7 @@ public void testFailedShard() { public void testRemainingDelayCalculation() throws Exception { final long baseTime = System.nanoTime(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, - System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT, false); + System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT, Collections.emptySet()); final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos(); final Settings indexSettings = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java index 19af72d18db4a..050b37a156aa2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java @@ -38,12 +38,15 @@ import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -87,11 +90,13 @@ public void testSingleRetryOnIgnore() { ClusterState clusterState = createInitialClusterState(); RoutingTable routingTable = clusterState.routingTable(); final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); + Set failedNodeIds = new HashSet<>(); // now fail it N-1 times for (int i = 0; i < retries-1; i++) { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i, new UnsupportedOperationException(), randomBoolean())); + failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); ClusterState newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -99,23 +104,27 @@ public void testSingleRetryOnIgnore() { assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING); assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i+1); + 
assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom" + i)); } // now we go and check that we are actually stick to unassigned on the next failure List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", new UnsupportedOperationException(), randomBoolean())); + failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); ClusterState newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; routingTable = newState.routingTable(); assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED); assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); // manual resetting of retry count newState = strategy.reroute(clusterState, new AllocationCommands(), false, true).getClusterState(); + failedNodeIds.clear(); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; routingTable = newState.routingTable(); @@ -123,35 +132,39 @@ public void testSingleRetryOnIgnore() { clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(0, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty()); assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state()); assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); // again fail it N-1 times for (int i = 0; i < retries-1; i++) { - failedShards = Collections.singletonList( - new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", - new UnsupportedOperationException(), randomBoolean())); - - newState = strategy.applyFailedShards(clusterState, failedShards); - assertThat(newState, not(equalTo(clusterState))); - clusterState = newState; - routingTable = newState.routingTable(); - assertEquals(routingTable.index("idx").shards().size(), 1); - assertEquals(i + 1, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); - assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state()); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); + failedShards = Collections.singletonList( + new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", + new UnsupportedOperationException(), randomBoolean())); + failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); + newState = strategy.applyFailedShards(clusterState, failedShards); + assertThat(newState, not(equalTo(clusterState))); + clusterState = newState; + routingTable = newState.routingTable(); + 
assertEquals(routingTable.index("idx").shards().size(), 1); + assertEquals(i + 1, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); + assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state()); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); } // now we go and check that we are actually stick to unassigned on the next failure failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", new UnsupportedOperationException(), randomBoolean())); + failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; routingTable = newState.routingTable(); assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(retries, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(UNASSIGNED, routingTable.index("idx").shard(0).shards().get(0).state()); assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); } @@ -159,12 +172,14 @@ public void testSingleRetryOnIgnore() { public void testFailedAllocation() { ClusterState clusterState = createInitialClusterState(); RoutingTable routingTable = clusterState.routingTable(); + Set failedNodeIds = new HashSet<>(); final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); // now fail it N-1 times for (int i = 0; i < retries-1; i++) { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i, new UnsupportedOperationException(), randomBoolean())); + failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); ClusterState newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -173,6 +188,7 @@ public void testFailedAllocation() { ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); assertEquals(unassignedPrimary.state(), INITIALIZING); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), i+1); + assertThat(unassignedPrimary.unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertThat(unassignedPrimary.unassignedInfo().getMessage(), containsString("boom" + i)); // MaxRetryAllocationDecider#canForceAllocatePrimary should return YES decisions because canAllocate returns YES here assertEquals(Decision.YES, new MaxRetryAllocationDecider().canForceAllocatePrimary( @@ -183,12 +199,14 @@ public void testFailedAllocation() { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", new UnsupportedOperationException(), randomBoolean())); + failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); ClusterState newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; routingTable = 
newState.routingTable(); assertEquals(routingTable.index("idx").shards().size(), 1); ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); + assertThat(unassignedPrimary.unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), retries); assertEquals(unassignedPrimary.state(), UNASSIGNED); assertThat(unassignedPrimary.unassignedInfo().getMessage(), containsString("boom")); @@ -212,6 +230,7 @@ public void testFailedAllocation() { assertEquals(routingTable.index("idx").shards().size(), 1); ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), retries); + assertThat(unassignedPrimary.unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(unassignedPrimary.state(), INITIALIZING); assertThat(unassignedPrimary.unassignedInfo().getMessage(), containsString("boom")); // bumped up the max retry count, so canForceAllocatePrimary should return a YES decision @@ -223,6 +242,7 @@ public void testFailedAllocation() { routingTable = clusterState.routingTable(); // all counters have been reset to 0 ie. no unassigned info + failedNodeIds.clear(); assertEquals(routingTable.index("idx").shards().size(), 1); assertNull(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo()); assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), STARTED); @@ -231,6 +251,7 @@ public void testFailedAllocation() { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "ZOOOMG", new UnsupportedOperationException(), randomBoolean())); + failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -238,6 +259,7 @@ public void testFailedAllocation() { assertEquals(routingTable.index("idx").shards().size(), 1); unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), 1); + assertThat(unassignedPrimary.unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(unassignedPrimary.state(), UNASSIGNED); assertThat(unassignedPrimary.unassignedInfo().getMessage(), containsString("ZOOOMG")); // Counter reset, so MaxRetryAllocationDecider#canForceAllocatePrimary should return a YES decision diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index 5cd547d8ec564..fbedd5e1799c6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -119,8 +119,8 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() { UnassignedInfo currentInfo = primary.unassignedInfo(); UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"), - currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), - currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), 
currentInfo.getLastAllocationStatus(), false); + currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), currentInfo.getUnassignedTimeInMillis(), + currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNodeIds()); primary = primary.updateUnassigned(newInfo, primary.recoverySource()); IndexRoutingTable indexRoutingTable = routingTable.index("test"); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index d658d42e95f2f..7791f919999f1 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -49,7 +49,6 @@ import java.util.Collection; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -299,24 +298,22 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception { indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500)) .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); client().admin().indices().prepareFlush(indexName).get(); - AtomicReference brokenNode = new AtomicReference<>(); + String brokenNode = internalCluster().startDataOnlyNode(); MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); transportService.addSendBehavior((connection, requestId, action, request, options) -> { if (action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) { - String nodeName = connection.getNode().getName(); - brokenNode.compareAndSet(null, nodeName); - if (brokenNode.get().equals(nodeName)) { + if (brokenNode.equals(connection.getNode().getName())) { throw new CircuitBreakingException("not enough memory for indexing", 100, 50, CircuitBreaker.Durability.TRANSIENT); } } connection.sendRequest(requestId, action, request, options); }); - internalCluster().startDataOnlyNodes(2); - client().admin().indices().prepareUpdateSettings(indexName) - .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)).get(); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); + internalCluster().startDataOnlyNode(); + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); ensureGreen(indexName); - assertThat(internalCluster().nodesInclude(indexName), not(hasItem(brokenNode.get()))); transportService.clearAllRules(); } diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 3278526e6bb01..a91752a42580b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -61,13 +61,16 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.Matchers.empty; import static 
org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class ReplicaShardAllocatorTests extends ESAllocationTestCase { private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.elasticsearch.Version.CURRENT @@ -183,7 +186,14 @@ public void testPreferCopyWithHighestMatchingOperations() { } public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { - RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + final UnassignedInfo unassignedInfo; + if (randomBoolean()) { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); + } else { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), + System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, Set.of("node-4")); + } + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); testAllocator.addData(node1, Arrays.asList(newRetentionLease(node1, retainingSeqNo), newRetentionLease(node3, retainingSeqNo)), "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); @@ -191,8 +201,11 @@ public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.processExistingRecoveries(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); + List unassignedShards = allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED); + assertThat(unassignedShards, hasSize(1)); + assertThat(unassignedShards.get(0).shardId(), equalTo(shardId)); + assertThat(unassignedShards.get(0).unassignedInfo().getNumFailedAllocations(), equalTo(0)); + assertThat(unassignedShards.get(0).unassignedInfo().getFailedNodeIds(), empty()); } public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() { @@ -358,7 +371,14 @@ public void testCancelRecoveryBetterSyncId() { } public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { - RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + final UnassignedInfo unassignedInfo; + if (randomBoolean()) { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); + } else { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), + System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, Set.of("node-4")); + } + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); List retentionLeases = new ArrayList<>(); if (randomBoolean()) { long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); @@ -389,16 +409,23 @@ public void testNotCancellingRecovery() { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); } - public void testDoNotCancelAlreadyCancelledShardRouting() { - UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, 
null, randomIntBetween(1, 10), - System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, true); + public void testDoNotCancelForBrokenNode() { + Set failedNodes = new HashSet<>(); + failedNodes.add(node3.getId()); + if (randomBoolean()) { + failedNodes.add("node4"); + } + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, + randomIntBetween(failedNodes.size(), 10), System.nanoTime(), System.currentTimeMillis(), false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes); RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); List retentionLeases = Arrays.asList( newRetentionLease(node1, retainingSeqNoOnPrimary), newRetentionLease(node3, retainingSeqNoOnPrimary)); testAllocator .addData(node1, retentionLeases, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) - .addData(node2, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + .addData(node2, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) + .addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.processExistingRecoveries(allocation); assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED), empty()); @@ -426,8 +453,8 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide .addShard(ShardRouting.newUnassigned(shardId, false, RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(reason, null, null, failedAllocations, System.nanoTime(), - System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT, false) - )) + System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet()))) .build()) ) .build(); From 0dd5cc8c2c59b6c440890b17f5a44b7dac9a4439 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 29 Oct 2019 12:15:29 -0400 Subject: [PATCH 04/11] track only nodes that failed to perform noop allocations --- .../cluster/routing/RoutingNodes.java | 2 +- .../cluster/routing/UnassignedInfo.java | 46 ++++++++++--------- .../routing/allocation/AllocationService.java | 11 +++-- .../allocator/BalancedShardsAllocator.java | 2 +- .../gateway/ReplicaShardAllocator.java | 2 +- .../cluster/reroute/ClusterRerouteTests.java | 8 ---- .../cluster/routing/UnassignedInfoTests.java | 2 +- .../MaxRetryAllocationDeciderTests.java | 46 +++++-------------- ...storeInProgressAllocationDeciderTests.java | 2 +- .../gateway/ReplicaShardAllocatorTests.java | 2 +- 10 files changed, 49 insertions(+), 74 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index e286a1f52ae19..24463d8ebab94 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -873,7 +873,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(), currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), 
currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(), - allocationStatus, currInfo.getFailedNodeIds()); + allocationStatus, currInfo.getFailedNoopAllocationNodeIds()); ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource()); changes.unassignedInfoUpdated(shard, newInfo); shard = updatedShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index fef9ccb85625c..5c1570b77f1fb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -216,7 +217,7 @@ public String value() { private final String message; private final Exception failure; private final int failedAllocations; - private final Set failedNodeIds; + private final Set failedNoopAllocationNodeIds; private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard /** @@ -231,18 +232,18 @@ public UnassignedInfo(Reason reason, String message) { } /** - * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. - * @param message more information about cause. - * @param failure the shard level failure that caused this shard to be unassigned, if exists. - * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation - * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. - * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. - * @param lastAllocationStatus the result of the last allocation attempt for this shard - * @param failedNodeIds a set of nodeIds that previously failed to allocate this shard + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + * @param failure the shard level failure that caused this shard to be unassigned, if exists. + * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation + * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. + * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. 
+ * @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param failedNoopAllocationNodeIds a set of nodeIds that failed noop allocations for this shard */ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus, - Set failedNodeIds) { + Set failedNoopAllocationNodeIds) { this.reason = Objects.requireNonNull(reason); this.unassignedTimeMillis = unassignedTimeMillis; this.unassignedTimeNanos = unassignedTimeNanos; @@ -251,12 +252,12 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti this.failure = failure; this.failedAllocations = failedAllocations; this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus); - this.failedNodeIds = Collections.unmodifiableSet(failedNodeIds); + this.failedNoopAllocationNodeIds = Collections.unmodifiableSet(failedNoopAllocationNodeIds); assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : "failedAllocations: " + failedAllocations + " for reason " + reason; assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; assert !(delayed && reason != Reason.NODE_LEFT) : "shard can only be delayed if it is unassigned due to a node leaving"; - assert failedAllocations >= failedNodeIds.size() : "failedAllocations: " + failedAllocations + " failedNodeIds: " + failedNodeIds; + assert failedAllocations >= failedNoopAllocationNodeIds.size() : "failedAllocations: " + failedAllocations + " failedNodeIds: " + failedNoopAllocationNodeIds; } public UnassignedInfo(StreamInput in) throws IOException { @@ -271,9 +272,9 @@ public UnassignedInfo(StreamInput in) throws IOException { this.failedAllocations = in.readVInt(); this.lastAllocationStatus = AllocationStatus.readFrom(in); if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); + this.failedNoopAllocationNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); } else { - this.failedNodeIds = Collections.emptySet(); + this.failedNoopAllocationNodeIds = Collections.emptySet(); } } @@ -287,7 +288,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(failedAllocations); lastAllocationStatus.writeTo(out); if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeCollection(failedNodeIds, StreamOutput::writeString); + out.writeCollection(failedNoopAllocationNodeIds, StreamOutput::writeString); } } @@ -364,10 +365,13 @@ public AllocationStatus getLastAllocationStatus() { } /** - * Returns a set of nodeId that previously failed to allocate this shard + * A set of nodeIds that {@link org.elasticsearch.gateway.ReplicaShardAllocator} has cancelled ongoing recoveries for copies on them, + * but they failed to complete noop allocations for this shard. We track those nodes so we won't cancel recoveries for them again. 
+ * + * @see org.elasticsearch.gateway.ReplicaShardAllocator#processExistingRecoveries(RoutingAllocation) */ - public Set getFailedNodeIds() { - return failedNodeIds; + public Set getFailedNoopAllocationNodeIds() { + return failedNoopAllocationNodeIds; } /** @@ -455,7 +459,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("details", details); } builder.field("allocation_status", lastAllocationStatus.value()); - builder.field("failed_nodes", failedNodeIds); + builder.field("failed_nodes", failedNoopAllocationNodeIds); builder.endObject(); return builder; } @@ -492,7 +496,7 @@ public boolean equals(Object o) { if (Objects.equals(failure, that.failure) == false) { return false; } - return failedNodeIds.equals(that.failedNodeIds); + return failedNoopAllocationNodeIds.equals(that.failedNoopAllocationNodeIds); } @Override @@ -504,7 +508,7 @@ public int hashCode() { result = 31 * result + (message != null ? message.hashCode() : 0); result = 31 * result + (failure != null ? failure.hashCode() : 0); result = 31 * result + lastAllocationStatus.hashCode(); - result = 31 * result + failedNodeIds.hashCode(); + result = 31 * result + failedNoopAllocationNodeIds.hashCode(); return result; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 0ee294e1664ed..5fe26627dfea8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -197,12 +197,13 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis } int failedAllocations = failedShard.unassignedInfo() != null ? 
failedShard.unassignedInfo().getNumFailedAllocations() : 0; final Set failedNodeIds; - if (failedShard.unassignedInfo() != null) { - failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNodeIds().size() + 1); - failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNodeIds()); + if (failedShard.unassignedInfo() != null && + failedShard.unassignedInfo().getReason() == UnassignedInfo.Reason.REALLOCATED_REPLICA) { + failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNoopAllocationNodeIds().size() + 1); + failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNoopAllocationNodeIds()); failedNodeIds.add(failedShard.currentNodeId()); } else { - failedNodeIds = Set.of(failedShard.currentNodeId()); + failedNodeIds = Collections.emptySet(); } String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, @@ -300,7 +301,7 @@ private void removeDelayMarkers(RoutingAllocation allocation) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), - unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getFailedNoopAllocationNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 9a25c45ea355d..a589ad00ffbf5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -153,7 +153,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, - unassignedInfo.getFailedNodeIds()), + unassignedInfo.getFailedNoopAllocationNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 04bcd16f2a4aa..5ccce164eb380 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -308,7 +308,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); if (ignorePreviousFailedNodes - && shard.unassignedInfo() != null && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { + && shard.unassignedInfo() != null && shard.unassignedInfo().getFailedNoopAllocationNodeIds().contains(discoNode.getId())) { continue; } 
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData(); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java index 80df2a94f647a..5cc5f98a49282 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java @@ -45,14 +45,11 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -111,7 +108,6 @@ public void onFailure(Exception e) { req.dryRun(false);// now we allocate final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); - final Set failedNodeIds = new HashSet<>(); // now fail it N-1 times for (int i = 0; i < retries; i++) { ClusterState newState = task.execute(clusterState); @@ -124,7 +120,6 @@ public void onFailure(Exception e) { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i, new UnsupportedOperationException(), randomBoolean())); - failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); newState = allocationService.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -136,7 +131,6 @@ public void onFailure(Exception e) { assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING); } assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i+1); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); } @@ -149,7 +143,6 @@ public void onFailure(Exception e) { assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED); assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); req.setRetryFailed(true); // now we manually retry and get the shard back into initializing newState = task.execute(clusterState); @@ -159,7 +152,6 @@ public void onFailure(Exception e) { assertEquals(1, routingTable.index("idx").shards().size()); assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state()); assertEquals(0, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty()); } private ClusterState createInitialClusterState(AllocationService service) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 
eb5b5c10c843c..2e92a5f7be794 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -100,7 +100,7 @@ public void testSerialization() throws Exception { assertThat(read.getMessage(), equalTo(meta.getMessage())); assertThat(read.getDetails(), equalTo(meta.getDetails())); assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); - assertThat(read.getFailedNodeIds(), equalTo(meta.getFailedNodeIds())); + assertThat(read.getFailedNoopAllocationNodeIds(), equalTo(meta.getFailedNoopAllocationNodeIds())); } public void testIndexCreated() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java index 050b37a156aa2..19af72d18db4a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java @@ -38,15 +38,12 @@ import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -90,13 +87,11 @@ public void testSingleRetryOnIgnore() { ClusterState clusterState = createInitialClusterState(); RoutingTable routingTable = clusterState.routingTable(); final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); - Set failedNodeIds = new HashSet<>(); // now fail it N-1 times for (int i = 0; i < retries-1; i++) { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i, new UnsupportedOperationException(), randomBoolean())); - failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); ClusterState newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -104,27 +99,23 @@ public void testSingleRetryOnIgnore() { assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING); assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i+1); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom" + i)); } // now we go and check that we are actually stick to unassigned on the next failure List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", new UnsupportedOperationException(), randomBoolean())); - failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); ClusterState newState = strategy.applyFailedShards(clusterState, 
failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; routingTable = newState.routingTable(); assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED); assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); // manual resetting of retry count newState = strategy.reroute(clusterState, new AllocationCommands(), false, true).getClusterState(); - failedNodeIds.clear(); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; routingTable = newState.routingTable(); @@ -132,39 +123,35 @@ public void testSingleRetryOnIgnore() { clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(0, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty()); assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state()); assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); // again fail it N-1 times for (int i = 0; i < retries-1; i++) { - failedShards = Collections.singletonList( - new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", - new UnsupportedOperationException(), randomBoolean())); - failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); - newState = strategy.applyFailedShards(clusterState, failedShards); - assertThat(newState, not(equalTo(clusterState))); - clusterState = newState; - routingTable = newState.routingTable(); - assertEquals(routingTable.index("idx").shards().size(), 1); - assertEquals(i + 1, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); - assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state()); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); + failedShards = Collections.singletonList( + new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", + new UnsupportedOperationException(), randomBoolean())); + + newState = strategy.applyFailedShards(clusterState, failedShards); + assertThat(newState, not(equalTo(clusterState))); + clusterState = newState; + routingTable = newState.routingTable(); + assertEquals(routingTable.index("idx").shards().size(), 1); + assertEquals(i + 1, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); + assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state()); + assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); } // now we go and check that we are actually stick to unassigned on the next failure failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), 
"boom", new UnsupportedOperationException(), randomBoolean())); - failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; routingTable = newState.routingTable(); assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(retries, routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations()); - assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(UNASSIGNED, routingTable.index("idx").shard(0).shards().get(0).state()); assertThat(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), containsString("boom")); } @@ -172,14 +159,12 @@ public void testSingleRetryOnIgnore() { public void testFailedAllocation() { ClusterState clusterState = createInitialClusterState(); RoutingTable routingTable = clusterState.routingTable(); - Set failedNodeIds = new HashSet<>(); final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); // now fail it N-1 times for (int i = 0; i < retries-1; i++) { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i, new UnsupportedOperationException(), randomBoolean())); - failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); ClusterState newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -188,7 +173,6 @@ public void testFailedAllocation() { ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); assertEquals(unassignedPrimary.state(), INITIALIZING); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), i+1); - assertThat(unassignedPrimary.unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertThat(unassignedPrimary.unassignedInfo().getMessage(), containsString("boom" + i)); // MaxRetryAllocationDecider#canForceAllocatePrimary should return YES decisions because canAllocate returns YES here assertEquals(Decision.YES, new MaxRetryAllocationDecider().canForceAllocatePrimary( @@ -199,14 +183,12 @@ public void testFailedAllocation() { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom", new UnsupportedOperationException(), randomBoolean())); - failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); ClusterState newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; routingTable = newState.routingTable(); assertEquals(routingTable.index("idx").shards().size(), 1); ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); - assertThat(unassignedPrimary.unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), retries); assertEquals(unassignedPrimary.state(), UNASSIGNED); assertThat(unassignedPrimary.unassignedInfo().getMessage(), containsString("boom")); @@ -230,7 +212,6 @@ public void testFailedAllocation() { assertEquals(routingTable.index("idx").shards().size(), 1); ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); 
assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), retries); - assertThat(unassignedPrimary.unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(unassignedPrimary.state(), INITIALIZING); assertThat(unassignedPrimary.unassignedInfo().getMessage(), containsString("boom")); // bumped up the max retry count, so canForceAllocatePrimary should return a YES decision @@ -242,7 +223,6 @@ public void testFailedAllocation() { routingTable = clusterState.routingTable(); // all counters have been reset to 0 ie. no unassigned info - failedNodeIds.clear(); assertEquals(routingTable.index("idx").shards().size(), 1); assertNull(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo()); assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), STARTED); @@ -251,7 +231,6 @@ public void testFailedAllocation() { List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "ZOOOMG", new UnsupportedOperationException(), randomBoolean())); - failedNodeIds.add(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); newState = strategy.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -259,7 +238,6 @@ public void testFailedAllocation() { assertEquals(routingTable.index("idx").shards().size(), 1); unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), 1); - assertThat(unassignedPrimary.unassignedInfo().getFailedNodeIds(), equalTo(failedNodeIds)); assertEquals(unassignedPrimary.state(), UNASSIGNED); assertThat(unassignedPrimary.unassignedInfo().getMessage(), containsString("ZOOOMG")); // Counter reset, so MaxRetryAllocationDecider#canForceAllocatePrimary should return a YES decision diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index fbedd5e1799c6..0f8e39210164c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -120,7 +120,7 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() { UnassignedInfo currentInfo = primary.unassignedInfo(); UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"), currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), currentInfo.getUnassignedTimeInMillis(), - currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNodeIds()); + currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNoopAllocationNodeIds()); primary = primary.updateUnassigned(newInfo, primary.recoverySource()); IndexRoutingTable indexRoutingTable = routingTable.index("test"); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index a91752a42580b..2d908d0dc7d7b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java 
@@ -205,7 +205,7 @@ public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() {
         assertThat(unassignedShards, hasSize(1));
         assertThat(unassignedShards.get(0).shardId(), equalTo(shardId));
         assertThat(unassignedShards.get(0).unassignedInfo().getNumFailedAllocations(), equalTo(0));
-        assertThat(unassignedShards.get(0).unassignedInfo().getFailedNodeIds(), empty());
+        assertThat(unassignedShards.get(0).unassignedInfo().getFailedNoopAllocationNodeIds(), empty());
     }

     public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() {

From 05e7691cd18d45a87436821ee7817907f3c7b5d6 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Tue, 29 Oct 2019 12:35:13 -0400
Subject: [PATCH 05/11] report failed noop allocation nodes in summary and XContent

---
 .../org/elasticsearch/cluster/routing/UnassignedInfo.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
index 5c1570b77f1fb..119f9e52d4aa1 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
@@ -430,6 +430,9 @@ public String shortSummary() {
         if (failedAllocations > 0) {
             sb.append(", failed_attempts[").append(failedAllocations).append("]");
         }
+        if (failedNoopAllocationNodeIds.isEmpty() == false) {
+            sb.append(", failed_noop_allocation_nodes[").append(failedNoopAllocationNodeIds).append("]");
+        }
         sb.append(", delayed=").append(delayed);
         String details = getDetails();
@@ -459,7 +462,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             builder.field("details", details);
         }
         builder.field("allocation_status", lastAllocationStatus.value());
-        builder.field("failed_nodes", failedNoopAllocationNodeIds);
+        if (failedNoopAllocationNodeIds.isEmpty() == false) {
+            builder.field("failed_noop_allocation_nodes", failedNoopAllocationNodeIds);
+        }
         builder.endObject();
         return builder;
     }

From 89c675346b2d932bc07d4f94932766e52c57e2e6 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Tue, 29 Oct 2019 12:50:09 -0400
Subject: [PATCH 06/11] carry over the failed set when cancelling recovery

---
 .../org/elasticsearch/cluster/routing/UnassignedInfo.java | 1 -
 .../org/elasticsearch/gateway/ReplicaShardAllocator.java  | 5 ++++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
index 119f9e52d4aa1..e7ccb4f3ce1ed 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
@@ -257,7 +257,6 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti
             "failedAllocations: " + failedAllocations + " for reason " + reason;
         assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
         assert !(delayed && reason != Reason.NODE_LEFT) : "shard can only be delayed if it is unassigned due to a node leaving";
-        assert failedAllocations >= failedNoopAllocationNodeIds.size() : "failedAllocations: " + failedAllocations + " failedNodeIds: " + failedNoopAllocationNodeIds;
     }

     public UnassignedInfo(StreamInput in) throws IOException {
diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
index
5ccce164eb380..723f3c82c7ebf 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -48,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -107,11 +108,13 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f // we found a better match that can perform noop recovery, cancel the existing allocation. logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", currentNode, nodeWithHighestMatch); + final Set failedNoopAllocationNodeIds = shard.unassignedInfo() == null ? Collections.emptySet() + : shard.unassignedInfo().getFailedNoopAllocationNodeIds(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, Collections.emptySet()); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNoopAllocationNodeIds); // don't cancel shard in the loop as it will cause a ConcurrentModificationException shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metaData.getIndexSafe(shard.index()), allocation.changes())); From a77b19d2add2dda590a363e000488261f73a3495 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 29 Oct 2019 14:54:42 -0400 Subject: [PATCH 07/11] ignorePreviousFailedNodes -> noMatchFailedNoopAllocationNodes --- .../org/elasticsearch/gateway/ReplicaShardAllocator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 723f3c82c7ebf..cdc478e8b5484 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -302,7 +302,7 @@ private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore return nodeFilesStore.storeFilesMetaData(); } - private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean ignorePreviousFailedNodes, + private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean noMatchFailedNoopAllocationNodes, DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, AsyncShardFetch.FetchResult data, boolean explain) { @@ -310,8 +310,8 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al Map nodeDecisions = explain ? 
new HashMap<>() : null; for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); - if (ignorePreviousFailedNodes - && shard.unassignedInfo() != null && shard.unassignedInfo().getFailedNoopAllocationNodeIds().contains(discoNode.getId())) { + if (noMatchFailedNoopAllocationNodes && shard.unassignedInfo() != null && + shard.unassignedInfo().getFailedNoopAllocationNodeIds().contains(discoNode.getId())) { continue; } TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData(); From e27bafe6f1ce02203b9c08dc6608a99d822219d8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 29 Oct 2019 14:59:55 -0400 Subject: [PATCH 08/11] maintain the failed node list when reset failed counter --- .../cluster/routing/allocation/AllocationService.java | 3 ++- .../org/elasticsearch/gateway/ReplicaShardAllocatorIT.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 5fe26627dfea8..ead2a80da7930 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -319,7 +319,8 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getLastAllocationStatus(), unassignedInfo.getFailedNoopAllocationNodeIds()), + shardRouting.recoverySource(), allocation.changes()); } } diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 4811d003ab824..a1b535c6a012a 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -312,7 +312,6 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception { assertAcked(client().admin().indices().prepareUpdateSettings(indexName) .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); internalCluster().startDataOnlyNode(); - client().admin().cluster().prepareReroute().setRetryFailed(true).get(); ensureGreen(indexName); transportService.clearAllRules(); } From 51dd4e604e6538104cfc0f6755b005bc42572c16 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 29 Oct 2019 15:13:19 -0400 Subject: [PATCH 09/11] wait for new node --- .../org/elasticsearch/gateway/ReplicaShardAllocatorIT.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index a1b535c6a012a..554b75a3dc539 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -301,9 +301,15 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception { 
String brokenNode = internalCluster().startDataOnlyNode(); MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); + CountDownLatch newNodeStarted = new CountDownLatch(1); transportService.addSendBehavior((connection, requestId, action, request, options) -> { if (action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) { if (brokenNode.equals(connection.getNode().getName())) { + try { + newNodeStarted.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } throw new CircuitBreakingException("not enough memory for indexing", 100, 50, CircuitBreaker.Durability.TRANSIENT); } } @@ -312,6 +318,7 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception { assertAcked(client().admin().indices().prepareUpdateSettings(indexName) .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); internalCluster().startDataOnlyNode(); + newNodeStarted.countDown(); ensureGreen(indexName); transportService.clearAllRules(); } From 1ca4ed6e37e04240649a3ed47c99d4d79834e951 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 30 Oct 2019 12:20:07 -0400 Subject: [PATCH 10/11] discard failed nodes when retry failed allocation --- .../cluster/routing/allocation/AllocationService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index ead2a80da7930..f15ac2f630f6a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -319,7 +319,7 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus(), unassignedInfo.getFailedNoopAllocationNodeIds()), + unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes()); } } From 42e6b5430737e94d5f1b8e208850bddf14b1b58e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 30 Oct 2019 12:22:03 -0400 Subject: [PATCH 11/11] Track failed nodes except while started --- .../cluster/routing/RoutingNodes.java | 2 +- .../cluster/routing/UnassignedInfo.java | 54 +++++++------ .../routing/allocation/AllocationService.java | 12 ++- .../allocator/BalancedShardsAllocator.java | 2 +- .../gateway/ReplicaShardAllocator.java | 12 +-- .../cluster/routing/UnassignedInfoTests.java | 2 +- .../TrackFailedAllocationNodesTests.java | 81 +++++++++++++++++++ ...storeInProgressAllocationDeciderTests.java | 2 +- .../gateway/ReplicaShardAllocatorTests.java | 2 +- 9 files changed, 126 insertions(+), 43 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 24463d8ebab94..e286a1f52ae19 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -873,7 +873,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(), currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(), - allocationStatus, currInfo.getFailedNoopAllocationNodeIds()); + allocationStatus, currInfo.getFailedNodeIds()); ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource()); changes.unassignedInfoUpdated(shard, newInfo); shard = updatedShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index e7ccb4f3ce1ed..7152f27d36ee5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -41,6 +41,7 @@ import java.time.Instant; import java.time.ZoneOffset; import java.util.Collections; +import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.Set; @@ -217,7 +218,7 @@ public String value() { private final String message; private final Exception failure; private final int failedAllocations; - private final Set failedNoopAllocationNodeIds; + private final Set failedNodeIds; private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard /** @@ -232,18 +233,18 @@ public UnassignedInfo(Reason reason, String message) { } /** - * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. - * @param message more information about cause. - * @param failure the shard level failure that caused this shard to be unassigned, if exists. - * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation - * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. - * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. - * @param lastAllocationStatus the result of the last allocation attempt for this shard - * @param failedNoopAllocationNodeIds a set of nodeIds that failed noop allocations for this shard + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + * @param failure the shard level failure that caused this shard to be unassigned, if exists. + * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation + * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. + * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. 
+ * @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard */ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus, - Set failedNoopAllocationNodeIds) { + Set failedNodeIds) { this.reason = Objects.requireNonNull(reason); this.unassignedTimeMillis = unassignedTimeMillis; this.unassignedTimeNanos = unassignedTimeNanos; @@ -252,7 +253,7 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti this.failure = failure; this.failedAllocations = failedAllocations; this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus); - this.failedNoopAllocationNodeIds = Collections.unmodifiableSet(failedNoopAllocationNodeIds); + this.failedNodeIds = Collections.unmodifiableSet(failedNodeIds); assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : "failedAllocations: " + failedAllocations + " for reason " + reason; assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; @@ -271,9 +272,9 @@ public UnassignedInfo(StreamInput in) throws IOException { this.failedAllocations = in.readVInt(); this.lastAllocationStatus = AllocationStatus.readFrom(in); if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - this.failedNoopAllocationNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); + this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); } else { - this.failedNoopAllocationNodeIds = Collections.emptySet(); + this.failedNodeIds = Collections.emptySet(); } } @@ -287,7 +288,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(failedAllocations); lastAllocationStatus.writeTo(out); if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeCollection(failedNoopAllocationNodeIds, StreamOutput::writeString); + out.writeCollection(failedNodeIds, StreamOutput::writeString); } } @@ -364,13 +365,16 @@ public AllocationStatus getLastAllocationStatus() { } /** - * A set of nodeIds that {@link org.elasticsearch.gateway.ReplicaShardAllocator} has cancelled ongoing recoveries for copies on them, - * but they failed to complete noop allocations for this shard. We track those nodes so we won't cancel recoveries for them again. + * A set of nodeIds that failed to complete allocations for this shard. {@link org.elasticsearch.gateway.ReplicaShardAllocator} + * uses this set to avoid repeatedly canceling ongoing recoveries for copies on those nodes although they can perform noop recoveries. + * This set will be discarded when a shard moves to started. And if a shard is failed while started (i.e., from started to unassigned), + * the currently assigned node won't be added to this set. 
* * @see org.elasticsearch.gateway.ReplicaShardAllocator#processExistingRecoveries(RoutingAllocation) + * @see org.elasticsearch.cluster.routing.allocation.AllocationService#applyFailedShards(ClusterState, List, List) */ - public Set getFailedNoopAllocationNodeIds() { - return failedNoopAllocationNodeIds; + public Set getFailedNodeIds() { + return failedNodeIds; } /** @@ -429,8 +433,8 @@ public String shortSummary() { if (failedAllocations > 0) { sb.append(", failed_attempts[").append(failedAllocations).append("]"); } - if (failedNoopAllocationNodeIds.isEmpty() == false) { - sb.append(", failed_noop_allocation_nodes[").append(failedNoopAllocationNodeIds).append("]"); + if (failedNodeIds.isEmpty() == false) { + sb.append(", failed_nodes[").append(failedNodeIds).append("]"); } sb.append(", delayed=").append(delayed); String details = getDetails(); @@ -455,15 +459,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (failedAllocations > 0) { builder.field("failed_attempts", failedAllocations); } + if (failedNodeIds.isEmpty() == false) { + builder.field("failed_nodes", failedNodeIds); + } builder.field("delayed", delayed); String details = getDetails(); if (details != null) { builder.field("details", details); } builder.field("allocation_status", lastAllocationStatus.value()); - if (failedNoopAllocationNodeIds.isEmpty() == false) { - builder.field("failed_noop_allocation_nodes", failedNoopAllocationNodeIds); - } builder.endObject(); return builder; } @@ -500,7 +504,7 @@ public boolean equals(Object o) { if (Objects.equals(failure, that.failure) == false) { return false; } - return failedNoopAllocationNodeIds.equals(that.failedNoopAllocationNodeIds); + return failedNodeIds.equals(that.failedNodeIds); } @Override @@ -512,7 +516,7 @@ public int hashCode() { result = 31 * result + (message != null ? message.hashCode() : 0); result = 31 * result + (failure != null ? failure.hashCode() : 0); result = 31 * result + lastAllocationStatus.hashCode(); - result = 31 * result + failedNoopAllocationNodeIds.hashCode(); + result = 31 * result + failedNodeIds.hashCode(); return result; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f15ac2f630f6a..b7c288b703ed1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -197,10 +197,9 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis } int failedAllocations = failedShard.unassignedInfo() != null ? 
failedShard.unassignedInfo().getNumFailedAllocations() : 0; final Set failedNodeIds; - if (failedShard.unassignedInfo() != null && - failedShard.unassignedInfo().getReason() == UnassignedInfo.Reason.REALLOCATED_REPLICA) { - failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNoopAllocationNodeIds().size() + 1); - failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNoopAllocationNodeIds()); + if (failedShard.unassignedInfo() != null) { + failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNodeIds().size() + 1); + failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNodeIds()); failedNodeIds.add(failedShard.currentNodeId()); } else { failedNodeIds = Collections.emptySet(); @@ -301,7 +300,7 @@ private void removeDelayMarkers(RoutingAllocation allocation) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), - unassignedInfo.getFailedNoopAllocationNodeIds()), shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } } @@ -319,8 +318,7 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), - shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes()); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a589ad00ffbf5..9a25c45ea355d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -153,7 +153,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, - unassignedInfo.getFailedNoopAllocationNodeIds()), + unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index cdc478e8b5484..03bc84477e95b 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -108,13 +108,13 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f // we found a better match that can perform noop recovery, cancel the existing allocation. 
logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", currentNode, nodeWithHighestMatch); - final Set failedNoopAllocationNodeIds = shard.unassignedInfo() == null ? Collections.emptySet() - : shard.unassignedInfo().getFailedNoopAllocationNodeIds(); + final Set failedNodeIds = + shard.unassignedInfo() == null ? Collections.emptySet() : shard.unassignedInfo().getFailedNodeIds(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNoopAllocationNodeIds); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds); // don't cancel shard in the loop as it will cause a ConcurrentModificationException shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metaData.getIndexSafe(shard.index()), allocation.changes())); @@ -302,7 +302,7 @@ private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore return nodeFilesStore.storeFilesMetaData(); } - private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean noMatchFailedNoopAllocationNodes, + private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean noMatchFailedNodes, DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, AsyncShardFetch.FetchResult data, boolean explain) { @@ -310,8 +310,8 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al Map nodeDecisions = explain ? 
            new HashMap<>() : null;
         for (Map.Entry nodeStoreEntry : data.getData().entrySet()) {
             DiscoveryNode discoNode = nodeStoreEntry.getKey();
-            if (noMatchFailedNoopAllocationNodes && shard.unassignedInfo() != null &&
-                shard.unassignedInfo().getFailedNoopAllocationNodeIds().contains(discoNode.getId())) {
+            if (noMatchFailedNodes && shard.unassignedInfo() != null &&
+                shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
                 continue;
             }
             TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
index 2e92a5f7be794..eb5b5c10c843c 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
@@ -100,7 +100,7 @@ public void testSerialization() throws Exception {
         assertThat(read.getMessage(), equalTo(meta.getMessage()));
         assertThat(read.getDetails(), equalTo(meta.getDetails()));
         assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations()));
-        assertThat(read.getFailedNoopAllocationNodeIds(), equalTo(meta.getFailedNoopAllocationNodeIds()));
+        assertThat(read.getFailedNodeIds(), equalTo(meta.getFailedNodeIds()));
     }
 
     public void testIndexCreated() {
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java
new file mode 100644
index 0000000000000..829b7ef1c9713
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
+import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+
+public class TrackFailedAllocationNodesTests extends ESAllocationTestCase {
+
+    public void testTrackFailedNodes() {
+        int maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
+        AllocationService allocationService = createAllocationService();
+        MetaData metaData = MetaData.builder()
+            .put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
+            .build();
+        DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
+        for (int i = 0; i < 5; i++) {
+            discoNodes.add(newNode("node-" + i));
+        }
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .nodes(discoNodes)
+            .metaData(metaData).routingTable(RoutingTable.builder().addAsNew(metaData.index("idx")).build())
+            .build();
+        clusterState = allocationService.reroute(clusterState, "reroute");
+        Set<String> failedNodeIds = new HashSet<>();
+
+        // track the failed nodes if shard is not started
+        for (int i = 0; i < maxRetries; i++) {
+            failedNodeIds.add(clusterState.routingTable().index("idx").shard(0).shards().get(0).currentNodeId());
+            clusterState = allocationService.applyFailedShard(
+                clusterState, clusterState.routingTable().index("idx").shard(0).shards().get(0), randomBoolean());
+            assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(),
+                equalTo(failedNodeIds));
+        }
+
+        // reroute with retryFailed=true should discard the failedNodes
+        assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).state(), equalTo(ShardRoutingState.UNASSIGNED));
+        clusterState = allocationService.reroute(clusterState, new AllocationCommands(), false, true).getClusterState();
+        assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty());
+
+        // do not track the failed nodes while shard is started
+        clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
+        assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
+        clusterState = allocationService.applyFailedShard(
+            clusterState, clusterState.routingTable().index("idx").shard(0).shards().get(0), false);
+        assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty());
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java
index 0f8e39210164c..fbedd5e1799c6 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java
@@ -120,7 +120,7 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() {
         UnassignedInfo currentInfo = primary.unassignedInfo();
         UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"),
             currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), currentInfo.getUnassignedTimeInMillis(),
-            currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNoopAllocationNodeIds());
+            currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNodeIds());
         primary = primary.updateUnassigned(newInfo, primary.recoverySource());
 
         IndexRoutingTable indexRoutingTable = routingTable.index("test");
diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
index 2d908d0dc7d7b..a91752a42580b 100644
--- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
@@ -205,7 +205,7 @@ public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() {
         assertThat(unassignedShards, hasSize(1));
         assertThat(unassignedShards.get(0).shardId(), equalTo(shardId));
         assertThat(unassignedShards.get(0).unassignedInfo().getNumFailedAllocations(), equalTo(0));
-        assertThat(unassignedShards.get(0).unassignedInfo().getFailedNoopAllocationNodeIds(), empty());
+        assertThat(unassignedShards.get(0).unassignedInfo().getFailedNodeIds(), empty());
     }
 
     public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() {
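The net effect of the series is easiest to see in one place. The sketch below is a minimal illustration of the tracking rule the final commit converges on, mirroring the logic added to AllocationService#applyFailedShards; it assumes the Elasticsearch server classes from this series are on the classpath, and the class and method names (FailedNodeTracking, failedNodeIdsAfterFailure) are invented for illustration, not part of the patch.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;

// Illustrative helper: computes the failed-node set carried on the new UnassignedInfo
// when a shard fails. Node ids accumulate only while the shard still has unassigned
// info (i.e. it is still trying to allocate); a shard that fails from STARTED has no
// unassigned info, so tracking starts over with an empty set.
final class FailedNodeTracking {

    static Set<String> failedNodeIdsAfterFailure(ShardRouting failedShard) {
        UnassignedInfo info = failedShard.unassignedInfo();
        if (info == null) {
            return Collections.emptySet();
        }
        Set<String> failedNodeIds = new HashSet<>(info.getFailedNodeIds().size() + 1);
        failedNodeIds.addAll(info.getFailedNodeIds());
        failedNodeIds.add(failedShard.currentNodeId());
        return Collections.unmodifiableSet(failedNodeIds);
    }
}

ReplicaShardAllocator skips these nodes when looking for a copy that could serve a noop recovery, so an ongoing recovery is not repeatedly cancelled in favour of a node that has already failed; the set is rendered as failed_nodes wherever the unassigned info is reported, and it is cleared again by a reroute with retry_failed, as TrackFailedAllocationNodesTests above exercises.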