Skip to content

Commit 57fc705

Browse files
authored
Keep commits and translog up to the global checkpoint (#27606)
We need to keep index commits and translog operations up to the current global checkpoint to allow us to throw away unsafe operations and increase the operation-based recovery chance. This is achieved by a new index deletion policy. Relates #10708
1 parent cc1a301 commit 57fc705

File tree

12 files changed

+541
-129
lines changed

12 files changed

+541
-129
lines changed

core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,43 +21,48 @@
2121

2222
import org.apache.lucene.index.IndexCommit;
2323
import org.apache.lucene.index.IndexDeletionPolicy;
24-
import org.apache.lucene.index.SnapshotDeletionPolicy;
24+
import org.elasticsearch.index.seqno.SequenceNumbers;
2525
import org.elasticsearch.index.translog.Translog;
2626
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
2727

2828
import java.io.IOException;
2929
import java.util.List;
30+
import java.util.Map;
31+
import java.util.function.LongSupplier;
3032

3133
/**
3234
* An {@link IndexDeletionPolicy} that coordinates between Lucene's commits and the retention of translog generation files,
3335
* making sure that all translog files that are needed to recover from the Lucene commit are not deleted.
36+
* <p>
37+
* In particular, this policy will delete index commits whose max sequence number is at most
38+
* the current global checkpoint except the index commit which has the highest max sequence number among those.
3439
*/
35-
class CombinedDeletionPolicy extends IndexDeletionPolicy {
36-
40+
final class CombinedDeletionPolicy extends IndexDeletionPolicy {
3741
private final TranslogDeletionPolicy translogDeletionPolicy;
3842
private final EngineConfig.OpenMode openMode;
43+
private final LongSupplier globalCheckpointSupplier;
3944

40-
private final SnapshotDeletionPolicy indexDeletionPolicy;
41-
42-
CombinedDeletionPolicy(SnapshotDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy,
43-
EngineConfig.OpenMode openMode) {
44-
this.indexDeletionPolicy = indexDeletionPolicy;
45-
this.translogDeletionPolicy = translogDeletionPolicy;
45+
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
46+
LongSupplier globalCheckpointSupplier) {
4647
this.openMode = openMode;
48+
this.translogDeletionPolicy = translogDeletionPolicy;
49+
this.globalCheckpointSupplier = globalCheckpointSupplier;
4750
}
4851

4952
@Override
5053
public void onInit(List<? extends IndexCommit> commits) throws IOException {
51-
indexDeletionPolicy.onInit(commits);
5254
switch (openMode) {
5355
case CREATE_INDEX_AND_TRANSLOG:
56+
assert commits.isEmpty() : "index is created, but we have commits";
5457
break;
5558
case OPEN_INDEX_CREATE_TRANSLOG:
5659
assert commits.isEmpty() == false : "index is opened, but we have no commits";
60+
// When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
61+
// We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
5762
break;
5863
case OPEN_INDEX_AND_TRANSLOG:
5964
assert commits.isEmpty() == false : "index is opened, but we have no commits";
60-
setLastCommittedTranslogGeneration(commits);
65+
onCommit(commits);
6166
break;
6267
default:
6368
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
@@ -66,24 +71,56 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
6671

6772
@Override
6873
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
69-
indexDeletionPolicy.onCommit(commits);
70-
setLastCommittedTranslogGeneration(commits);
74+
final int keptPosition = indexOfKeptCommits(commits);
75+
for (int i = 0; i < keptPosition; i++) {
76+
commits.get(i).delete();
77+
}
78+
updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
7179
}
7280

73-
private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> commits) throws IOException {
74-
// when opening an existing lucene index, we currently always open the last commit.
75-
// we therefore use the translog gen as the one that will be required for recovery
76-
final IndexCommit indexCommit = commits.get(commits.size() - 1);
77-
assert indexCommit.isDeleted() == false : "last commit is deleted";
78-
long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
79-
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen);
80-
}
81+
private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
82+
assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
83+
final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
84+
85+
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
86+
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
8187

82-
public SnapshotDeletionPolicy getIndexDeletionPolicy() {
83-
return indexDeletionPolicy;
88+
assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
89+
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
90+
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
8491
}
8592

86-
public TranslogDeletionPolicy getTranslogDeletionPolicy() {
87-
return translogDeletionPolicy;
93+
/**
94+
* Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
95+
* Index commits with different translog UUID will be filtered out as they don't belong to this engine.
96+
*/
97+
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
98+
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
99+
final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
100+
101+
// Commits are sorted by age (the 0th one is the oldest commit).
102+
for (int i = commits.size() - 1; i >= 0; i--) {
103+
final Map<String, String> commitUserData = commits.get(i).getUserData();
104+
// Ignore index commits with different translog uuid.
105+
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
106+
return i + 1;
107+
}
108+
// 5.x commits do not contain MAX_SEQ_NO.
109+
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
110+
return i;
111+
}
112+
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
113+
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
114+
return i;
115+
}
116+
}
117+
/*
118+
* We may reach to this point in these cases:
119+
* 1. In the previous 6.x, we keep only the last commit - which is likely not a safe commit if writes are in progress.
120+
* Thus, after upgrading, we may not find a safe commit until we can reserve one.
121+
* 2. In peer-recovery, if the file-based happens, a replica will be received the latest commit from a primary.
122+
* However, that commit may not be a safe commit if writes are in progress in the primary.
123+
*/
124+
return 0;
88125
}
89126
}

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.lucene.index.IndexReader;
2626
import org.apache.lucene.index.IndexWriter;
2727
import org.apache.lucene.index.IndexWriterConfig;
28-
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
2928
import org.apache.lucene.index.LeafReader;
3029
import org.apache.lucene.index.LiveIndexWriterConfig;
3130
import org.apache.lucene.index.MergePolicy;
@@ -128,7 +127,7 @@ public class InternalEngine extends Engine {
128127

129128
private final String uidField;
130129

131-
private final CombinedDeletionPolicy deletionPolicy;
130+
private final SnapshotDeletionPolicy snapshotDeletionPolicy;
132131

133132
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
134133
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -167,8 +166,6 @@ public InternalEngine(EngineConfig engineConfig) {
167166
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
168167
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
169168
);
170-
this.deletionPolicy = new CombinedDeletionPolicy(
171-
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
172169
store.incRef();
173170
IndexWriter writer = null;
174171
Translog translog = null;
@@ -182,30 +179,19 @@ public InternalEngine(EngineConfig engineConfig) {
182179
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
183180
throttle = new IndexThrottle();
184181
try {
185-
final SeqNoStats seqNoStats;
186-
switch (openMode) {
187-
case OPEN_INDEX_AND_TRANSLOG:
188-
writer = createWriter(false);
189-
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
190-
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
191-
break;
192-
case OPEN_INDEX_CREATE_TRANSLOG:
193-
writer = createWriter(false);
194-
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
195-
break;
196-
case CREATE_INDEX_AND_TRANSLOG:
197-
writer = createWriter(true);
198-
seqNoStats = new SeqNoStats(
199-
SequenceNumbers.NO_OPS_PERFORMED,
200-
SequenceNumbers.NO_OPS_PERFORMED,
201-
SequenceNumbers.UNASSIGNED_SEQ_NO);
202-
break;
203-
default:
204-
throw new IllegalArgumentException(openMode.toString());
205-
}
182+
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
206183
logger.trace("recovered [{}]", seqNoStats);
207-
seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
184+
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
185+
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
186+
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint)
187+
);
188+
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
208189
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
190+
assert engineConfig.getForceNewHistoryUUID() == false
191+
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
192+
|| openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
193+
: "OpenMode must be either CREATE_INDEX_AND_TRANSLOG or OPEN_INDEX_CREATE_TRANSLOG if forceNewHistoryUUID; " +
194+
"openMode [" + openMode + "], forceNewHistoryUUID [" + engineConfig.getForceNewHistoryUUID() + "]";
209195
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
210196
Objects.requireNonNull(historyUUID, "history uuid should not be null");
211197
indexWriter = writer;
@@ -380,6 +366,23 @@ static SequenceNumbersService sequenceNumberService(
380366
seqNoStats.getGlobalCheckpoint());
381367
}
382368

369+
private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
370+
switch (openMode) {
371+
case OPEN_INDEX_AND_TRANSLOG:
372+
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
373+
return store.loadSeqNoStats(globalCheckpoint);
374+
case OPEN_INDEX_CREATE_TRANSLOG:
375+
return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
376+
case CREATE_INDEX_AND_TRANSLOG:
377+
return new SeqNoStats(
378+
SequenceNumbers.NO_OPS_PERFORMED,
379+
SequenceNumbers.NO_OPS_PERFORMED,
380+
SequenceNumbers.UNASSIGNED_SEQ_NO);
381+
default:
382+
throw new IllegalArgumentException(openMode.toString());
383+
}
384+
}
385+
383386
@Override
384387
public InternalEngine recoverFromTranslog() throws IOException {
385388
flushLock.lock();
@@ -1607,7 +1610,7 @@ public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws Engine
16071610
}
16081611
try (ReleasableLock lock = readLock.acquire()) {
16091612
logger.trace("pulling snapshot");
1610-
return new IndexCommitRef(deletionPolicy.getIndexDeletionPolicy());
1613+
return new IndexCommitRef(snapshotDeletionPolicy);
16111614
} catch (IOException e) {
16121615
throw new SnapshotFailedEngineException(shardId, e);
16131616
}
@@ -1788,7 +1791,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create) {
17881791
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
17891792
iwc.setCommitOnClose(false); // we by default don't commit on close
17901793
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
1791-
iwc.setIndexDeletionPolicy(deletionPolicy);
1794+
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
17921795
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
17931796
boolean verbose = false;
17941797
try {

core/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,14 +372,14 @@ long getMinFileGeneration() {
372372
* Returns the number of operations in the translog files that aren't committed to lucene.
373373
*/
374374
public int uncommittedOperations() {
375-
return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
375+
return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit());
376376
}
377377

378378
/**
379379
* Returns the size in bytes of the translog files that aren't committed to lucene.
380380
*/
381381
public long uncommittedSizeInBytes() {
382-
return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery());
382+
return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit());
383383
}
384384

385385
/**

core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ public void assertNoOpenTranslogRefs() {
5454
*/
5555
private long minTranslogGenerationForRecovery = 1;
5656

57+
/**
58+
* This translog generation is used to calculate the number of uncommitted operations since the last index commit.
59+
*/
60+
private long translogGenerationOfLastCommit = 1;
61+
5762
private long retentionSizeInBytes;
5863

5964
private long retentionAgeInMillis;
@@ -69,13 +74,24 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill
6974
}
7075

7176
public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
72-
if (newGen < minTranslogGenerationForRecovery) {
73-
throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
74-
minTranslogGenerationForRecovery + "]");
77+
if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) {
78+
throw new IllegalArgumentException("Invalid minTranslogGenerationForRecovery can't go backwards; new [" + newGen + "]," +
79+
"current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]");
7580
}
7681
minTranslogGenerationForRecovery = newGen;
7782
}
7883

84+
/**
85+
* Sets the translog generation of the last index commit.
86+
*/
87+
public synchronized void setTranslogGenerationOfLastCommit(long lastGen) {
88+
if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) {
89+
throw new IllegalArgumentException("Invalid translogGenerationOfLastCommit; new [" + lastGen + "]," +
90+
"current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]");
91+
}
92+
translogGenerationOfLastCommit = lastGen;
93+
}
94+
7995
public synchronized void setRetentionSizeInBytes(long bytes) {
8096
retentionSizeInBytes = bytes;
8197
}
@@ -193,6 +209,14 @@ public synchronized long getMinTranslogGenerationForRecovery() {
193209
return minTranslogGenerationForRecovery;
194210
}
195211

212+
/**
213+
* Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
214+
* See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()}
215+
*/
216+
public synchronized long getTranslogGenerationOfLastCommit() {
217+
return translogGenerationOfLastCommit;
218+
}
219+
196220
synchronized long getTranslogRefCount(long gen) {
197221
final Counter counter = translogRefCounts.get(gen);
198222
return counter == null ? 0 : counter.get();

0 commit comments

Comments
 (0)