diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/BlobStoreIncrementalityIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/BlobStoreIncrementalityIT.java index 98641782f804f..874f6fe8fee49 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/BlobStoreIncrementalityIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/BlobStoreIncrementalityIT.java @@ -13,27 +13,45 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class BlobStoreIncrementalityIT extends AbstractSnapshotIntegTestCase { + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), InternalSettingsPlugin.class); + } + public void testIncrementalBehaviorOnPrimaryFailover() throws InterruptedException, ExecutionException, IOException { internalCluster().startMasterOnlyNode(); final String primaryNode = internalCluster().startDataOnlyNode(); @@ -166,6 +184,73 @@ public void testForceMergeCausesFullSnapshot() throws Exception { assertThat(secondSnapshotShardStatus.getIncrementalFileCount(), greaterThan(0)); } + public void testRecordCorrectSegmentCountsWithBackgroundMerges() throws Exception { + final String repoName = "test-repo"; + createRepository(repoName, "fs"); + + final String indexName = "test"; + // disable merges + assertAcked(prepareCreate(indexName).setSettings(indexSettingsNoReplicas(1).put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false"))); + + // create an empty snapshot so that later snapshots run as quickly as possible + createFullSnapshot(repoName, "empty"); + + // create a situation where we temporarily have a bunch of segments until the merges can catch up + long id = 0; + final int rounds = scaledRandomIntBetween(3, 5); + for (int i = 0; i < rounds; ++i) { + final int numDocs = scaledRandomIntBetween(100, 1000); + BulkRequestBuilder request = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int j = 0; j < numDocs; ++j) { + request.add( + Requests.indexRequest(indexName) + .id(Long.toString(id++)) + .source(jsonBuilder().startObject().field("l", randomLong()).endObject()) + ); + } + assertNoFailures(request.get()); + } + + // snapshot with a bunch of unmerged segments + final SnapshotInfo before = createFullSnapshot(repoName, "snapshot-before"); + final SnapshotInfo.IndexSnapshotDetails beforeIndexDetails = before.indexSnapshotDetails().get(indexName); + final long beforeSegmentCount = beforeIndexDetails.getMaxSegmentsPerShard(); + + // reactivate merges + assertAcked(admin().indices().prepareClose(indexName).get()); + assertAcked( + admin().indices() + .prepareUpdateSettings(indexName) + .setSettings( + Settings.builder() + .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, "true") + ) + ); + assertAcked(admin().indices().prepareOpen(indexName).get()); + assertEquals(0, admin().indices().prepareForceMerge(indexName).setFlush(true).get().getFailedShards()); + + // wait for merges to reduce segment count + assertBusy(() -> { + IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).setSegments(true).get(); + assertThat(stats.getIndex(indexName).getPrimaries().getSegments().getCount(), lessThan(beforeSegmentCount)); + }, 30L, TimeUnit.SECONDS); + + final SnapshotInfo after = createFullSnapshot(repoName, "snapshot-after"); + final int incrementalFileCount = clusterAdmin().prepareSnapshotStatus() + .setRepository(repoName) + .setSnapshots(after.snapshotId().getName()) + .get() + .getSnapshots() + .get(0) + .getStats() + .getIncrementalFileCount(); + assertEquals(0, incrementalFileCount); + logger.info("--> no files have changed between snapshots, asserting that segment counts are constant as well"); + final SnapshotInfo.IndexSnapshotDetails afterIndexDetails = after.indexSnapshotDetails().get(indexName); + assertEquals(beforeSegmentCount, afterIndexDetails.getMaxSegmentsPerShard()); + } + private void assertCountInIndexThenDelete(String index, long expectedCount) { logger.info("--> asserting that index [{}] contains [{}] documents", index, expectedCount); assertDocCount(index, expectedCount); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 7798e596efe56..c1121695c6cde 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2758,7 +2758,7 @@ public void snapshotShard(SnapshotShardContext context) { final ShardSnapshotResult shardSnapshotResult = new ShardSnapshotResult( indexGeneration, ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()), - snapshotIndexCommit.getSegmentCount() + getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles()) ); snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult); context.onResponse(shardSnapshotResult);