From 0b67ab140472c06f0c4a2adf53ccbcf731dac534 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 19 Oct 2021 02:06:40 +0200 Subject: [PATCH 1/3] Fix race condition in SnapshotBasedIndexRecoveryIT If we don't cancel the re-location of the index to the same target node, it is possible that the recovery is retried, causing a race condition. --- .../indices/recovery/SnapshotBasedIndexRecoveryIT.java | 5 +++++ .../org/elasticsearch/indices/recovery/RecoveryTarget.java | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java index 5dd6d8633d9d7..e42e91b65a58a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java @@ -933,6 +933,11 @@ public void testRecoveryUsingSnapshotsPermitIsReturnedAfterFailureOrCancellation targetMockTransportService.clearAllRules(); channelRef.get().sendResponse(new IOException("unable to clean files")); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", sourceNode)).get() + ); } String indexRecoveredFromSnapshot2 = indices.get(1); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 1a8e29e48c2de..dde22a4def4a5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -295,11 +295,11 @@ protected void closeInternal() { try { multiFileWriter.close(); } finally { + releaseSnapshotFileDownloadsPermit(); // free store. increment happens in constructor store.decRef(); indexShard.recoveryStats().decCurrentAsTarget(); closedLatch.countDown(); - releaseSnapshotFileDownloadsPermit(); } } From a884abaaa53b71f635f618bad12d6b3b528e5607 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 19 Oct 2021 12:04:02 +0200 Subject: [PATCH 2/3] More robust fix --- .../SnapshotBasedIndexRecoveryIT.java | 22 ++++++++++++------- .../recovery/PeerRecoveryTargetService.java | 7 +++++- .../indices/recovery/RecoveryTarget.java | 2 +- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java index a59d06a1ca482..91c49090ddf69 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.MergePolicyConfig; @@ -891,7 +892,6 @@ public void testRecoveryUsingSnapshotsIsThrottledPerNode() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79420") public void testRecoveryUsingSnapshotsPermitIsReturnedAfterFailureOrCancellation() throws Exception { executeRecoveryWithSnapshotFileDownloadThrottled((indices, sourceNode, @@ -904,7 +904,9 @@ public void testRecoveryUsingSnapshotsPermitIsReturnedAfterFailureOrCancellation assertAcked( client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) .setSettings(Settings.builder() - .put("index.routing.allocation.require._name", targetNode)).get() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.require._name", (String) null) + .put("index.routing.allocation.include._name", sourceNode + "," + targetNode)).get() ); awaitForRecoverSnapshotFileRequestReceived.run(); @@ -935,11 +937,14 @@ public void testRecoveryUsingSnapshotsPermitIsReturnedAfterFailureOrCancellation targetMockTransportService.clearAllRules(); channelRef.get().sendResponse(new IOException("unable to clean files")); - assertAcked( - client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) - .setSettings(Settings.builder() - .put("index.routing.allocation.require._name", sourceNode)).get() - ); + PeerRecoveryTargetService peerRecoveryTargetService = + internalCluster().getInstance(PeerRecoveryTargetService.class, targetNode); + assertBusy(() -> { + // Wait until the current RecoveryTarget releases the snapshot download permit + Releasable snapshotDownloadPermit = peerRecoveryTargetService.tryAcquireSnapshotDownloadPermits(); + assertThat(snapshotDownloadPermit, is(notNullValue())); + snapshotDownloadPermit.close(); + }); } String indexRecoveredFromSnapshot2 = indices.get(1); @@ -1092,10 +1097,11 @@ private void executeRecoveryWithSnapshotFileDownloadThrottled(SnapshotBasedRecov createIndex(indexName, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") .put("index.routing.allocation.require._name", dataNodes.get(0)) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.allocation.max_retries", 0) .build() ); indices.add(indexName); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 471e93451532d..f777ae5ab467b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -138,7 +138,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) { - final Releasable snapshotFileDownloadsPermit = recoverySettings.tryAcquireSnapshotDownloadPermits(); + final Releasable snapshotFileDownloadsPermit = tryAcquireSnapshotDownloadPermits(); // create a new recovery status, and process... final long recoveryId = onGoingRecoveries.startRecovery( indexShard, @@ -221,6 +221,11 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi new RecoveryResponseHandler(startRequest, timer)); } + // Visible for testing + Releasable tryAcquireSnapshotDownloadPermits() { + return recoverySettings.tryAcquireSnapshotDownloadPermits(); + } + /** * Prepare the start recovery request. * diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 11c750da06e2c..f3406cf22cf71 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -300,11 +300,11 @@ protected void closeInternal() { try { multiFileWriter.close(); } finally { - releaseSnapshotFileDownloadsPermit(); // free store. increment happens in constructor store.decRef(); indexShard.recoveryStats().decCurrentAsTarget(); closedLatch.countDown(); + releaseSnapshotFileDownloadsPermit(); } } From 79210656526c2bd18f969479fb38a8c9d676abc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 24 Nov 2021 12:20:39 +0100 Subject: [PATCH 3/3] Review nits --- .../recovery/SnapshotBasedIndexRecoveryIT.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java b/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java index 17ba24332ed01..40bc86fbf77b9 100644 --- a/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java @@ -86,6 +86,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE; @@ -974,9 +975,9 @@ public void testRecoveryUsingSnapshotsPermitIsReturnedAfterFailureOrCancellation ); assertBusy(() -> { // Wait until the current RecoveryTarget releases the snapshot download permit - Releasable snapshotDownloadPermit = peerRecoveryTargetService.tryAcquireSnapshotDownloadPermits(); - assertThat(snapshotDownloadPermit, is(notNullValue())); - snapshotDownloadPermit.close(); + try (Releasable snapshotDownloadPermit = peerRecoveryTargetService.tryAcquireSnapshotDownloadPermits()) { + assertThat(snapshotDownloadPermit, is(notNullValue())); + } }); } @@ -1159,7 +1160,7 @@ private void executeRecoveryWithSnapshotFileDownloadThrottled(SnapshotBasedRecov .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") .put("index.routing.allocation.require._name", dataNodes.get(0)) - .put("index.allocation.max_retries", 0) + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), 0) .build() ); indices.add(indexName);