Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,45 @@
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BlobStoreIncrementalityIT extends AbstractSnapshotIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), InternalSettingsPlugin.class);
}

public void testIncrementalBehaviorOnPrimaryFailover() throws InterruptedException, ExecutionException, IOException {
internalCluster().startMasterOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -166,6 +184,73 @@ public void testForceMergeCausesFullSnapshot() throws Exception {
assertThat(secondSnapshotShardStatus.getIncrementalFileCount(), greaterThan(0));
}

public void testRecordCorrectSegmentCountsWithBackgroundMerges() throws Exception {
final String repoName = "test-repo";
createRepository(repoName, "fs");

final String indexName = "test";
// disable merges
assertAcked(prepareCreate(indexName).setSettings(indexSettingsNoReplicas(1).put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false")));

// create an empty snapshot so that later snapshots run as quickly as possible
createFullSnapshot(repoName, "empty");

// create a situation where we temporarily have a bunch of segments until the merges can catch up
long id = 0;
final int rounds = scaledRandomIntBetween(3, 5);
for (int i = 0; i < rounds; ++i) {
final int numDocs = scaledRandomIntBetween(100, 1000);
BulkRequestBuilder request = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int j = 0; j < numDocs; ++j) {
request.add(
Requests.indexRequest(indexName)
.id(Long.toString(id++))
.source(jsonBuilder().startObject().field("l", randomLong()).endObject())
);
}
assertNoFailures(request.get());
}

// snapshot with a bunch of unmerged segments
final SnapshotInfo before = createFullSnapshot(repoName, "snapshot-before");
final SnapshotInfo.IndexSnapshotDetails beforeIndexDetails = before.indexSnapshotDetails().get(indexName);
final long beforeSegmentCount = beforeIndexDetails.getMaxSegmentsPerShard();

// reactivate merges
assertAcked(admin().indices().prepareClose(indexName).get());
assertAcked(
admin().indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder()
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 we know we did ≥3 rounds, each of which made ≥1 segment, and they're all pretty small (≤2MB) so this guarantees a merge IIUC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ that was assumption here. Also ran a few hundred rounds of this to verify and it seems to hold 🤞

.put(MergePolicyConfig.INDEX_MERGE_ENABLED, "true")
)
);
assertAcked(admin().indices().prepareOpen(indexName).get());
assertEquals(0, admin().indices().prepareForceMerge(indexName).setFlush(true).get().getFailedShards());

// wait for merges to reduce segment count
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).setSegments(true).get();
assertThat(stats.getIndex(indexName).getPrimaries().getSegments().getCount(), lessThan(beforeSegmentCount));
}, 30L, TimeUnit.SECONDS);

final SnapshotInfo after = createFullSnapshot(repoName, "snapshot-after");
final int incrementalFileCount = clusterAdmin().prepareSnapshotStatus()
.setRepository(repoName)
.setSnapshots(after.snapshotId().getName())
.get()
.getSnapshots()
.get(0)
.getStats()
.getIncrementalFileCount();
assertEquals(0, incrementalFileCount);
logger.info("--> no files have changed between snapshots, asserting that segment counts are constant as well");
final SnapshotInfo.IndexSnapshotDetails afterIndexDetails = after.indexSnapshotDetails().get(indexName);
assertEquals(beforeSegmentCount, afterIndexDetails.getMaxSegmentsPerShard());
}

private void assertCountInIndexThenDelete(String index, long expectedCount) {
logger.info("--> asserting that index [{}] contains [{}] documents", index, expectedCount);
assertDocCount(index, expectedCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2773,7 +2773,7 @@ public void snapshotShard(SnapshotShardContext context) {
final ShardSnapshotResult shardSnapshotResult = new ShardSnapshotResult(
indexGeneration,
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
snapshotIndexCommit.getSegmentCount()
getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
);
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
context.onResponse(shardSnapshotResult);
Expand Down