Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -115,7 +117,11 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
Expand All @@ -140,8 +146,10 @@
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
Expand All @@ -159,8 +167,16 @@
import org.junit.After;
import org.junit.Before;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -206,8 +222,12 @@ public void createServices() {
}

@After
public void stopServices() {
testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
public void verifyReposThenStopServices() throws IOException {
try {
assertNoStaleRepositoryData();
} finally {
testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
}
}

public void testSuccessfulSnapshotAndRestore() {
Expand Down Expand Up @@ -364,7 +384,6 @@ public void testSnapshotWithNodeDisconnects() {
assertThat(snapshotIds, hasSize(1));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41326")
public void testConcurrentSnapshotCreateAndDelete() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

Expand Down Expand Up @@ -414,7 +433,6 @@ public void testConcurrentSnapshotCreateAndDelete() {
* Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
* deleting a snapshot.
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41326")
public void testSnapshotPrimaryRelocations() {
final int masterNodeCount = randomFrom(1, 3, 5);
setupTestCluster(masterNodeCount, randomIntBetween(2, 10));
Expand Down Expand Up @@ -504,6 +522,109 @@ public void run() {
assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
}

/**
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
*/
private void assertNoStaleRepositoryData() throws IOException {
final Path repoPath = tempDir.resolve("repo").toAbsolutePath();
final List<Path> repos;
try (Stream<Path> reposDir = repoFilesByPrefix(repoPath)) {
repos = reposDir.filter(s -> s.getFileName().toString().startsWith("extra") == false).collect(Collectors.toList());
}
for (Path repoRoot : repos) {
cleanupEmptyTrees(repoRoot);
final Path latestIndexGenBlob = repoRoot.resolve("index.latest");
assertTrue("Could not find index.latest blob for repo at [" + repoRoot + ']', Files.exists(latestIndexGenBlob));
final long latestGen = ByteBuffer.wrap(Files.readAllBytes(latestIndexGenBlob)).getLong(0);
assertIndexGenerations(repoRoot, latestGen);
final RepositoryData repositoryData;
try (XContentParser parser =
XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
new BytesArray(Files.readAllBytes(repoRoot.resolve("index-" + latestGen))), XContentType.JSON)) {
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
}
assertIndexUUIDs(repoRoot, repositoryData);
assertSnapshotUUIDs(repoRoot, repositoryData);
}
}

// Lucene's mock file system randomly generates empty `extra0` files that break the deletion of blob-store directories.
// We clean those up here before checking a blob-store for stale files.
private void cleanupEmptyTrees(Path repoPath) {
try {
Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>() {

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (file.getFileName().toString().startsWith("extra")) {
Files.delete(file);
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
try {
Files.delete(dir);
} catch (DirectoryNotEmptyException e) {
// We're only interested in deleting empty trees here, just ignore directories with content
}
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {
throw new AssertionError(e);
}
}

private static void assertIndexGenerations(Path repoRoot, long latestGen) throws IOException {
try (Stream<Path> repoRootBlobs = repoFilesByPrefix(repoRoot)) {
final long[] indexGenerations = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith("index-"))
.map(p -> p.getFileName().toString().replace("index-", ""))
.mapToLong(Long::parseLong).sorted().toArray();
assertEquals(latestGen, indexGenerations[indexGenerations.length - 1]);
assertTrue(indexGenerations.length <= 2);
}
}

private static void assertIndexUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException {
final List<String> expectedIndexUUIDs =
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList());
try (Stream<Path> indexRoots = repoFilesByPrefix(repoRoot.resolve("indices"))) {
final List<String> foundIndexUUIDs = indexRoots.filter(s -> s.getFileName().toString().startsWith("extra") == false)
.map(p -> p.getFileName().toString()).collect(Collectors.toList());
assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY)));
}
}

private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException {
final List<String> expectedSnapshotUUIDs =
repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList());
for (String prefix : new String[]{"snap-", "meta-"}) {
try (Stream<Path> repoRootBlobs = repoFilesByPrefix(repoRoot)) {
final Collection<String> foundSnapshotUUIDs = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith(prefix))
.map(p -> p.getFileName().toString().replace(prefix, "").replace(".dat", ""))
.collect(Collectors.toSet());
assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY)));
}
}
}

/**
* List contents of a blob path and return an empty stream if the path doesn't exist.
* @param prefix Path to find children for
* @return stream of child paths
* @throws IOException on failure
*/
private static Stream<Path> repoFilesByPrefix(Path prefix) throws IOException {
try {
return Files.list(prefix);
} catch (FileNotFoundException | NoSuchFileException e) {
return Stream.empty();
}
}

private void clearDisruptionsAndAwaitSync() {
testClusterNodes.clearNetworkDisruptions();
runUntil(() -> {
Expand Down