
Commit 892b7d5

Harden periodically check to avoid endless flush loop (#29125)
In #28350, we fixed an endless flushing loop that may happen on replicas by tightening the relation between the flush action and the periodically-flush condition:

1. The periodically-flush condition is enabled only if it was disabled after a flush.
2. If the periodically-flush condition is enabled, then a flush will actually happen regardless of Lucene state.

(1) and (2) guarantee that a flushing loop will terminate. Sadly, condition (1) can be violated in edge cases because we used two different algorithms to evaluate the current and the future uncommitted translog size:

- We use the method `uncommittedSizeInBytes` to calculate the current uncommitted size. It is the sum of the translog generations whose generation is at least the minGen (determined by a given seqno); that is, we pick a contiguous range of generations starting at the minGen.
- We use the method `sizeOfGensAboveSeqNoInBytes` to calculate the future uncommitted size. It is the sum of the translog generations whose maxSeqNo is at least the given seqNo. Here we don't pick a range but select generations one by one.

Suppose we have three translog generations `gen1={#1,#2}, gen2={}, gen3={#3}` and `seqno=#1`. Then `uncommittedSizeInBytes` is the sum of gen1, gen2, and gen3, while `sizeOfGensAboveSeqNoInBytes` is the sum of gen1 and gen3 only; gen2 is excluded because its maxSeqNo is still -1 (a sketch of this discrepancy follows below).

This commit removes both the `sizeOfGensAboveSeqNoInBytes` and `uncommittedSizeInBytes` methods and enforces that the engine uses only the `sizeInBytesByMinGen` method to evaluate the periodically-flush condition.

Closes #29097
Relates #28350
1 parent e88181b commit 892b7d5
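To make the mismatch concrete, here is a small, self-contained sketch (not the actual Translog code; the `Gen` record and the two helper methods are hypothetical stand-ins) that reproduces the gen1/gen2/gen3 example from the commit message: the range-based calculation counts the empty gen2, while the per-generation filter skips it, so the "future" uncommitted size can come out smaller than the "current" one even though a flush would not advance the commit to a later generation.

import java.util.List;

public class UncommittedSizeMismatch {
    // Hypothetical, simplified view of a translog generation: its number, the
    // highest seqNo it contains (-1 if empty), and its size in bytes.
    record Gen(long generation, long maxSeqNo, long sizeInBytes) {}

    // Mirrors the idea behind uncommittedSizeInBytes: sum every generation >= minGen
    // (a contiguous range starting at the generation that covers the given seqNo).
    static long sizeByMinGen(List<Gen> gens, long minGen) {
        return gens.stream().filter(g -> g.generation() >= minGen).mapToLong(Gen::sizeInBytes).sum();
    }

    // Mirrors the idea behind sizeOfGensAboveSeqNoInBytes: sum only the generations
    // whose maxSeqNo is at least the given seqNo (no contiguous range).
    static long sizeAboveSeqNo(List<Gen> gens, long seqNo) {
        return gens.stream().filter(g -> g.maxSeqNo() >= seqNo).mapToLong(Gen::sizeInBytes).sum();
    }

    public static void main(String[] args) {
        // gen1={#1,#2}, gen2={} (maxSeqNo still -1), gen3={#3}; 100 bytes each for illustration.
        List<Gen> gens = List.of(new Gen(1, 2, 100), new Gen(2, -1, 100), new Gen(3, 3, 100));
        System.out.println(sizeByMinGen(gens, 1));   // 300 bytes: gen1 + gen2 + gen3 ("current" size)
        System.out.println(sizeAboveSeqNo(gens, 1)); // 200 bytes: gen2 skipped ("future" size)
    }
}

Because the "future" estimate is smaller than the "current" one, the old check concluded a flush would help even when the new commit would still point at the same generation, which is exactly how condition (1) was violated.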

File tree

7 files changed: +95 -68 lines changed


server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 21 additions & 9 deletions
@@ -1509,7 +1509,8 @@ final boolean tryRenewSyncCommit() {
         ensureOpen();
         ensureCanFlush();
         String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
-        if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) {
+        long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
+        if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) {
             logger.trace("start renewing sync commit [{}]", syncId);
             commitIndexWriter(indexWriter, translog, syncId);
             logger.debug("successfully sync committed. sync id [{}].", syncId);
@@ -1531,19 +1532,30 @@ final boolean tryRenewSyncCommit() {
     @Override
     public boolean shouldPeriodicallyFlush() {
         ensureOpen();
+        final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
         final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
-        final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes();
-        if (uncommittedSizeOfCurrentCommit < flushThreshold) {
+        if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
             return false;
         }
         /*
-         * We should only flush ony if the shouldFlush condition can become false after flushing.
-         * This condition will change if the `uncommittedSize` of the new commit is smaller than
-         * the `uncommittedSize` of the current commit. This method is to maintain translog only,
-         * thus the IndexWriter#hasUncommittedChanges condition is not considered.
+         * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be
+         * below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the
+         * periodically flush condition if this condition is disabled after a flush. The condition will change if the new
+         * commit points to the later generation the last commit's(eg. gen-of-last-commit < gen-of-new-commit)[1].
+         *
+         * When the local checkpoint equals to max_seqno, and translog-gen of the last commit equals to translog-gen of
+         * the new commit, we know that the last generation must contain operations because its size is above the flush
+         * threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation.
+         * This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only
+         * happens when the generation-threshold is close to or above the flush-threshold; otherwise we have rolled
+         * generations as the generation-threshold was reached, then the first condition (eg. [1]) is already satisfied.
+         *
+         * This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered.
          */
-        final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1);
-        return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit;
+        final long translogGenerationOfNewCommit =
+            translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration;
+        return translogGenerationOfLastCommit < translogGenerationOfNewCommit
+            || localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo();
     }

     @Override
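As a reading aid, the decision made by the new shouldPeriodicallyFlush code above can be restated in a condensed form. The parameters below are simplified stand-ins for the engine's fields and trackers; this is an illustrative sketch, not the real engine wiring.

// Flush only when the uncommitted translog is large AND a flush can actually shrink it,
// i.e. the new commit would reference a later translog generation than the last commit,
// or the local checkpoint has caught up with max_seqno (the flush then rolls the
// generation and the new commit is guaranteed to move forward).
boolean shouldPeriodicallyFlush(long genOfLastCommit, long flushThresholdBytes,
                                long localCheckpoint, long maxSeqNo, Translog translog) {
    if (translog.sizeInBytesByMinGen(genOfLastCommit) < flushThresholdBytes) {
        return false;
    }
    long genOfNewCommit = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
    return genOfLastCommit < genOfNewCommit || localCheckpoint == maxSeqNo;
}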

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 7 additions & 31 deletions
@@ -356,26 +356,11 @@ public long getMinFileGeneration() {
         }
     }

-
-    /**
-     * Returns the number of operations in the translog files that aren't committed to lucene.
-     */
-    public int uncommittedOperations() {
-        return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit());
-    }
-
-    /**
-     * Returns the size in bytes of the translog files that aren't committed to lucene.
-     */
-    public long uncommittedSizeInBytes() {
-        return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit());
-    }
-
     /**
      * Returns the number of operations in the translog files
      */
     public int totalOperations() {
-        return totalOperations(-1);
+        return totalOperationsByMinGen(-1);
     }

     /**
@@ -406,9 +391,9 @@ static long findEarliestLastModifiedAge(long currentTime, Iterable<TranslogReade
     }

     /**
-     * Returns the number of operations in the transaction files that aren't committed to lucene..
+     * Returns the number of operations in the translog files at least the given generation
      */
-    private int totalOperations(long minGeneration) {
+    public int totalOperationsByMinGen(long minGeneration) {
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
             return Stream.concat(readers.stream(), Stream.of(current))
@@ -429,9 +414,9 @@ public int estimateTotalOperationsFromMinSeq(long minSeqNo) {
     }

     /**
-     * Returns the size in bytes of the translog files above the given generation
+     * Returns the size in bytes of the translog files at least the given generation
      */
-    private long sizeInBytesByMinGen(long minGeneration) {
+    public long sizeInBytesByMinGen(long minGeneration) {
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
             return Stream.concat(readers.stream(), Stream.of(current))
@@ -441,16 +426,6 @@ private long sizeInBytesByMinGen(long minGeneration) {
         }
     }

-    /**
-     * Returns the size in bytes of the translog files with ops above the given seqNo
-     */
-    public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
-        try (ReleasableLock ignored = readLock.acquire()) {
-            ensureOpen();
-            return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
-        }
-    }
-
     /**
      * Creates a new translog for the specified generation.
      *
@@ -758,7 +733,8 @@ private void closeOnTragicEvent(Exception ex) {
     public TranslogStats stats() {
         // acquire lock to make the two numbers roughly consistent (no file change half way)
         try (ReleasableLock lock = readLock.acquire()) {
-            return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes(), earliestLastModifiedAge());
+            final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit();
+            return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge());
         }
     }
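Since the public `uncommittedOperations()` and `uncommittedSizeInBytes()` helpers are gone, callers (the engine and the tests below) now read these numbers from the stats snapshot instead. A usage sketch, assuming a `Translog translog` reference in scope:

// Both values are computed under the same read lock inside stats(), so they are
// consistent with each other and derived from the generation of the last commit.
TranslogStats stats = translog.stats();
int uncommittedOps = stats.getUncommittedOperations();
long uncommittedBytes = stats.getUncommittedSizeInBytes();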

server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java

Lines changed: 0 additions & 1 deletion
@@ -211,7 +211,6 @@ public synchronized long getMinTranslogGenerationForRecovery() {

     /**
      * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
-     * See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()}
      */
     public synchronized long getTranslogGenerationOfLastCommit() {
         return translogGenerationOfLastCommit;

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 53 additions & 13 deletions
@@ -722,14 +722,13 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException {
            recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
                @Override
                public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
-                    assertThat(getTranslog().uncommittedOperations(), equalTo(docs));
+                    assertThat(getTranslog().stats().getUncommittedOperations(), equalTo(docs));
                    final CommitId commitId = super.flush(force, waitIfOngoing);
                    flushed.set(true);
                    return commitId;
                }
            };
-
-            assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs));
+            assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs));
            recoveringEngine.recoverFromTranslog();
            assertTrue(flushed.get());
        } finally {
@@ -2883,7 +2882,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
                assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
                expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
                assertEquals(2, engine.getTranslog().currentFileGeneration());
-                assertEquals(0L, engine.getTranslog().uncommittedOperations());
+                assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
            }
        }
@@ -3839,7 +3838,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
                System.nanoTime(),
                reason));
            assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1)));
-            assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled));
+            assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(1 + gapsFilled));
            // skip to the op that we added to the translog
            Translog.Operation op;
            Translog.Operation last = null;
@@ -4040,7 +4039,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
            assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
            recoveringEngine = new InternalEngine(copy(
                replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
-            assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations());
+            assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
            recoveringEngine.recoverFromTranslog();
            assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
            assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
@@ -4075,7 +4074,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
            recoveringEngine = new InternalEngine(
                copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
            if (flushed) {
-                assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations());
+                assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
            }
            recoveringEngine.recoverFromTranslog();
            assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
@@ -4505,39 +4504,80 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception {
    public void testShouldPeriodicallyFlush() throws Exception {
        assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
        // A new engine may have more than one empty translog files - the test should account this extra.
-        final long extraTranslogSizeInNewEngine = engine.getTranslog().uncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES;
+        final Translog translog = engine.getTranslog();
+        final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES;
        int numDocs = between(10, 100);
        for (int id = 0; id < numDocs; id++) {
            final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
            engine.index(indexForDoc(doc));
        }
        assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
        long flushThreshold = RandomNumbers.randomLongBetween(random(), 100,
-            engine.getTranslog().uncommittedSizeInBytes() - extraTranslogSizeInNewEngine);
+            engine.getTranslog().stats().getUncommittedSizeInBytes()- extraTranslogSizeInNewEngine);
        final IndexSettings indexSettings = engine.config().getIndexSettings();
        final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
            .settings(Settings.builder().put(indexSettings.getSettings())
                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
        indexSettings.updateIndexMetaData(indexMetaData);
        engine.onSettingsChanged();
-        assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
+        assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
        assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
        engine.flush();
-        assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
+        assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
        // Stale operations skipped by Lucene but added to translog - still able to flush
        for (int id = 0; id < numDocs; id++) {
            final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
            final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false));
            assertThat(result.isCreated(), equalTo(false));
        }
        SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos();
-        assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
+        assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
        assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
        engine.flush(false, false);
        assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
-        assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
+        assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
+        // If the new index commit still points to the same translog generation as the current index commit,
+        // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes.
+        engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here
+        for (int id = 0; id < numDocs; id++) {
+            if (randomBoolean()) {
+                translog.rollGeneration();
+            }
+            final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null);
+            engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false));
+            if (engine.shouldPeriodicallyFlush()) {
+                engine.flush();
+                assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
+                assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
+            }
+        }
    }

+    public void testStressShouldPeriodicallyFlush() throws Exception {
+        final long flushThreshold = randomLongBetween(100, 5000);
+        final long generationThreshold = randomLongBetween(1000, 5000);
+        final IndexSettings indexSettings = engine.config().getIndexSettings();
+        final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
+            .settings(Settings.builder().put(indexSettings.getSettings())
+                .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b")
+                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
+        indexSettings.updateIndexMetaData(indexMetaData);
+        engine.onSettingsChanged();
+        final int numOps = scaledRandomIntBetween(100, 10_000);
+        for (int i = 0; i < numOps; i++) {
+            final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint();
+            final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5);
+            final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null);
+            engine.index(replicaIndexForDoc(doc, 1L, seqno, false));
+            if (rarely() && engine.getTranslog().shouldRollGeneration()) {
+                engine.rollTranslogGeneration();
+            }
+            if (rarely() || engine.shouldPeriodicallyFlush()) {
+                engine.flush();
+                assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
+            }
+        }
+    }

    public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
        final int iters = randomIntBetween(1, 15);

server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 10 additions & 10 deletions
@@ -359,29 +359,29 @@ public void testMaybeFlush() throws Exception {
                IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
            assertTrue(shard.shouldPeriodicallyFlush());
            final Translog translog = shard.getEngine().getTranslog();
-            assertEquals(2, translog.uncommittedOperations());
+            assertEquals(2, translog.stats().getUncommittedOperations());
            client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON)
                .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
            assertBusy(() -> { // this is async
                assertFalse(shard.shouldPeriodicallyFlush());
            });
-            assertEquals(0, translog.uncommittedOperations());
+            assertEquals(0, translog.stats().getUncommittedOperations());
            translog.sync();
-            long size = Math.max(translog.uncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1);
-            logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
-                translog.uncommittedOperations(), translog.getGeneration());
+            long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1);
+            logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]",
+                translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration());
            client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
                IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES))
                .build()).get();
            client().prepareDelete("test", "test", "2").get();
-            logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
-                translog.uncommittedOperations(), translog.getGeneration());
+            logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]",
+                translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration());
            assertBusy(() -> { // this is async
-                logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
-                    translog.uncommittedOperations(), translog.getGeneration());
+                logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]",
+                    translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration());
                assertFalse(shard.shouldPeriodicallyFlush());
            });
-            assertEquals(0, translog.uncommittedOperations());
+            assertEquals(0, translog.stats().getUncommittedOperations());
        }

    public void testMaybeRollTranslogGeneration() throws Exception {
