diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index e240485fe6257..fb8215294e17b 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -160,8 +160,8 @@ public void testEnforcedCooldownPeriod() throws IOException { final RepositoryData repositoryData = getRepositoryData(repository); final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); - final BytesReference serialized = - BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false)); + final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), + SnapshotsService.OLD_SNAPSHOT_FORMAT)); PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { try (InputStream stream = serialized.streamInput()) { repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( diff --git a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java index f3b02c71578b1..616b19345c241 100644 --- a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java +++ b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java @@ -218,7 +218,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); } } else { - if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) { + if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) { assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER)); final List> expectedExceptions = Arrays.asList(ResponseException.class, ElasticsearchStatusException.class); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index d4f3d117732f0..c12c0b5d17c61 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -273,11 +274,12 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { SnapshotId::getUUID, Function.identity())), repositoryData.getSnapshotIds().stream().collect(Collectors.toMap( SnapshotId::getUUID, repositoryData::getSnapshotState)), - Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY); 
+ Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()), - BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), - true))), StandardOpenOption.TRUNCATE_EXISTING); + BytesReference.toBytes(BytesReference.bytes( + withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), + StandardOpenOption.TRUNCATE_EXISTING); logger.info("--> verify that repo is assumed in old metadata format"); final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class); @@ -403,11 +405,12 @@ public void testRepairBrokenShardGenerations() throws IOException { Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)), repositoryData1.getIndices().values().stream().collect( Collectors.toMap(Function.identity(), repositoryData1::getSnapshots) - ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build() + ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(), + repositoryData1.indexMetaDataGenerations() ); Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()), BytesReference.toBytes(BytesReference.bytes( - brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), true))), + brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), StandardOpenOption.TRUNCATE_EXISTING); logger.info("--> recreating repository to clear caches"); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 7e33d11d1dba3..71ce2cfe4f4c1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -73,9 +73,11 @@ import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -996,6 +998,8 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { SnapshotStats stats = snapshots.get(0).getStats(); + final List snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount)); assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize)); @@ -1023,6 +1027,10 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { .get(); final List snapshot1Files = scanSnapshotFolder(repoPath); + final List snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath); + + // The IndexMetadata did not change between snapshots, verify that no new redundant 
IndexMetaData was written to the repository + assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles)); final int snapshot1FileCount = snapshot1Files.size(); final long snapshot1FileSize = calculateTotalFilesSize(snapshot1Files); @@ -1047,6 +1055,65 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize)); } + public void testDeduplicateIndexMetadata() throws Exception { + final String indexName = "test-blocks-1"; + final String repositoryName = "repo-" + indexName; + final String snapshot0 = "snapshot-0"; + final String snapshot1 = "snapshot-1"; + final String snapshot2 = "snapshot-2"; + + createIndex(indexName); + + int docs = between(10, 100); + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("test", "init").execute().actionGet(); + } + + final Path repoPath = randomRepoPath(); + createRepository(repositoryName, "fs", repoPath); + + logger.info("--> create a snapshot"); + client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0) + .setIncludeGlobalState(true) + .setWaitForCompletion(true) + .get(); + + final List snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index + + docs = between(1, 5); + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("test", "test" + i).execute().actionGet(); + } + + logger.info("--> restart random data node and add new data node to change index allocation"); + internalCluster().restartRandomDataNode(); + internalCluster().startDataOnlyNode(); + ensureGreen(indexName); + + assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1).setWaitForCompletion(true).get().status(), + equalTo(RestStatus.OK)); + + final List snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath); + + // The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository + assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles)); + + // index to some other field to trigger a change in index metadata + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet(); + } + assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(), + equalTo(RestStatus.OK)); + + final List snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get()); + final List snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots + } + public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception { logger.info("--> starting a master node and two data nodes"); internalCluster().startMasterOnlyNode(); @@ -1256,6 +1323,22 @@ private long calculateTotalFilesSize(List files) { }).sum(); } + private static List findRepoMetaBlobs(Path repoPath) throws IOException { + List files = new ArrayList<>(); + Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + final 
String fileName = file.getFileName().toString(); + if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) { + files.add(file); + } + return super.visitFile(file, attrs); + } + } + ); + return files; + } + private List scanSnapshotFolder(Path repoPath) throws IOException { List files = new ArrayList<>(); Files.walkFileTree(repoPath, new SimpleFileVisitor(){ diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index c12a0b9b76483..2850eb777dca7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -34,6 +35,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -198,9 +200,10 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId indexId) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, + IndexId indexId) throws IOException { indicesMetadata.computeIfAbsent(key(snapshotId.getName(), indexId.getName()), (s) -> new AtomicInteger(0)).incrementAndGet(); - return super.getSnapshotIndexMetadata(snapshotId, indexId); + return super.getSnapshotIndexMetaData(PlainActionFuture.get(this::getRepositoryData), snapshotId, indexId); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 2666b2434198d..7234c192209f8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2546,7 +2546,8 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception { final IndexId corruptedIndex = randomFrom(indexIds.values()); final Path indexMetadataPath = repo.resolve("indices") .resolve(corruptedIndex.getId()) - .resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat"); + .resolve( + "meta-" + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotInfo.snapshotId(), corruptedIndex) + ".dat"); // Truncate the index metadata file try(SeekableByteChannel outChan = Files.newByteChannel(indexMetadataPath, StandardOpenOption.WRITE)) { diff --git 
a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index c9c4a46963898..e2dea018b4ff8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -334,7 +334,7 @@ private Map snapshotShards(final String repos final Map shardStatus = new HashMap<>(); for (String index : snapshotInfo.indices()) { IndexId indexId = repositoryData.resolveIndexId(index); - IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId); + IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId); if (indexMetadata != null) { int numberOfShards = indexMetadata.getNumberOfShards(); for (int i = 0; i < numberOfShards; i++) { diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index b6ac4958975c3..588b85f23d599 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -70,8 +70,8 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { - return in.getSnapshotIndexMetadata(snapshotId, index); + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { + return in.getSnapshotIndexMetaData(repositoryData, snapshotId, index); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java b/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java new file mode 100644 index 0000000000000..bc1b6ae8b436c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java @@ -0,0 +1,177 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.snapshots.SnapshotId; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Tracks the blob uuids of blobs containing {@link IndexMetadata} for snapshots as well as an identifier for each of these blobs. + * Before writing a new {@link IndexMetadata} blob during snapshot finalization in + * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#finalizeSnapshot} the identifier for an instance of + * {@link IndexMetadata} should be computed and then used to check if it already exists in the repository via + * {@link #getIndexMetaBlobId(String)}. + */ +public final class IndexMetaDataGenerations { + + public static final IndexMetaDataGenerations EMPTY = new IndexMetaDataGenerations(Collections.emptyMap(), Collections.emptyMap()); + + /** + * Map of {@link SnapshotId} to a map of the indices in a snapshot mapping {@link IndexId} to metadata identifiers. + * The identifiers in the nested map can be mapped to the relevant blob uuid via {@link #getIndexMetaBlobId}. + */ + final Map<SnapshotId, Map<IndexId, String>> lookup; + + /** + * Map of index metadata identifier to blob uuid. + */ + final Map<String, String> identifiers; + + IndexMetaDataGenerations(Map<SnapshotId, Map<IndexId, String>> lookup, Map<String, String> identifiers) { + assert identifiers.keySet().equals(lookup.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toSet())) : + "identifier mappings " + identifiers + " don't track the same blob ids as the lookup map " + lookup; + assert lookup.values().stream().noneMatch(Map::isEmpty) : "Lookup contained empty map [" + lookup + "]"; + this.lookup = Collections.unmodifiableMap(lookup); + this.identifiers = Collections.unmodifiableMap(identifiers); + } + + public boolean isEmpty() { + return identifiers.isEmpty(); + } + + /** + * Gets the blob id by the identifier of {@link org.elasticsearch.cluster.metadata.IndexMetadata} + * (computed via {@link #buildUniqueIdentifier}) or {@code null} if none is tracked for the identifier. + * + * @param metaIdentifier identifier for {@link IndexMetadata} + * @return blob id for the given metadata identifier or {@code null} if the identifier is not part of the repository yet + */ + @Nullable + public String getIndexMetaBlobId(String metaIdentifier) { + return identifiers.get(metaIdentifier); + } + + /** + * Get the blob id by {@link SnapshotId} and {@link IndexId} and fall back to the value of {@link SnapshotId#getUUID()} if none is + * known to enable backwards compatibility with versions older than + * {@link org.elasticsearch.snapshots.SnapshotsService#INDEX_GEN_IN_REPO_DATA_VERSION} which used the snapshot uuid as index metadata + * blob uuid. + * + * @param snapshotId Snapshot Id + * @param indexId Index Id + * @return blob id for the given index metadata + */ + public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) { + final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId); + if (identifier == null) { + return snapshotId.getUUID(); + } else { + return identifiers.get(identifier); + } + } + + /** + * Create a new instance with the given snapshot and index metadata uuids and identifiers added.
+ * + * @param snapshotId SnapshotId + * @param newLookup new mappings of index + snapshot to index metadata identifier + * @param newIdentifiers new mappings of index metadata identifier to blob id + * @return instance with added snapshot + */ + public IndexMetaDataGenerations withAddedSnapshot(SnapshotId snapshotId, Map newLookup, + Map newIdentifiers) { + final Map> updatedIndexMetaLookup = new HashMap<>(this.lookup); + final Map updatedIndexMetaIdentifiers = new HashMap<>(identifiers); + updatedIndexMetaIdentifiers.putAll(newIdentifiers); + updatedIndexMetaLookup.compute(snapshotId, (snId, lookup) -> { + if (lookup == null) { + if (newLookup.isEmpty()) { + return null; + } + return Collections.unmodifiableMap(new HashMap<>(newLookup)); + } else { + final Map updated = new HashMap<>(lookup); + updated.putAll(newLookup); + return Collections.unmodifiableMap(updated); + } + }); + return new IndexMetaDataGenerations(updatedIndexMetaLookup, updatedIndexMetaIdentifiers); + } + + /** + * Create a new instance with the given snapshot removed. + * + * @param snapshotIds SnapshotIds to remove + * @return new instance without the given snapshot + */ + public IndexMetaDataGenerations withRemovedSnapshots(Collection snapshotIds) { + final Map> updatedIndexMetaLookup = new HashMap<>(lookup); + updatedIndexMetaLookup.keySet().removeAll(snapshotIds); + final Map updatedIndexMetaIdentifiers = new HashMap<>(identifiers); + updatedIndexMetaIdentifiers.keySet().removeIf( + k -> updatedIndexMetaLookup.values().stream().noneMatch(identifiers -> identifiers.containsValue(k))); + return new IndexMetaDataGenerations(updatedIndexMetaLookup, updatedIndexMetaIdentifiers); + } + + @Override + public int hashCode() { + return Objects.hash(identifiers, lookup); + } + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } + if (that instanceof IndexMetaDataGenerations == false) { + return false; + } + final IndexMetaDataGenerations other = (IndexMetaDataGenerations) that; + return lookup.equals(other.lookup) && identifiers.equals(other.identifiers); + } + + @Override + public String toString() { + return "IndexMetaDataGenerations{lookup:" + lookup + "}{identifier:" + identifiers + "}"; + } + + /** + * Compute identifier for {@link IndexMetadata} from its index- and history-uuid as well as its settings-, mapping- and alias-version. + * If an index did not see a change in its settings, mappings or aliases between two points in time then the identifier will not change + * between them either. + * + * @param indexMetaData IndexMetaData + * @return identifier string + */ + public static String buildUniqueIdentifier(IndexMetadata indexMetaData) { + return indexMetaData.getIndexUUID() + + "-" + indexMetaData.getSettings().get(IndexMetadata.SETTING_HISTORY_UUID, IndexMetadata.INDEX_UUID_NA_VALUE) + + "-" + indexMetaData.getSettingsVersion() + "-" + indexMetaData.getMappingVersion() + + "-" + indexMetaData.getAliasesVersion(); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 1fb3bca5fc696..727ee7eef104b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -102,11 +102,12 @@ default Repository create(RepositoryMetadata metadata, Function snapshotVersions; + /** + * Index metadata generations. 
+ */ + private final IndexMetaDataGenerations indexMetaDataGenerations; + /** * Shard generations. */ @@ -100,7 +106,7 @@ public final class RepositoryData { public RepositoryData(long genId, Map snapshotIds, Map snapshotStates, Map snapshotVersions, Map> indexSnapshots, - ShardGenerations shardGenerations) { + ShardGenerations shardGenerations, IndexMetaDataGenerations indexMetaDataGenerations) { this.genId = genId; this.snapshotIds = Collections.unmodifiableMap(snapshotIds); this.snapshotStates = Collections.unmodifiableMap(snapshotStates); @@ -108,6 +114,7 @@ public RepositoryData(long genId, Map snapshotIds, Map snapshotIds, Map versions) { } final Map newVersions = new HashMap<>(snapshotVersions); versions.forEach((id, version) -> newVersions.put(id.getUUID(), version)); - return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations); + return new RepositoryData( + genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations); } public ShardGenerations shardGenerations() { @@ -198,6 +207,32 @@ public List indicesToUpdateAfterRemovingSnapshot(Collection }).map(Map.Entry::getKey).collect(Collectors.toList()); } + /** + * Returns a map of {@link IndexId} to a collection of {@link String} containing all the {@link IndexId} and the + * {@link org.elasticsearch.cluster.metadata.IndexMetadata} blob name in it that can be removed after removing the given snapshot from + * the repository. + * NOTE: Does not return a mapping for {@link IndexId} values that will be removed completely from the repository. + * + * @param snapshotIds SnapshotIds to remove + * @return map of index to index metadata blob id to delete + */ + public Map> indexMetaDataToRemoveAfterRemovingSnapshots(Collection snapshotIds) { + Collection indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds); + final Set allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet().stream() + .filter(e -> snapshotIds.contains(e.getKey()) == false).flatMap(e -> e.getValue().values().stream()) + .map(indexMetaDataGenerations::getIndexMetaBlobId).collect(Collectors.toSet()); + final Map> toRemove = new HashMap<>(); + for (IndexId indexId : indicesForSnapshot) { + for (SnapshotId snapshotId : snapshotIds) { + final String identifier = indexMetaDataGenerations.indexMetaBlobId(snapshotId, indexId); + if (allRemainingIdentifiers.contains(identifier) == false) { + toRemove.computeIfAbsent(indexId, k -> new HashSet<>()).add(identifier); + } + } + } + return toRemove; + } + /** * Add a snapshot and its indices to the repository; returns a new instance. If the snapshot * already exists in the repository data, this method throws an IllegalArgumentException. @@ -206,11 +241,16 @@ public List indicesToUpdateAfterRemovingSnapshot(Collection * @param snapshotState State of the new snapshot * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new * generations indexed by the shard id they correspond to must be supplied. 
+ * @param indexMetaBlobs Map of index metadata blob uuids + * @param newIdentifiers Map of new index metadata blob uuids keyed by the identifiers of the + * {@link org.elasticsearch.cluster.metadata.IndexMetadata} in them */ public RepositoryData addSnapshot(final SnapshotId snapshotId, final SnapshotState snapshotState, final Version version, - final ShardGenerations shardGenerations) { + final ShardGenerations shardGenerations, + @Nullable final Map indexMetaBlobs, + @Nullable final Map newIdentifiers) { if (snapshotIds.containsKey(snapshotId.getUUID())) { // if the snapshot id already exists in the repository data, it means an old master // that is blocked from the cluster is trying to finalize a snapshot concurrently with @@ -235,8 +275,23 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId, allIndexSnapshots.put(indexId, Collections.unmodifiableList(copy)); } } + + final IndexMetaDataGenerations newIndexMetaGenerations; + if (indexMetaBlobs == null) { + assert newIdentifiers == null : "Non-null new identifiers [" + newIdentifiers + "] for null lookup"; + assert indexMetaDataGenerations.lookup.isEmpty() : + "Index meta generations should have been empty but was [" + indexMetaDataGenerations + "]"; + newIndexMetaGenerations = IndexMetaDataGenerations.EMPTY; + } else { + assert indexMetaBlobs.isEmpty() || shardGenerations.indices().equals(indexMetaBlobs.keySet()) : + "Shard generations contained indices " + shardGenerations.indices() + + " but indexMetaData was given for " + indexMetaBlobs.keySet(); + newIndexMetaGenerations = indexMetaDataGenerations.withAddedSnapshot(snapshotId, indexMetaBlobs, newIdentifiers); + } + return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots, - ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build()); + ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build(), + newIndexMetaGenerations); } /** @@ -249,7 +304,8 @@ public RepositoryData withGenId(long newGeneration) { if (newGeneration == genId) { return this; } - return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations); + return new RepositoryData( + newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations); } /** @@ -291,7 +347,8 @@ public RepositoryData removeSnapshots(final Collection snapshots, fi return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots, ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations) - .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build() + .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build(), + indexMetaDataGenerations.withRemovedSnapshots(snapshots) ); } @@ -320,12 +377,14 @@ public boolean equals(Object obj) { && snapshotVersions.equals(that.snapshotVersions) && indices.equals(that.indices) && indexSnapshots.equals(that.indexSnapshots) - && shardGenerations.equals(that.shardGenerations); + && shardGenerations.equals(that.shardGenerations) + && indexMetaDataGenerations.equals(that.indexMetaDataGenerations); } @Override public int hashCode() { - return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations); + return Objects.hash( + snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations, indexMetaDataGenerations); } /** @@ -366,6 +425,8 @@ public List 
resolveNewIndices(final List indicesToResolve) { } private static final String SHARD_GENERATIONS = "shard_generations"; + private static final String INDEX_METADATA_IDENTIFIERS = "index_metadata_identifiers"; + private static final String INDEX_METADATA_LOOKUP = "index_metadata_lookup"; private static final String SNAPSHOTS = "snapshots"; private static final String INDICES = "indices"; private static final String INDEX_ID = "id"; @@ -378,10 +439,12 @@ public List resolveNewIndices(final List indicesToResolve) { /** * Writes the snapshots metadata and the related indices metadata to x-content. */ - public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final boolean shouldWriteShardGens) throws IOException { + public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException { builder.startObject(); // write the snapshots list builder.startArray(SNAPSHOTS); + final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion); + final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion); for (final SnapshotId snapshot : getSnapshotIds()) { builder.startObject(); builder.field(NAME, snapshot.getName()); @@ -389,6 +452,10 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final if (snapshotStates.containsKey(snapshot.getUUID())) { builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value()); } + if (shouldWriteIndexGens) { + builder.field(INDEX_METADATA_LOOKUP, indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap()) + .entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue))); + } if (snapshotVersions.containsKey(snapshot.getUUID())) { builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString()); } @@ -417,7 +484,10 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final builder.endObject(); } builder.endObject(); - if (shouldWriteShardGens) { + if (shouldWriteIndexGens) { + builder.field(MIN_VERSION, SnapshotsService.INDEX_GEN_IN_REPO_DATA_VERSION.toString()); + builder.field(INDEX_METADATA_IDENTIFIERS, indexMetaDataGenerations.identifiers); + } else if (shouldWriteShardGens) { // Add min version field to make it impossible for older ES versions to deserialize this object builder.field(MIN_VERSION, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.toString()); } @@ -425,6 +495,10 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final return builder; } + public IndexMetaDataGenerations indexMetaDataGenerations() { + return indexMetaDataGenerations; + } + /** * Reads an instance of {@link RepositoryData} from x-content, loading the snapshots and indices metadata. 
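Note: a round-trip sketch of the two index-N fields added here, modeled on the RepositoryDataTests#testXContent change further down in this diff. It assumes a RepositoryDataTests/ESTestCase context (generateRandomRepoData, createParser, randomBoolean come from the test framework), and the commented JSON shape uses invented ids purely for illustration:

    // Serialize with a current-version repository format and parse the result back.
    RepositoryData repositoryData = generateRandomRepoData();
    XContentBuilder builder = JsonXContent.contentBuilder();
    repositoryData.snapshotsToXContent(builder, Version.CURRENT);
    try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
        RepositoryData parsed = RepositoryData.snapshotsFromXContent(parser, repositoryData.getGenId(), randomBoolean());
        // Both the per-snapshot lookup and the identifier-to-blob-uuid map survive the round trip.
        assertEquals(repositoryData.indexMetaDataGenerations(), parsed.indexMetaDataGenerations());
    }
    // Rough shape of the serialized index-N blob (ids invented):
    //   { "snapshots": [ { "name": "snap-1", "uuid": "...", "state": 1,
    //                      "index_metadata_lookup": { "<index-id>": "<identifier>" }, "version": "..." } ],
    //     "indices": { ... },
    //     "min_version": "7.9.0",
    //     "index_metadata_identifiers": { "<identifier>": "<blob-uuid>" } }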
* @@ -438,6 +512,8 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g final Map snapshotVersions = new HashMap<>(); final Map> indexSnapshots = new HashMap<>(); final ShardGenerations.Builder shardGenerations = ShardGenerations.builder(); + final Map indexMetaIdentifiers = new HashMap<>(); + final Map> indexMetaLookup = new HashMap<>(); if (parser.nextToken() == XContentParser.Token.START_OBJECT) { while (parser.nextToken() == XContentParser.Token.FIELD_NAME) { @@ -448,6 +524,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g String name = null; String uuid = null; SnapshotState state = null; + Map metaGenerations = new HashMap<>(); Version version = null; while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String currentFieldName = parser.currentName(); @@ -458,6 +535,8 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g uuid = parser.text(); } else if (STATE.equals(currentFieldName)) { state = SnapshotState.fromValue(parser.numberValue().byteValue()); + } else if (INDEX_METADATA_LOOKUP.equals(currentFieldName)) { + metaGenerations.putAll(parser.mapStrings()); } else if (VERSION.equals(currentFieldName)) { version = Version.fromString(parser.text()); } @@ -470,6 +549,9 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g snapshotVersions.put(uuid, version); } snapshots.put(snapshotId.getUUID(), snapshotId); + if (metaGenerations.isEmpty() == false) { + indexMetaLookup.put(snapshotId, metaGenerations); + } } } else { throw new ElasticsearchParseException("expected array for [" + field + "]"); @@ -545,6 +627,11 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g } } } + } else if (INDEX_METADATA_IDENTIFIERS.equals(field)) { + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new ElasticsearchParseException("start object expected [" + INDEX_METADATA_IDENTIFIERS + "]"); + } + indexMetaIdentifiers.putAll(parser.mapStrings()); } else if (MIN_VERSION.equals(field)) { if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { throw new ElasticsearchParseException("version string expected [min_version]"); @@ -558,7 +645,12 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g } else { throw new ElasticsearchParseException("start object expected"); } - return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build()); + final Map indexLookup = + indexSnapshots.keySet().stream().collect(Collectors.toMap(IndexId::getId, Function.identity())); + return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build(), + new IndexMetaDataGenerations(indexMetaLookup.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, e -> e.getValue().entrySet().stream() + .collect(Collectors.toMap(entry -> indexLookup.get(entry.getKey()), Map.Entry::getValue)))), indexMetaIdentifiers)); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 89ccf6824d47f..97ecfaf3a3529 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -79,6 +79,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import 
org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -100,6 +101,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryCleanupResult; import org.elasticsearch.repositories.RepositoryData; @@ -579,7 +581,7 @@ protected void doRun() throws Exception { // delete an index that was created by another master node after writing this index-N blob. final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData, - SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); + repositoryMetaVersion, listener); } @Override @@ -639,10 +641,10 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map snapshotIds, long repositoryStateId, Map foundIndices, - Map rootBlobs, RepositoryData repositoryData, boolean writeShardGens, + Map rootBlobs, RepositoryData repositoryData, Version repoMetaVersion, ActionListener listener) { - if (writeShardGens) { + if (SnapshotsService.useShardGenerations(repoMetaVersion)) { // First write the new shard state metadata (with the removed snapshot) and compute deletion targets final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); @@ -660,7 +662,7 @@ private void doDeleteShardSnapshots(Collection snapshotIds, long rep builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); } final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build()); - writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(), + writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure)); }, listener::onFailure); // Once we have updated the repository, run the clean-ups @@ -669,12 +671,13 @@ private void doDeleteShardSnapshots(Collection snapshotIds, long rep final ActionListener afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); - asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener); + asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), + afterCleanupsListener); }, listener::onFailure); } else { // Write the new repository data first (with the removed snapshot), using no shard generations final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY); - writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> { + writeIndexGen(updatedRepoData, 
repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> { // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion final ActionListener afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); @@ -682,7 +685,7 @@ private void doDeleteShardSnapshots(Collection snapshotIds, long rep final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep); writeMetaAndComputeDeletesStep.whenComplete(deleteResults -> - asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, deleteResults, afterCleanupsListener), + asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure); }, listener::onFailure)); } @@ -696,14 +699,14 @@ private void asyncCleanupUnlinkedRootAndIndicesBlobs(Collection dele l -> cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null)))); } - private void asyncCleanupUnlinkedShardLevelBlobs(Collection snapshotIds, + private void asyncCleanupUnlinkedShardLevelBlobs(RepositoryData oldRepositoryData, Collection snapshotIds, Collection deleteResults, ActionListener listener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap( listener, l -> { try { - deleteFromContainer(blobContainer(), resolveFilesToDelete(snapshotIds, deleteResults)); + deleteFromContainer(blobContainer(), resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults)); l.onResponse(null); } catch (Exception e) { logger.warn( @@ -735,14 +738,18 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(Collection s final Set survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream() .filter(id -> snapshotIds.contains(id) == false).collect(Collectors.toSet()); final StepListener> shardCountListener = new StepListener<>(); - final ActionListener allShardCountsListener = new GroupedActionListener<>(shardCountListener, snapshotIds.size()); - for (SnapshotId snapshotId : snapshotIds) { + final Collection indexMetaGenerations = snapshotIds.stream().map( + id -> oldRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId)).collect(Collectors.toSet()); + final ActionListener allShardCountsListener = + new GroupedActionListener<>(shardCountListener, indexMetaGenerations.size()); + final BlobContainer indexContainer = indexContainer(indexId); + for (String indexMetaGeneration : indexMetaGenerations) { executor.execute(ActionRunnable.supply(allShardCountsListener, () -> { try { - return getSnapshotIndexMetadata(snapshotId, indexId).getNumberOfShards(); + return indexMetadataFormat.read(indexContainer, indexMetaGeneration).getNumberOfShards(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage( - "[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); + "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex); // Just invoke the listener without any shard generations to count it down, this index will be cleaned up // by the stale data cleanup in the end. // TODO: Getting here means repository corruption. 
We should find a way of dealing with this instead of just @@ -797,20 +804,22 @@ public void onFailure(Exception ex) { } } - private List resolveFilesToDelete(Collection snapshotIds, + private List resolveFilesToDelete(RepositoryData oldRepositoryData, Collection snapshotIds, Collection deleteResults) { final String basePath = basePath().buildAsString(); final int basePathLen = basePath.length(); + final Map> indexMetaGenerations = + oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(snapshotIds); return Stream.concat( - deleteResults.stream().flatMap(shardResult -> { - final String shardPath = - shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); - return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); - }), - deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().flatMap(indexId -> { - final String indexContainerPath = indexContainer(indexId).path().buildAsString(); - return snapshotIds.stream().map(snapshotId -> indexContainerPath + globalMetadataFormat.blobName(snapshotId.getUUID())); - }) + deleteResults.stream().flatMap(shardResult -> { + final String shardPath = + shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); + return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); + }), + indexMetaGenerations.entrySet().stream().flatMap(entry -> { + final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString(); + return entry.getValue().stream().map(id -> indexContainerPath + indexMetadataFormat.blobName(id)); + }) ).map(absolutePath -> { assert absolutePath.startsWith(basePath); return absolutePath.substring(basePathLen); @@ -854,6 +863,7 @@ private void cleanupStaleBlobs(Collection deletedSnapshots, Map *
<ul>
 *     <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
 *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
 * </ul>
  • @@ -878,7 +888,7 @@ public void cleanup(long repositoryStateId, Version repositoryMetaVersion, Actio listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO)); } else { // write new index-N blob to ensure concurrent operations will fail - writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion), + writeIndexGen(repositoryData, repositoryStateId, repositoryMetaVersion, Function.identity(), ActionListener.wrap(v -> cleanupStaleBlobs(Collections.emptyList(), foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure)); } @@ -1002,49 +1012,82 @@ public void finalizeSnapshot(final SnapshotId snapshotId, final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); final Consumer onUpdateFailure = e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)); - final ActionListener allMetaListener = new GroupedActionListener<>( - ActionListener.wrap(snapshotInfos -> { - assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; - final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); - getRepositoryData(ActionListener.wrap(existingRepositoryData -> { - final RepositoryData updatedRepositoryData = - existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations); - writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, stateTransformer, - ActionListener.wrap(writtenRepoData -> { - if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); - } - listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo)); - }, onUpdateFailure)); - }, onUpdateFailure)); - }, onUpdateFailure), 2 + indices.size()); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will - // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the - // index or global metadata will be compatible with the segments written in this snapshot as well. 
- // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way - // that decrements the generation it points at + final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion); - // Write Global Metadata - executor.execute(ActionRunnable.run(allMetaListener, - () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false))); + final StepListener repoDataListener = new StepListener<>(); + executor.execute(ActionRunnable.wrap(repoDataListener, this::getRepositoryData)); + repoDataListener.whenComplete(existingRepositoryData -> { - // write the index metadata for each index in the snapshot - for (IndexId index : indices) { - executor.execute(ActionRunnable.run(allMetaListener, () -> - indexMetadataFormat.write(clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false))); - } + final Map indexMetas; + final Map indexMetaIdentifiers; + if (writeIndexGens) { + indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap(); + indexMetas = ConcurrentCollections.newConcurrentMap(); + } else { + indexMetas = null; + indexMetaIdentifiers = null; + } + + final ActionListener allMetaListener = new GroupedActionListener<>( + ActionListener.wrap(snapshotInfos -> { + assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; + final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); + final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot( + snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations, indexMetas, indexMetaIdentifiers); + writeIndexGen(updatedRepositoryData, repositoryStateId, repositoryMetaVersion, stateTransformer, + ActionListener.wrap( + newRepoData -> { + if (writeShardGens) { + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); + } + listener.onResponse(new Tuple<>(newRepoData, snapshotInfo)); + }, onUpdateFailure)); + }, onUpdateFailure), 2 + indices.size()); + + // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will + // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the + // index or global metadata will be compatible with the segments written in this snapshot as well. 
+ // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way + // that decrements the generation it points at + + // Write Global MetaData + executor.execute(ActionRunnable.run(allMetaListener, + () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false))); - executor.execute(ActionRunnable.supply(allMetaListener, () -> { - final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, - indices.stream().map(IndexId::getName).collect(Collectors.toList()), + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + executor.execute(ActionRunnable.run(allMetaListener, () -> { + final IndexMetadata indexMetaData = clusterMetadata.index(index.getName()); + if (writeIndexGens) { + final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); + String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); + if (metaUUID == null) { + // We don't yet have this version of the metadata so we write it + metaUUID = UUIDs.base64UUID(); + indexMetadataFormat.write(indexMetaData, indexContainer(index), metaUUID, false); + indexMetaIdentifiers.put(identifiers, metaUUID); + } + indexMetas.put(index, identifiers); + } else { + indexMetadataFormat.write( + clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); + } + } + )); + } + executor.execute(ActionRunnable.supply(allMetaListener, () -> { + final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, + indices.stream().map(IndexId::getName).collect(Collectors.toList()), new ArrayList<>(clusterMetadata.dataStreams().keySet()), - startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, - includeGlobalState, userMetadata); - snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); - return snapshotInfo; - })); + startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, + includeGlobalState, userMetadata); + snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); + return snapshotInfo; + })); + }, onUpdateFailure); } // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data @@ -1084,9 +1127,10 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(final SnapshotId snapshotId, final IndexId index) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { try { - return indexMetadataFormat.read(indexContainer(index), snapshotId.getUUID()); + return indexMetadataFormat.read(indexContainer(index), + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index)); } catch (NoSuchFileException e) { throw new SnapshotMissingException(metadata.name(), snapshotId, e); } @@ -1245,7 +1289,7 @@ private void doGetRepositoryData(ActionListener listener) { loaded = getRepositoryData(genToLoad); // We can cache in the most recent version here without regard to the actual repository metadata version since we're // only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe - cacheRepositoryData(loaded, true); + cacheRepositoryData(loaded, Version.CURRENT); } listener.onResponse(loaded); return; @@ -1280,17 +1324,17 @@ private 
void doGetRepositoryData(ActionListener listener) { * modification can lead to moving from a higher {@code N} to a lower {@code N} value which mean we can't safely assume that a given * generation will always contain the same {@link RepositoryData}. * - * @param updated RepositoryData to cache if newer than the cache contents - * @param writeShardGens whether to cache shard generation values + * @param updated RepositoryData to cache if newer than the cache contents + * @param version version of the repository metadata that was cached */ - private void cacheRepositoryData(RepositoryData updated, boolean writeShardGens) { + private void cacheRepositoryData(RepositoryData updated, Version version) { if (cacheRepositoryData && bestEffortConsistency == false) { final BytesReference serialized; BytesStreamOutput out = new BytesStreamOutput(); try { try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out); XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) { - updated.snapshotsToXContent(builder, writeShardGens); + updated.snapshotsToXContent(builder, version); } serialized = out.bytes(); final int len = serialized.length(); @@ -1423,11 +1467,11 @@ public boolean isReadOnly() { * * @param repositoryData RepositoryData to write * @param expectedGen expected repository generation at the start of the operation - * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob + * @param version version of the repository metadata to write * @param stateFilter filter for the last cluster state update executed by this method * @param listener completion listener */ - protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, + protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, Version version, Function stateFilter, ActionListener listener) { assert isReadOnly() == false; // can not write to a read only repository final long currentGen = repositoryData.getGenId(); @@ -1538,7 +1582,8 @@ public void onFailure(Exception e) { final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); writeAtomic(indexBlob, - BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true); + BytesReference.bytes( + filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version)), true); // write the current generation to the index-latest file final BytesReference genBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { @@ -1579,7 +1624,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen); - cacheRepositoryData(writtenRepositoryData, writeShardGens); + cacheRepositoryData(writtenRepositoryData, version); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { // Delete all now outdated index files up to 1000 blobs back from the new generation. // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them. 
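Note: the finalizeSnapshot changes above spread the deduplication logic across several listeners; condensed, the per-index write decision is the following. This is a sketch reusing the names from the hunk (existingRepositoryData, indexMetas, indexMetaIdentifiers), not a standalone API:

    // The identifier is a pure function of the index uuid, history uuid and the
    // settings/mapping/aliases versions, so unchanged metadata maps to the same identifier.
    final String identifier = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
    String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifier);
    if (metaUUID == null) {
        // First time this metadata state is seen: write a fresh meta-${uuid}.dat blob.
        metaUUID = UUIDs.base64UUID();
        indexMetadataFormat.write(indexMetaData, indexContainer(index), metaUUID, false);
        indexMetaIdentifiers.put(identifier, metaUUID);
    }
    // The snapshot records the identifier either way; merely indexing more documents bumps
    // none of the versions above, so repeated snapshots reuse the existing blob.
    indexMetas.put(index, identifier);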
diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 9a55a515ad488..8b8e7c0e5d125 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -246,7 +246,7 @@ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionLi final List indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot); for (IndexId indexId : indexIdsInSnapshot) { - metadataBuilder.put(repository.getSnapshotIndexMetadata(snapshotId, indexId), false); + metadataBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId), false); } final Metadata metadata = metadataBuilder.build(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7959901b0894a..00029f8ef33a2 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -122,6 +122,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0; + public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0; + public static final Version OLD_SNAPSHOT_FORMAT = Version.V_7_5_0; public static final Version MULTI_DELETE_VERSION = Version.V_7_8_0; @@ -1505,7 +1507,16 @@ public static boolean useShardGenerations(Version repositoryMetaVersion) { } /** - * Deletes snapshot from repository + * Checks whether the metadata version supports writing deduplicated index metadata (index metadata generations) to the repository. + * + * @param repositoryMetaVersion version to check + * @return true if version supports index metadata generations + */ + public static boolean useIndexGenerations(Version repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION); + } + + /** + * Deletes snapshot from repository * * @param repoName repository name * @param snapshotIds snapshot ids
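useIndexGenerations gates the new lookup that getSnapshotIndexMetaData performs through RepositoryData. As far as can be read from this patch, IndexMetaDataGenerations resolves a blob name in two steps, snapshot to identifier and identifier to blob UUID, falling back to the snapshot UUID for snapshots written before INDEX_GEN_IN_REPO_DATA_VERSION. A simplified, hypothetical rendering of that lookup:

import java.util.HashMap;
import java.util.Map;

public class IndexMetaDataGenerationsSketch {
    final Map<String, Map<String, String>> lookup = new HashMap<>();  // snapshotUuid -> (indexId -> identifier)
    final Map<String, String> identifiers = new HashMap<>();          // identifier -> blob UUID

    String indexMetaBlobId(String snapshotUuid, String indexId) {
        final String identifier = lookup.getOrDefault(snapshotUuid, Map.of()).get(indexId);
        if (identifier == null) {
            return snapshotUuid; // legacy layout: metadata was written per snapshot as meta-${snapshotUuid}.dat
        }
        return identifiers.get(identifier);
    }

    public static void main(String[] args) {
        IndexMetaDataGenerationsSketch gens = new IndexMetaDataGenerationsSketch();
        gens.lookup.put("snap-1", Map.of("idx-a", "ident-1"));
        gens.identifiers.put("ident-1", "blob-uuid-1");
        System.out.println(gens.indexMetaBlobId("snap-1", "idx-a")); // blob-uuid-1
        System.out.println(gens.indexMetaBlobId("snap-0", "idx-a")); // snap-0 (legacy fallback)
    }
}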
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index e6bab4428d74c..38afb94f81524 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -49,7 +49,6 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -151,7 +150,7 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) { return null; } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java index 33c67f77ebcbb..584956bbffcdf 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -41,6 +42,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -74,7 +77,7 @@ public void testIndicesToUpdateAfterRemovingSnapshot() { public void testXContent() throws IOException { RepositoryData repositoryData = generateRandomRepoData(); XContentBuilder builder = JsonXContent.contentBuilder(); - repositoryData.snapshotsToXContent(builder, true); + repositoryData.snapshotsToXContent(builder, Version.CURRENT); try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { long gen = (long) randomIntBetween(0, 500); RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen, randomBoolean()); @@ -106,9 +109,14 @@ public void testAddSnapshots() { indices.add(indexId); builder.put(indexId, 0, UUIDs.randomBase64UUID(random())); } + final ShardGenerations shardGenerations = builder.build(); + final Map indexLookup = + shardGenerations.indices().stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256))); RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot, randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), - randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build()); + randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), + shardGenerations, indexLookup, +
indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random())))); // verify that the new repository data has the new snapshot and its indices assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot)); for (IndexId indexId : indices) { @@ -132,12 +140,12 @@ public void testInitIndices() { snapshotStates.put(snapshotId.getUUID(), randomFrom(SnapshotState.values())); snapshotVersions.put(snapshotId.getUUID(), randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion())); } - RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds, - Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY); + RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds, Collections.emptyMap(), Collections.emptyMap(), + Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); // test that initializing indices works Map> indices = randomIndices(snapshotIds); - RepositoryData newRepoData = - new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices, ShardGenerations.EMPTY); + RepositoryData newRepoData = new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices, + ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); List expected = new ArrayList<>(repositoryData.getSnapshotIds()); Collections.sort(expected); List actual = new ArrayList<>(newRepoData.getSnapshotIds()); @@ -153,7 +161,8 @@ public void testRemoveSnapshot() { List snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds()); assertThat(snapshotIds.size(), greaterThan(0)); SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1)); - RepositoryData newRepositoryData = repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY); + RepositoryData newRepositoryData = + repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY); // make sure the repository data's indices no longer contain the removed snapshot for (final IndexId indexId : newRepositoryData.getIndices().values()) { assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId)); @@ -173,8 +182,9 @@ public void testResolveIndexId() { public void testGetSnapshotState() { final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()); final SnapshotState state = randomFrom(SnapshotState.values()); - final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state, - randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), ShardGenerations.EMPTY); + final RepositoryData repositoryData = + RepositoryData.EMPTY.addSnapshot(snapshotId, state, randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), + ShardGenerations.EMPTY, Collections.emptyMap(), Collections.emptyMap()); assertEquals(state, repositoryData.getSnapshotState(snapshotId)); assertNull(repositoryData.getSnapshotState(new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()))); } @@ -184,7 +194,7 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException { final RepositoryData repositoryData = generateRandomRepoData(); XContentBuilder builder = XContentBuilder.builder(xContent); - repositoryData.snapshotsToXContent(builder, true); + repositoryData.snapshotsToXContent(builder, Version.CURRENT); RepositoryData 
parsedRepositoryData; try (XContentParser xParser = createParser(builder)) { parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId(), randomBoolean()); @@ -219,10 +229,10 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException { assertNotNull(corruptedIndexId); RepositoryData corruptedRepositoryData = new RepositoryData(parsedRepositoryData.getGenId(), snapshotIds, snapshotStates, - snapshotVersions, indexSnapshots, shardGenBuilder.build()); + snapshotVersions, indexSnapshots, shardGenBuilder.build(), IndexMetaDataGenerations.EMPTY); final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent); - corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, true); + corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, Version.CURRENT); try (XContentParser xParser = createParser(corruptedBuilder)) { ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> @@ -269,6 +279,57 @@ public void testIndexThatReferenceANullSnapshot() throws IOException { } } + // Test removing snapshot from random data where no two snapshots share any index metadata blobs + public void testIndexMetaDataToRemoveAfterRemovingSnapshotNoSharing() { + final RepositoryData repositoryData = generateRandomRepoData(); + final SnapshotId snapshotId = randomFrom(repositoryData.getSnapshotIds()); + final IndexMetaDataGenerations indexMetaDataGenerations = repositoryData.indexMetaDataGenerations(); + final Collection indicesToUpdate = repositoryData.indicesToUpdateAfterRemovingSnapshot(Collections.singleton(snapshotId)); + final Map> identifiersToRemove = indexMetaDataGenerations.lookup.get(snapshotId).entrySet().stream() + .filter(e -> indicesToUpdate.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, + e -> Collections.singleton(indexMetaDataGenerations.getIndexMetaBlobId(e.getValue())))); + assertEquals(repositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(snapshotId)), identifiersToRemove); + } + + // Test removing snapshot from random data that has some or all index metadata shared + public void testIndexMetaDataToRemoveAfterRemovingSnapshotWithSharing() { + final RepositoryData repositoryData = generateRandomRepoData(); + final ShardGenerations.Builder builder = ShardGenerations.builder(); + final SnapshotId otherSnapshotId = randomFrom(repositoryData.getSnapshotIds()); + final Collection indicesInOther = repositoryData.getIndices().values() + .stream() + .filter(index -> repositoryData.getSnapshots(index).contains(otherSnapshotId)) + .collect(Collectors.toSet()); + for (IndexId indexId : indicesInOther) { + builder.put(indexId, 0, UUIDs.randomBase64UUID(random())); + } + final Map newIndices = new HashMap<>(); + final Map newIdentifiers = new HashMap<>(); + final Map> removeFromOther = new HashMap<>(); + for (IndexId indexId : randomSubsetOf(repositoryData.getIndices().values())) { + if (indicesInOther.contains(indexId)) { + removeFromOther.put(indexId, Collections.singleton( + repositoryData.indexMetaDataGenerations().indexMetaBlobId(otherSnapshotId, indexId))); + } + final String identifier = randomAlphaOfLength(20); + newIndices.put(indexId, identifier); + newIdentifiers.put(identifier, UUIDs.randomBase64UUID(random())); + builder.put(indexId, 0, UUIDs.randomBase64UUID(random())); + } + final ShardGenerations shardGenerations = builder.build(); + final Map indexLookup = new HashMap<>(repositoryData.indexMetaDataGenerations().lookup.get(otherSnapshotId)); + 
indexLookup.putAll(newIndices); + final SnapshotId newSnapshot = new SnapshotId(randomAlphaOfLength(7), UUIDs.randomBase64UUID(random())); + + RepositoryData newRepoData = + repositoryData.addSnapshot(newSnapshot, SnapshotState.SUCCESS, Version.CURRENT, shardGenerations, indexLookup, newIdentifiers); + assertEquals(newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(newSnapshot)), + newIndices.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, + e -> Collections.singleton(newIdentifiers.get(e.getValue()))))); + assertEquals(newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(otherSnapshotId)), removeFromOther); + } + public static RepositoryData generateRandomRepoData() { final int numIndices = randomIntBetween(1, 30); final List indices = new ArrayList<>(numIndices); @@ -288,8 +349,14 @@ public static RepositoryData generateRandomRepoData() { builder.put(someIndex, j, uuid); } } - repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()), - randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build()); + final Map indexLookup = + someIndices.stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256))); + repositoryData = repositoryData.addSnapshot( + snapshotId, randomFrom(SnapshotState.values()), + randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), + builder.build(), + indexLookup, + indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random())))); } return repositoryData; } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index ba142f6cafef2..c086bbb42e003 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -200,7 +200,7 @@ public void testRepositoryDataConcurrentModificationNotAllowed() { RepositoryData repositoryData = generateRandomRepoData(); final long startingGeneration = repositoryData.getGenId(); final PlainActionFuture future1 = PlainActionFuture.newFuture(); - repository.writeIndexGen(repositoryData, startingGeneration, true, Function.identity(),future1); + repository.writeIndexGen(repositoryData, startingGeneration, Version.CURRENT, Function.identity(),future1); // write repo data again to index generational file, errors because we already wrote to the // N+1 generation from which this repository data instance was created @@ -241,8 +241,8 @@ public void testFsRepositoryCompressDeprecated() { } private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception { - PlainActionFuture.get(f -> repository.writeIndexGen(repositoryData, generation, true, - Function.identity(), f)); + PlainActionFuture.get( + f -> repository.writeIndexGen(repositoryData, generation, Version.CURRENT, Function.identity(), f)); } private BlobStoreRepository setupRepo() { @@ -273,8 +273,13 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo for (int j = 0; j < numIndices; j++) { builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1"); } + final ShardGenerations shardGenerations = builder.build(); + final Map indexLookup = + 
shardGenerations.indices().stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256))); repoData = repoData.addSnapshot(snapshotId, - randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, builder.build()); + randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, shardGenerations, + indexLookup, + indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random())))); } return repoData; } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e002fc197b2aa..3e81f15bfd8d9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -778,7 +778,7 @@ public void onFailure(Exception e) { assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices))); if (snapshotInfo.successfulShards() == indices + 1) { final IndexMetadata indexMetadata = - repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index)); + repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index)); // Make sure we snapshotted the metadata of this index and not the recreated version assertEquals(indexMetadata.getIndex(), firstIndex.get()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 9c9b144e94cfd..a3d046efb400f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; @@ -40,7 +41,6 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -87,7 +87,7 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) { return null; } @@ -95,7 +95,7 @@ public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId ind public void getRepositoryData(ActionListener listener) { final IndexId indexId = new IndexId(indexName, "blah"); listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), - Collections.singletonMap(indexId, emptyList()), ShardGenerations.EMPTY)); + Collections.singletonMap(indexId, emptyList()), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY)); } @Override diff --git 
a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index c3812602baf38..a8b664b345b56 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -61,6 +61,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -122,7 +123,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex LoggingDeprecationHandler.INSTANCE, blob)) { repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false); } - assertIndexUUIDs(blobContainer, repositoryData); + assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); return null; @@ -165,10 +166,10 @@ private static void assertShardIndexGenerations(BlobContainer repoRoot, ShardGen } } - private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException { + private static void assertIndexUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException { final List expectedIndexUUIDs = repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList()); - final BlobContainer indicesContainer = repoRoot.children().get("indices"); + final BlobContainer indicesContainer = repository.blobContainer().children().get("indices"); final List foundIndexUUIDs; if (indicesContainer == null) { foundIndexUUIDs = Collections.emptyList(); @@ -178,6 +179,21 @@ private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repo s -> s.startsWith("extra") == false).collect(Collectors.toList()); } assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY))); + for (String indexId : foundIndexUUIDs) { + final Set indexMetaGenerationsFound = indicesContainer.children().get(indexId) + .listBlobsByPrefix(BlobStoreRepository.METADATA_PREFIX).keySet().stream() + .map(p -> p.replace(BlobStoreRepository.METADATA_PREFIX, "").replace(".dat", "")) + .collect(Collectors.toSet()); + final Set indexMetaGenerationsExpected = new HashSet<>(); + final IndexId idx = + repositoryData.getIndices().values().stream().filter(i -> i.getId().equals(indexId)).findFirst().get(); + for (SnapshotId snapshotId : repositoryData.getSnapshots(idx)) { + indexMetaGenerationsExpected.add(repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, idx)); + } + // TODO: assertEquals(indexMetaGenerationsExpected, indexMetaGenerationsFound); requires cleanup functionality for + // index meta generations blobs + assertTrue(indexMetaGenerationsFound.containsAll(indexMetaGenerationsExpected)); + } } private static void assertSnapshotUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException { @@ -208,8 +224,9 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito assertThat(indices, hasKey(indexId.getId())); final BlobContainer indexContainer = indices.get(indexId.getId()); assertThat(indexContainer.listBlobs(), - hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT, snapshotId.getUUID()))); - final IndexMetadata 
indexMetadata = repository.getSnapshotIndexMetadata(snapshotId, indexId); + hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT, + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId)))); + final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId); for (Map.Entry entry : indexContainer.children().entrySet()) { // Skip Lucene MockFS extraN directory if (entry.getKey().startsWith("extra")) { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index d36c69d0c6b30..3268318f5f720 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -53,7 +53,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import java.io.IOException; @@ -325,8 +324,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version logger.info("--> writing downgraded RepositoryData for repository metadata version [{}]", version); final RepositoryData repositoryData = getRepositoryData(repoName); final XContentBuilder jsonBuilder = JsonXContent.contentBuilder(); - final boolean writeShardGens = version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); - repositoryData.snapshotsToXContent(jsonBuilder, writeShardGens); + repositoryData.snapshotsToXContent(jsonBuilder, version); final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser( NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, @@ -334,7 +332,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version repositoryData.getGenId(), randomBoolean()); Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()), BytesReference.toBytes(BytesReference.bytes( - downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens))), + downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), version))), StandardOpenOption.TRUNCATE_EXISTING); return oldVersionSnapshot; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 755683bf651d3..f21a432ce0706 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -61,6 +61,7 @@ import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; @@ -193,7 +194,7 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { + public IndexMetadata 
getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; String leaderIndex = index.getName(); Client remoteClient = getRemoteClusterClient(); @@ -256,7 +257,8 @@ public void getRepositoryData(ActionListener listener) { Index index = remoteIndices.get(indexName).getIndex(); indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singletonList(snapshotId)); } - return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots, ShardGenerations.EMPTY); + return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots, + ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); }); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 91140d57f2eaf..43f7f97cd5715 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -237,7 +237,8 @@ public void testRestoreMinmal() throws IOException { ShardRoutingState.INITIALIZING, new RecoverySource.SnapshotRecoverySource( UUIDs.randomBase64UUID(), new Snapshot("src_only", snapshotId), Version.CURRENT, indexId)); - IndexMetadata metadata = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetadata(snapshotId, indexId)); + IndexMetadata metadata = runAsSnapshot(threadPool, () -> + repository.getSnapshotIndexMetaData(PlainActionFuture.get(repository::getRepositoryData), snapshotId, indexId)); IndexShard restoredShard = newShard( shardRouting, metadata, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {}, RetentionLeaseSyncer.EMPTY); DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT);
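The removal tests earlier in this patch (testIndexMetaDataToRemoveAfterRemovingSnapshot*) expect indexMetaDataToRemoveAfterRemovingSnapshots to return only blobs that no surviving snapshot still references. The sketch below approximates that reference-counting rule with hypothetical names and a flat identifier set; the real method works per IndexId and returns a map, so treat this as an illustration of the rule, not the implementation:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MetaBlobCleanupSketch {
    // snapshot -> set of metadata identifiers it references
    final Map<String, Set<String>> refs = new HashMap<>();

    // Identifiers referenced only by the snapshots being deleted are safe to remove.
    Set<String> toRemove(Set<String> deleting) {
        Set<String> candidates = new HashSet<>();
        deleting.forEach(s -> candidates.addAll(refs.getOrDefault(s, Set.of())));
        refs.forEach((snapshot, ids) -> {
            if (deleting.contains(snapshot) == false) {
                candidates.removeAll(ids); // still referenced by a surviving snapshot
            }
        });
        return candidates;
    }

    public static void main(String[] args) {
        MetaBlobCleanupSketch sketch = new MetaBlobCleanupSketch();
        sketch.refs.put("snap-1", Set.of("ident-a", "ident-b"));
        sketch.refs.put("snap-2", Set.of("ident-b"));
        System.out.println(sketch.toRemove(Set.of("snap-1"))); // [ident-a]: ident-b is still shared
    }
}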