|
105 | 105 | import org.elasticsearch.cluster.service.ClusterApplierService; |
106 | 106 | import org.elasticsearch.cluster.service.ClusterService; |
107 | 107 | import org.elasticsearch.cluster.service.MasterService; |
| 108 | +import org.elasticsearch.common.Strings; |
| 109 | +import org.elasticsearch.common.bytes.BytesArray; |
108 | 110 | import org.elasticsearch.common.io.stream.NamedWriteableRegistry; |
109 | 111 | import org.elasticsearch.common.network.NetworkModule; |
110 | 112 | import org.elasticsearch.common.settings.ClusterSettings; |
|
115 | 117 | import org.elasticsearch.common.util.PageCacheRecycler; |
116 | 118 | import org.elasticsearch.common.util.concurrent.AbstractRunnable; |
117 | 119 | import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; |
| 120 | +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; |
118 | 121 | import org.elasticsearch.common.xcontent.NamedXContentRegistry; |
| 122 | +import org.elasticsearch.common.xcontent.XContentHelper; |
| 123 | +import org.elasticsearch.common.xcontent.XContentParser; |
| 124 | +import org.elasticsearch.common.xcontent.XContentType; |
119 | 125 | import org.elasticsearch.env.Environment; |
120 | 126 | import org.elasticsearch.env.NodeEnvironment; |
121 | 127 | import org.elasticsearch.env.TestEnvironment; |
|
140 | 146 | import org.elasticsearch.ingest.IngestService; |
141 | 147 | import org.elasticsearch.node.ResponseCollectorService; |
142 | 148 | import org.elasticsearch.plugins.PluginsService; |
| 149 | +import org.elasticsearch.repositories.IndexId; |
143 | 150 | import org.elasticsearch.repositories.RepositoriesService; |
144 | 151 | import org.elasticsearch.repositories.Repository; |
| 152 | +import org.elasticsearch.repositories.RepositoryData; |
145 | 153 | import org.elasticsearch.repositories.fs.FsRepository; |
146 | 154 | import org.elasticsearch.script.ScriptService; |
147 | 155 | import org.elasticsearch.search.SearchService; |
|
160 | 168 | import org.junit.Before; |
161 | 169 |
|
162 | 170 | import java.io.IOException; |
| 171 | +import java.nio.ByteBuffer; |
| 172 | +import java.nio.file.Files; |
163 | 173 | import java.nio.file.Path; |
164 | 174 | import java.util.Collection; |
165 | 175 | import java.util.Collections; |
@@ -206,8 +216,12 @@ public void createServices() { |
206 | 216 | } |
207 | 217 |
|
208 | 218 | @After |
209 | | - public void stopServices() { |
210 | | - testClusterNodes.nodes.values().forEach(TestClusterNode::stop); |
| 219 | + public void verifyReposThenStopServices() throws IOException { |
| 220 | + try { |
| 221 | + assertNoStaleRepositoryData(); |
| 222 | + } finally { |
| 223 | + testClusterNodes.nodes.values().forEach(TestClusterNode::stop); |
| 224 | + } |
211 | 225 | } |
212 | 226 |
|
213 | 227 | public void testSuccessfulSnapshotAndRestore() { |
@@ -502,6 +516,65 @@ public void run() { |
502 | 516 | assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); |
503 | 517 | } |
504 | 518 |
|
| 519 | + /** |
| 520 | + * Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository. |
| 521 | + * TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata |
| 522 | + */ |
| 523 | + private void assertNoStaleRepositoryData() throws IOException { |
| 524 | + final Path repoPath = tempDir.resolve("repo").toAbsolutePath(); |
| 525 | + final List<Path> repos; |
| 526 | + try (Stream<Path> reposDir = Files.list(repoPath)) { |
| 527 | + repos = reposDir.filter(s -> s.getFileName().toString().startsWith("extra") == false).collect(Collectors.toList()); |
| 528 | + } |
| 529 | + for (Path repoRoot : repos) { |
| 530 | + final Path latestIndexGenBlob = repoRoot.resolve("index.latest"); |
| 531 | + assertTrue("Could not find index.latest blob for repo at [" + repoRoot + ']', Files.exists(latestIndexGenBlob)); |
| 532 | + final long latestGen = ByteBuffer.wrap(Files.readAllBytes(latestIndexGenBlob)).getLong(0); |
| 533 | + assertIndexGenerations(repoRoot, latestGen); |
| 534 | + final RepositoryData repositoryData; |
| 535 | + try (XContentParser parser = |
| 536 | + XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, |
| 537 | + new BytesArray(Files.readAllBytes(repoRoot.resolve("index-" + latestGen))), XContentType.JSON)) { |
| 538 | + repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); |
| 539 | + } |
| 540 | + assertIndexUUIDs(repoRoot, repositoryData); |
| 541 | + assertSnapshotUUIDs(repoRoot, repositoryData); |
| 542 | + } |
| 543 | + } |
| 544 | + |
| 545 | + private static void assertIndexGenerations(Path repoRoot, long latestGen) throws IOException { |
| 546 | + try (Stream<Path> repoRootBlobs = Files.list(repoRoot)) { |
| 547 | + final long[] indexGenerations = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith("index-")) |
| 548 | + .map(p -> p.getFileName().toString().replace("index-", "")) |
| 549 | + .mapToLong(Long::parseLong).sorted().toArray(); |
| 550 | + assertEquals(latestGen, indexGenerations[indexGenerations.length - 1]); |
| 551 | + assertTrue(indexGenerations.length <= 2); |
| 552 | + } |
| 553 | + } |
| 554 | + |
| 555 | + private static void assertIndexUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException { |
| 556 | + final List<String> expectedIndexUUIDs = |
| 557 | + repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList()); |
| 558 | + try (Stream<Path> indexRoots = Files.list(repoRoot.resolve("indices"))) { |
| 559 | + final List<String> foundIndexUUIDs = indexRoots.filter(s -> s.getFileName().toString().startsWith("extra") == false) |
| 560 | + .map(p -> p.getFileName().toString()).collect(Collectors.toList()); |
| 561 | + assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY))); |
| 562 | + } |
| 563 | + } |
| 564 | + |
| 565 | + private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException { |
| 566 | + final List<String> expectedSnapshotUUIDs = |
| 567 | + repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList()); |
| 568 | + for (String prefix : new String[]{"snap-", "meta-"}) { |
| 569 | + try (Stream<Path> repoRootBlobs = Files.list(repoRoot)) { |
| 570 | + final Collection<String> foundSnapshotUUIDs = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith(prefix)) |
| 571 | + .map(p -> p.getFileName().toString().replace(prefix, "").replace(".dat", "")) |
| 572 | + .collect(Collectors.toSet()); |
| 573 | + assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY))); |
| 574 | + } |
| 575 | + } |
| 576 | + } |
| 577 | + |
505 | 578 | private void clearDisruptionsAndAwaitSync() { |
506 | 579 | testClusterNodes.clearNetworkDisruptions(); |
507 | 580 | runUntil(() -> { |
|
0 commit comments