Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public void testEnforcedCooldownPeriod() throws IOException {
final RepositoryData repositoryData = getRepositoryData(repository);
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
final BytesReference serialized =
BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(),
SnapshotsService.OLD_SNAPSHOT_FORMAT));
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards);
}
} else {
if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) {
if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) {
assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER));
final List<Class<? extends Exception>> expectedExceptions =
Arrays.asList(ResponseException.class, ElasticsearchStatusException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.IndexMetaDataGenerations;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
Expand Down Expand Up @@ -273,11 +274,12 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
SnapshotId::getUUID, Function.identity())),
repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(
SnapshotId::getUUID, repositoryData::getSnapshotState)),
Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);

Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()),
BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(),
true))), StandardOpenOption.TRUNCATE_EXISTING);
BytesReference.toBytes(BytesReference.bytes(
withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))),
StandardOpenOption.TRUNCATE_EXISTING);

logger.info("--> verify that repo is assumed in old metadata format");
final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
Expand Down Expand Up @@ -403,11 +405,12 @@ public void testRepairBrokenShardGenerations() throws IOException {
Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)),
repositoryData1.getIndices().values().stream().collect(
Collectors.toMap(Function.identity(), repositoryData1::getSnapshots)
), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build()
), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(),
repositoryData1.indexMetaDataGenerations()
);
Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()),
BytesReference.toBytes(BytesReference.bytes(
brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), true))),
brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))),
StandardOpenOption.TRUNCATE_EXISTING);

logger.info("--> recreating repository to clear caches");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.elasticsearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -996,6 +998,8 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {

SnapshotStats stats = snapshots.get(0).getStats();

final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index
assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount));
assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize));

Expand Down Expand Up @@ -1023,6 +1027,10 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
.get();

final List<Path> snapshot1Files = scanSnapshotFolder(repoPath);
final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);

// The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository
assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles));

final int snapshot1FileCount = snapshot1Files.size();
final long snapshot1FileSize = calculateTotalFilesSize(snapshot1Files);
Expand All @@ -1047,6 +1055,65 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize));
}

public void testDeduplicateIndexMetadata() throws Exception {
final String indexName = "test-blocks-1";
final String repositoryName = "repo-" + indexName;
final String snapshot0 = "snapshot-0";
final String snapshot1 = "snapshot-1";
final String snapshot2 = "snapshot-2";

createIndex(indexName);

int docs = between(10, 100);
for (int i = 0; i < docs; i++) {
client().prepareIndex(indexName, "_doc").setSource("test", "init").execute().actionGet();
}

final Path repoPath = randomRepoPath();
createRepository(repositoryName, "fs", repoPath);

logger.info("--> create a snapshot");
client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0)
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();

final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index

docs = between(1, 5);
for (int i = 0; i < docs; i++) {
client().prepareIndex(indexName, "_doc").setSource("test", "test" + i).execute().actionGet();
}

logger.info("--> restart random data node and add new data node to change index allocation");
internalCluster().restartRandomDataNode();
internalCluster().startDataOnlyNode();
ensureGreen(indexName);

assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1).setWaitForCompletion(true).get().status(),
equalTo(RestStatus.OK));

final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);

// The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository
assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles));

// index to some other field to trigger a change in index metadata
for (int i = 0; i < docs; i++) {
client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet();
}
assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(),
equalTo(RestStatus.OK));

final List<Path> snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob

assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get());
final List<Path> snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots
}

public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
logger.info("--> starting a master node and two data nodes");
internalCluster().startMasterOnlyNode();
Expand Down Expand Up @@ -1256,6 +1323,22 @@ private long calculateTotalFilesSize(List<Path> files) {
}).sum();
}

private static List<Path> findRepoMetaBlobs(Path repoPath) throws IOException {
List<Path> files = new ArrayList<>();
Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
final String fileName = file.getFileName().toString();
if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) {
files.add(file);
}
return super.visitFile(file, attrs);
}
}
);
return files;
}

private List<Path> scanSnapshotFolder(Path repoPath) throws IOException {
List<Path> files = new ArrayList<>();
Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand All @@ -34,6 +35,7 @@
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.mockstore.MockRepository;

Expand Down Expand Up @@ -198,9 +200,10 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
}

@Override
public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId indexId) throws IOException {
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId,
IndexId indexId) throws IOException {
indicesMetadata.computeIfAbsent(key(snapshotId.getName(), indexId.getName()), (s) -> new AtomicInteger(0)).incrementAndGet();
return super.getSnapshotIndexMetadata(snapshotId, indexId);
return super.getSnapshotIndexMetaData(PlainActionFuture.get(this::getRepositoryData), snapshotId, indexId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2546,7 +2546,8 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {
final IndexId corruptedIndex = randomFrom(indexIds.values());
final Path indexMetadataPath = repo.resolve("indices")
.resolve(corruptedIndex.getId())
.resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
.resolve(
"meta-" + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotInfo.snapshotId(), corruptedIndex) + ".dat");

// Truncate the index metadata file
try(SeekableByteChannel outChan = Files.newByteChannel(indexMetadataPath, StandardOpenOption.WRITE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(final String repos
final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
for (String index : snapshotInfo.indices()) {
IndexId indexId = repositoryData.resolveIndexId(index);
IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId);
IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId);
if (indexMetadata != null) {
int numberOfShards = indexMetadata.getNumberOfShards();
for (int i = 0; i < numberOfShards; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
}

@Override
public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
return in.getSnapshotIndexMetadata(snapshotId, index);
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
return in.getSnapshotIndexMetaData(repositoryData, snapshotId, index);
}

@Override
Expand Down
Loading