From 0ba415f822945ac21bb6e497bc1fa8a7033c4a1c Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 17 Dec 2019 09:42:02 +0100
Subject: [PATCH 1/2] Fix Index Deletion During Partial Snapshot Create
 (#50234)

* Fix Index Deletion During Partial Snapshot Create

We can simply filter out shard generation updates for indices that were
concurrently removed from the cluster state to fix index deletes during
partial snapshots, as that completely removes any reference to those
shards from the snapshot.

Follow up to #50202
Closes #50200
---
 .../repositories/ShardGenerations.java        |  7 +++
 .../snapshots/SnapshotsService.java           | 24 +++++++---
 .../snapshots/SnapshotResiliencyTests.java    | 46 +++++++++++++++++--
 3 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
index 6351d5e2f2bf0..8b7f799d0e7c2 100644
--- a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
+++ b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
@@ -54,6 +54,13 @@ private ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
         this.shardGenerations = shardGenerations;
     }
 
+    /**
+     * Returns the total number of shards tracked by this instance.
+     */
+    public int totalShards() {
+        return shardGenerations.values().stream().mapToInt(List::size).sum();
+    }
+
     /**
      * Returns all indices for which shard generations are tracked.
      *
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 346f3ca9181bf..4a1ed129460ea 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -582,16 +582,17 @@ public void onNoLongerMaster() {
         private void cleanupAfterError(Exception exception) {
             threadPool.generic().execute(() -> {
                 if (snapshotCreated) {
+                    final MetaData metaData = clusterService.state().metaData();
                     repositoriesService.repository(snapshot.snapshot().getRepository())
                         .finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
-                            buildGenerations(snapshot),
+                            buildGenerations(snapshot, metaData),
                             snapshot.startTime(),
                             ExceptionsHelper.stackTrace(exception),
                             0,
                             Collections.emptyList(),
                             snapshot.repositoryStateId(),
                             snapshot.includeGlobalState(),
-                            metaDataForSnapshot(snapshot, clusterService.state().metaData()),
+                            metaDataForSnapshot(snapshot, metaData),
                             snapshot.userMetadata(),
                             snapshot.useShardGenerations(),
                             ActionListener.runAfter(ActionListener.wrap(ignored -> {
@@ -607,11 +608,21 @@ private void cleanupAfterError(Exception exception) {
         }
     }
 
-    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
+    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
         ShardGenerations.Builder builder = ShardGenerations.builder();
         final Map<String, IndexId> indexLookup = new HashMap<>();
         snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
-        snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
+        snapshot.shards().forEach(c -> {
+            if (metaData.index(c.key.getIndex()) == null) {
+                assert snapshot.partial() :
+                    "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
+                return;
+            }
+            final IndexId indexId = indexLookup.get(c.key.getIndexName());
+            if (indexId != null) {
+                builder.put(indexId, c.key.id(), c.value.generation());
+            }
+        });
         return builder.build();
     }
 
@@ -1046,12 +1057,13 @@ protected void doRun() {
                             shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
                         }
                     }
+                    final ShardGenerations shardGenerations = buildGenerations(entry, metaData);
                     repository.finalizeSnapshot(
                         snapshot.getSnapshotId(),
-                        buildGenerations(entry),
+                        shardGenerations,
                         entry.startTime(),
                         failure,
-                        entry.shards().size(),
+                        entry.partial() ? shardGenerations.totalShards() : entry.shards().size(),
                         unmodifiableList(shardFailures),
                         entry.repositoryStateId(),
                         entry.includeGlobalState(),
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index 2157177752fbe..1622ba92a2efb 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -21,6 +21,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -143,6 +144,7 @@
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
@@ -213,6 +215,7 @@
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
@@ -505,7 +508,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
         }
     }
 
-    public void testConcurrentSnapshotDeleteAndDeleteIndex() {
+    public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
 
         String repoName = "repo";
@@ -516,11 +519,13 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
 
         final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
+        final int indices = randomIntBetween(5, 20);
 
+        final SetOnce<Index> firstIndex = new SetOnce<>();
         continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> {
+            firstIndex.set(masterNode.clusterService.state().metaData().index(index).getIndex());
             // create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot
             // finalization
-            final int indices = randomIntBetween(5, 20);
             final GroupedActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(createIndicesListener, indices);
             for (int i = 0; i < indices; ++i) {
                 client().admin().indices().create(new CreateIndexRequest("index-" + i), listener);
             }
@@ -529,23 +534,54 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
 
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
 
+        final boolean partialSnapshot = randomBoolean();
+
         continueOrDie(createIndicesListener, createIndexResponses ->
             client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false)
-                .execute(createSnapshotResponseStepListener));
+                .setPartial(partialSnapshot).execute(createSnapshotResponseStepListener));
 
         continueOrDie(createSnapshotResponseStepListener,
-            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener()));
+            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), new ActionListener<>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    if (partialSnapshot) {
+                        // Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
+                        client().admin().indices().create(new CreateIndexRequest(index), noopListener());
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    if (partialSnapshot) {
+                        throw new AssertionError("Delete index should always work during partial snapshots", e);
+                    }
+                }
+            }));
 
         deterministicTaskQueue.runAllRunnableTasks();
 
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
+        final RepositoryData repositoryData = getRepositoryData(repository);
+        Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
         assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
+        if (partialSnapshot) {
+            // Single shard for each index so we either get all indices or all except for the deleted index
+            assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
+            if (snapshotInfo.successfulShards() == indices + 1) {
+                final IndexMetaData indexMetaData =
+                    repository.getSnapshotIndexMetaData(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
+                // Make sure we snapshotted the metadata of this index and not the recreated version
+                assertEquals(indexMetaData.getIndex(), firstIndex.get());
+            }
+        } else {
+            // Index delete must be blocked for non-partial snapshots and we get a snapshot for every index
+            assertEquals(snapshotInfo.successfulShards(), indices + 1);
+        }
         assertEquals(0, snapshotInfo.failedShards());
     }

From 356e9413e249f766209d6ecbf5d282fc8a33dcf3 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 17 Dec 2019 10:09:19 +0100
Subject: [PATCH 2/2] fix compile

---
 .../snapshots/SnapshotResiliencyTests.java | 27 ++++++++++---------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index 1622ba92a2efb..17001f00ecc4c 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -541,22 +541,23 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
                 .setPartial(partialSnapshot).execute(createSnapshotResponseStepListener));
 
         continueOrDie(createSnapshotResponseStepListener,
-            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), new ActionListener<>() {
-                @Override
-                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                    if (partialSnapshot) {
-                        // Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
-                        client().admin().indices().create(new CreateIndexRequest(index), noopListener());
-                    }
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    if (partialSnapshot) {
-                        throw new AssertionError("Delete index should always work during partial snapshots", e);
-                    }
-                }
-            }));
+            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index),
+                new ActionListener<AcknowledgedResponse>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                        if (partialSnapshot) {
+                            // Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
+                            client().admin().indices().create(new CreateIndexRequest(index), noopListener());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        if (partialSnapshot) {
+                            throw new AssertionError("Delete index should always work during partial snapshots", e);
+                        }
+                    }
+                }));
 
         deterministicTaskQueue.runAllRunnableTasks();
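
The filtering that the first commit message describes can also be illustrated outside the patch. The following is a minimal, self-contained sketch of the same idea, assuming nothing from the Elasticsearch codebase: ShardKey, filterGenerations and the sample index names are invented stand-ins, not the real SnapshotsService#buildGenerations / ShardGenerations types changed in the diffs above.

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Standalone sketch: drop shard generation entries whose index was deleted
 * from the cluster state while the snapshot was running. Only a partial
 * snapshot may legitimately run into this situation; a non-partial snapshot
 * blocks index deletion, which the assertion below mirrors.
 */
public class ShardGenerationFilterSketch {

    /** Simplified stand-in for a shard's identity: index name plus shard number. */
    record ShardKey(String indexName, int shardId) {}

    static Map<ShardKey, String> filterGenerations(Map<ShardKey, String> generations,
                                                   Set<String> indicesInClusterState,
                                                   boolean partialSnapshot) {
        return generations.entrySet().stream()
            .filter(entry -> {
                final boolean indexStillExists = indicesInClusterState.contains(entry.getKey().indexName());
                assert indexStillExists || partialSnapshot
                    : "Index [" + entry.getKey().indexName() + "] was deleted but the snapshot was not partial";
                return indexStillExists;
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // "index-1" was deleted concurrently, so only the surviving index's shard generation is kept.
        final Map<ShardKey, String> generations = Map.of(
            new ShardKey("index-0", 0), "generation-a",
            new ShardKey("index-1", 0), "generation-b");
        System.out.println(filterGenerations(generations, Set.of("index-0"), true));
    }
}

In the actual change the filtered result also feeds the reported shard count: for partial snapshots the total passed to Repository#finalizeSnapshot is taken from ShardGenerations#totalShards() rather than from the in-progress entry, so the resulting SnapshotInfo only counts shards that were really snapshotted.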