Commit 683d072

DaveCTurner authored and Adam Locke committed
Reduce merging in PersistedClusterStateService (elastic#79793)
When writing the cluster state index we flush a segment every 2000 docs or so, which sometimes triggers merging in the middle of the write process. This merging is often unnecessary, since many of the segments being merged would have ended up containing no live docs at the end of the process and hence could simply have been deleted.

With this commit we adjust the merge policy to be much more relaxed about merging, permitting up to 100 segments per tier, since we read this index only rarely and never on a hot path. We also disable merging completely during the write process, checking just before commit to see whether any merging should be done.

Relates elastic#77466
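
In outline, every write now suppresses merging while documents are being added, and reconsiders merging exactly once, just before the commit. The following condensed sketch shows that lifecycle; it is not the actual PersistedClusterStateService code: ClusterStateWriteSketch and writeDocuments are hypothetical names, and the relaxed policy is abbreviated to its segments-per-tier setting (the full configuration appears in the diff below).

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;

class ClusterStateWriteSketch {
    // Hypothetical stand-in for adding/updating/deleting the cluster state documents.
    static void writeDocuments(IndexWriter indexWriter) throws IOException {
        // ... with the 1MB RAM buffer this flushes a segment roughly every 2000 docs ...
    }

    static void writeAndCommit(IndexWriter indexWriter) throws IOException {
        // 1. Disable merging while writing: many of the segments flushed here will
        //    contain no live docs by commit time and can then simply be deleted.
        indexWriter.getConfig().setMergePolicy(NoMergePolicy.INSTANCE);

        writeDocuments(indexWriter);

        // 2. Restore the relaxed merge policy (up to 100 segments per tier) and
        //    check once, just before committing, whether any merging is needed.
        TieredMergePolicy relaxedMergePolicy = new TieredMergePolicy();
        relaxedMergePolicy.setSegmentsPerTier(100);
        indexWriter.getConfig().setMergePolicy(relaxedMergePolicy);
        indexWriter.maybeMerge();

        indexWriter.commit();
    }
}

In the real change this pair of policy flips lives in MetadataIndexWriter.startWrite() and prepareCommit(), as shown in the first file below.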
1 parent edf2306 commit 683d072

File tree

2 files changed (+119 -10 lines)


server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java

Lines changed: 56 additions & 6 deletions
@@ -21,8 +21,11 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.SerialMergeScheduler;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
@@ -53,20 +56,21 @@
 import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
-import org.elasticsearch.xcontent.NamedXContentRegistry;
-import org.elasticsearch.xcontent.ToXContent;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
-import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeMetadata;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentType;

 import java.io.Closeable;
 import java.io.IOError;
@@ -124,6 +128,9 @@ public class PersistedClusterStateService {
     private static final String INDEX_UUID_FIELD_NAME = "index_uuid";
     private static final int COMMIT_DATA_SIZE = 4;

+    private static final MergePolicy NO_MERGE_POLICY = noMergePolicy();
+    private static final MergePolicy DEFAULT_MERGE_POLICY = defaultMergePolicy();
+
     public static final String METADATA_DIRECTORY_NAME = MetadataStateFormat.STATE_DIR_NAME;

     public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
@@ -193,10 +200,13 @@ private static IndexWriter createIndexWriter(Directory directory, boolean openEx
         indexWriterConfig.setOpenMode(openExisting ? IndexWriterConfig.OpenMode.APPEND : IndexWriterConfig.OpenMode.CREATE);
         // only commit when specifically instructed, we must not write any intermediate states
         indexWriterConfig.setCommitOnClose(false);
-        // most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
+        // most of the data goes into stored fields which are not buffered, so each doc written accounts for ~500B of indexing buffer
+        // (see e.g. BufferedUpdates#BYTES_PER_DEL_TERM); a 1MB buffer therefore gets flushed every ~2000 docs.
         indexWriterConfig.setRAMBufferSizeMB(1.0);
         // merge on the write thread (e.g. while flushing)
         indexWriterConfig.setMergeScheduler(new SerialMergeScheduler());
+        // apply the adjusted merge policy
+        indexWriterConfig.setMergePolicy(DEFAULT_MERGE_POLICY);

         return new IndexWriter(directory, indexWriterConfig);
     }
@@ -481,6 +491,28 @@ private static void consumeFromType(IndexSearcher indexSearcher, String type,
         FORMAT_PARAMS = new ToXContent.MapParams(params);
     }

+    @SuppressForbidden(reason = "merges are only temporarily suppressed, the merge scheduler does not need changing")
+    private static MergePolicy noMergePolicy() {
+        return NoMergePolicy.INSTANCE;
+    }
+
+    private static MergePolicy defaultMergePolicy() {
+        final TieredMergePolicy mergePolicy = new TieredMergePolicy();
+
+        // don't worry about cleaning up deletes too much, segments will often get completely deleted once they're old enough
+        mergePolicy.setDeletesPctAllowed(50.0);
+        // more/smaller segments means there's a better chance they just get deleted before needing a merge
+        mergePolicy.setSegmentsPerTier(100);
+        // ... but if we do end up merging them then do them all
+        mergePolicy.setMaxMergeAtOnce(100);
+        // always use compound segments to avoid fsync overhead
+        mergePolicy.setNoCFSRatio(1.0);
+        // segments are mostly tiny, so don't pretend they are bigger
+        mergePolicy.setFloorSegmentMB(0.001);
+
+        return mergePolicy;
+    }
+
     /**
      * Encapsulates a single {@link IndexWriter} with its {@link Directory} for ease of closing, and a {@link Logger}. There is one of these
      * for each data path.
@@ -522,7 +554,15 @@ void flush() throws IOException {
             this.indexWriter.flush();
         }

+        void startWrite() {
+            // Disable merges during indexing - many older segments will ultimately contain no live docs and simply get deleted.
+            indexWriter.getConfig().setMergePolicy(NO_MERGE_POLICY);
+        }
+
         void prepareCommit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException {
+            indexWriter.getConfig().setMergePolicy(DEFAULT_MERGE_POLICY);
+            indexWriter.maybeMerge();
+
             final Map<String, String> commitData = new HashMap<>(COMMIT_DATA_SIZE);
             commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm));
             commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion));
@@ -594,6 +634,11 @@ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState)
             ensureOpen();
             try {
                 final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.startWrite();
+                }
+
                 final WriterStats stats = overwriteMetadata(clusterState.metadata());
                 commit(currentTerm, clusterState.version());
                 fullStateWritten = true;
@@ -623,6 +668,11 @@ void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClust

             try {
                 final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.startWrite();
+                }
+
                 final WriterStats stats = updateMetadata(previousClusterState.metadata(), clusterState.metadata());
                 commit(currentTerm, clusterState.version());
                 final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
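
A note on the arithmetic behind the revised buffer comment in createIndexWriter above: if each document accounts for roughly 500 bytes of indexing buffer, a 1.0 MB buffer fills after about 1,048,576 / 500 ≈ 2,100 documents, which is the "segment every 2000 docs or so" cadence cited in the commit message.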

server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java

Lines changed: 63 additions & 4 deletions
@@ -53,6 +53,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -421,9 +422,8 @@ public void sync(Collection<String> names) {
                 assertFalse(writer.isOpen());
             }

-            // check if we can open writer again
+            // noinspection EmptyTryBlock - we are just checking that opening the writer again doesn't throw any exceptions
             try (Writer ignored = persistedClusterStateService.createWriter()) {
-
             }
         }
     }
@@ -469,9 +469,8 @@ public void rename(String source, String dest) throws IOException {
                 assertFalse(writer.isOpen());
             }

-            // check if we can open writer again
+            // noinspection EmptyTryBlock - we are just checking that opening the writer again doesn't throw any exceptions
             try (Writer ignored = persistedClusterStateService.createWriter()) {
-
             }
         }
     }
@@ -881,6 +880,66 @@ public void testFailsIfCorrupt() throws IOException {
         }
     }

+    public void testLimitsFileCount() throws IOException {
+        try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
+            final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
+
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+
+                ClusterState clusterState = ClusterState.EMPTY_STATE;
+                writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE);
+
+                final int indexCount = between(2, usually() ? 20 : 1000);
+
+                final int maxSegmentCount = (indexCount / 100) + 100; // only expect to have two tiers, each with max 100 segments
+                final int filesPerSegment = 3; // .cfe, .cfs, .si
+                final int extraFiles = 2; // segments_*, write.lock
+                final int maxFileCount = (maxSegmentCount * filesPerSegment) + extraFiles;
+
+                logger.info("--> adding [{}] indices one-by-one, verifying file count does not exceed [{}]", indexCount, maxFileCount);
+                for (int i = 0; i < indexCount; i++) {
+                    final ClusterState previousClusterState = clusterState;
+
+                    clusterState = ClusterState.builder(clusterState)
+                        .metadata(Metadata.builder(clusterState.metadata())
+                            .version(i + 2)
+                            .put(IndexMetadata.builder("index-" + i)
+                                .settings(Settings.builder()
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                    .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))))
+                        .incrementVersion().build();
+
+                    writer.writeIncrementalStateAndCommit(1, previousClusterState, clusterState);
+
+                    for (Path dataPath : nodeEnvironment.nodeDataPaths()) {
+                        try (DirectoryStream<Path> files
+                                 = Files.newDirectoryStream(dataPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))) {

+                            int fileCount = 0;
+                            final List<String> fileNames = new ArrayList<>();
+                            for (Path filePath : files) {
+                                final String fileName = filePath.getFileName().toString();
+                                if (ExtrasFS.isExtra(fileName) == false) {
+                                    fileNames.add(fileName);
+                                    fileCount += 1;
+                                }
+                            }
+
+                            if (maxFileCount < fileCount) {
+                                // don't bother preparing the description unless we are failing
+                                fileNames.sort(Comparator.naturalOrder());
+                                fail("after " + indexCount + " indices have " + fileCount + " files vs max of " + maxFileCount + ": " +
+                                    fileNames);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
                                     PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
         throws IllegalAccessException, IOException {
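
The file-count bound in testLimitsFileCount works out as follows: at the largest indexCount of 1000, maxSegmentCount = (1000 / 100) + 100 = 110, so maxFileCount = (110 × 3) + 2 = 332. In other words, the test asserts that even after a thousand commits the metadata directory holds at most a few hundred files under the relaxed merge policy.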
