Skip to content

Commit 65e9007

Browse files
authored
Open engine should keep only starting commit (#28228)
Keeping unsafe commits when opening an engine can be problematic because these commits are not safe at the recovering time but they can suddenly become safe in the future. The following issues can happen if unsafe commits are kept in onInit. 1. Replica can use unsafe commit in peer-recovery. This happens when a replica has a safe commit c1 (max_seqno=1) and an unsafe commit c2 (max_seqno=2), and recovers from a primary with c1 (max_seqno=1). If a new document (seqno=2) is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use the unsafe commit c2 (max_seqno=2 <= gcp=2) as the starting commit for sequence-based recovery even though the commit c2 contains a stale operation, and the document (with seqno=2) will not be replicated to the replica. 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when a replica has a safe commit c1 (local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2 (local_checkpoint=2, recovery_translog_gen=2). The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new commit on the replica will cause an exception as the new last commit c3 will have recovery_translog_gen=1. The recovery translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 while the local checkpoint of c2 is 2. 3. Commit without translog can be used for recovery. An old index, which was created before multiple-commits was introduced (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, the policy can consider the snapshotted commit as a safe commit for recovery even though the commit does not have a translog. These issues can be avoided if the combined deletion policy keeps only the starting commit in onInit. Relates #27804 Relates #28181
1 parent 67c1f1c commit 65e9007

File tree

7 files changed

+234
-52
lines changed

7 files changed

+234
-52
lines changed

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,37 +45,72 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
4545
private final TranslogDeletionPolicy translogDeletionPolicy;
4646
private final EngineConfig.OpenMode openMode;
4747
private final LongSupplier globalCheckpointSupplier;
48+
private final IndexCommit startingCommit;
4849
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
4950
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
5051
private IndexCommit lastCommit; // the most recent commit point
5152

5253
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
53-
LongSupplier globalCheckpointSupplier) {
54+
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
5455
this.openMode = openMode;
5556
this.translogDeletionPolicy = translogDeletionPolicy;
5657
this.globalCheckpointSupplier = globalCheckpointSupplier;
58+
this.startingCommit = startingCommit;
5759
this.snapshottedCommits = new ObjectIntHashMap<>();
5860
}
5961

6062
@Override
61-
public void onInit(List<? extends IndexCommit> commits) throws IOException {
63+
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
6264
switch (openMode) {
6365
case CREATE_INDEX_AND_TRANSLOG:
66+
assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
6467
break;
6568
case OPEN_INDEX_CREATE_TRANSLOG:
66-
assert commits.isEmpty() == false : "index is opened, but we have no commits";
67-
// When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
68-
// We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
69-
break;
7069
case OPEN_INDEX_AND_TRANSLOG:
7170
assert commits.isEmpty() == false : "index is opened, but we have no commits";
72-
onCommit(commits);
71+
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
72+
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
73+
keepOnlyStartingCommitOnInit(commits);
74+
// OPEN_INDEX_CREATE_TRANSLOG can open an index commit from another shard with a different translog history,
75+
// We therefore should not use that index commit to update the translog deletion policy.
76+
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
77+
updateTranslogDeletionPolicy();
78+
}
7379
break;
7480
default:
7581
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
7682
}
7783
}
7884

85+
/**
86+
* Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
87+
* at the recovering time but they can suddenly become safe in the future.
88+
* The following issues can happen if unsafe commits are kept in onInit.
89+
* <p>
90+
* 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1)
91+
* and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2)
92+
* is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use
93+
* the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequence-based recovery even though the
94+
* commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica.
95+
* <p>
96+
* 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when a replica has a safe commit
97+
* c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2).
98+
* The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new
99+
* commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery
100+
* translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1
101+
* while the local checkpoint of c2 is 2.
102+
* <p>
103+
* 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits was introduced
104+
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
105+
* the policy can consider the snapshotted commit as a safe commit for recovery even though the commit does not have a translog.
106+
*/
107+
private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) {
108+
commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
109+
assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
110+
lastCommit = startingCommit;
111+
safeCommit = startingCommit;
112+
}
113+
79114
@Override
80115
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
81116
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public InternalEngine(EngineConfig engineConfig) {
185185
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
186186
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
187187
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
188-
translog::getLastSyncedGlobalCheckpoint);
188+
translog::getLastSyncedGlobalCheckpoint, startingCommit);
189189
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
190190
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
191191
assert engineConfig.getForceNewHistoryUUID() == false
@@ -411,28 +411,44 @@ public void skipTranslogRecovery() {
411411
}
412412

413413
private IndexCommit getStartingCommitPoint() throws IOException {
414-
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
415-
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
416-
final long minRetainedTranslogGen = translog.getMinFileGeneration();
417-
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
418-
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose full translog
419-
// files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
420-
// To avoid this issue, we only select index commits whose translog files are fully retained.
421-
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
422-
final List<IndexCommit> recoverableCommits = new ArrayList<>();
423-
for (IndexCommit commit : existingCommits) {
424-
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
425-
recoverableCommits.add(commit);
414+
final IndexCommit startingIndexCommit;
415+
final List<IndexCommit> existingCommits;
416+
switch (openMode) {
417+
case CREATE_INDEX_AND_TRANSLOG:
418+
startingIndexCommit = null;
419+
break;
420+
case OPEN_INDEX_CREATE_TRANSLOG:
421+
// Use the last commit
422+
existingCommits = DirectoryReader.listCommits(store.directory());
423+
startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
424+
break;
425+
case OPEN_INDEX_AND_TRANSLOG:
426+
// Use the safe commit
427+
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
428+
final long minRetainedTranslogGen = translog.getMinFileGeneration();
429+
existingCommits = DirectoryReader.listCommits(store.directory());
430+
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog
431+
// are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
432+
// To avoid this issue, we only select index commits whose translog are fully retained.
433+
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
434+
final List<IndexCommit> recoverableCommits = new ArrayList<>();
435+
for (IndexCommit commit : existingCommits) {
436+
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
437+
recoverableCommits.add(commit);
438+
}
426439
}
440+
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
441+
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
442+
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
443+
} else {
444+
// TODO: Assert that the starting commit is a safe commit once peer-recovery sets the global checkpoint.
445+
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
427446
}
428-
assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " +
429-
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
430-
return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
431-
} else {
432-
return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
433-
}
447+
break;
448+
default:
449+
throw new IllegalArgumentException("unknown mode: " + openMode);
434450
}
435-
return null;
451+
return startingIndexCommit;
436452
}
437453

438454
private void recoverFromTranslogInternal() throws IOException {
@@ -557,9 +573,7 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear
557573
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
558574
internalSearcherManager = new SearcherManager(directoryReader,
559575
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
560-
// The index commit from IndexWriterConfig is null if the engine is open with other modes
561-
// rather than CREATE_INDEX_AND_TRANSLOG. In those cases lastCommittedSegmentInfos will be retrieved from the last commit.
562-
lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit());
576+
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
563577
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
564578
externalSearcherFactory);
565579
success = true;

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
2525
import org.apache.lucene.index.CheckIndex;
26+
import org.apache.lucene.index.DirectoryReader;
2627
import org.apache.lucene.index.IndexCommit;
2728
import org.apache.lucene.index.IndexOptions;
2829
import org.apache.lucene.index.LeafReaderContext;
@@ -1290,12 +1291,16 @@ public void createIndexAndTranslog() throws IOException {
12901291

12911292
/** opens the engine on top of the existing lucene engine but creates an empty translog **/
12921293
public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException {
1293-
assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
1294-
recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
1295-
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
1296-
assert commitInfo.localCheckpoint >= globalCheckpoint :
1297-
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
1294+
if (Assertions.ENABLED) {
1295+
assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
1296+
recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
1297+
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
1298+
assert commitInfo.localCheckpoint >= globalCheckpoint :
1299+
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
12981300
+ globalCheckpoint + "]";
1301+
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
1302+
assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]";
1303+
}
12991304
globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
13001305
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID);
13011306
}

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,17 +182,9 @@ public Directory directory() {
182182
* @throws IOException if the index is corrupted or the segments file is not present
183183
*/
184184
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
185-
return readCommittedSegmentsInfo(null);
186-
}
187-
188-
/**
189-
* Returns the committed segments info for the given commit point.
190-
* If the commit point is not provided, this method will return the segments info of the last commit in the store.
191-
*/
192-
public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException {
193185
failIfCorrupted();
194186
try {
195-
return readSegmentsInfo(commit, directory());
187+
return readSegmentsInfo(null, directory());
196188
} catch (CorruptIndexException ex) {
197189
markStoreCorrupted(ex);
198190
throw ex;

server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
5454
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
5555
final AtomicLong globalCheckpoint = new AtomicLong();
5656
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
57-
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
57+
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
58+
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
5859

5960
final LongArrayList maxSeqNoList = new LongArrayList();
6061
final LongArrayList translogGenList = new LongArrayList();
@@ -93,7 +94,8 @@ public void testAcquireIndexCommit() throws Exception {
9394
final AtomicLong globalCheckpoint = new AtomicLong();
9495
final UUID translogUUID = UUID.randomUUID();
9596
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
96-
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
97+
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
98+
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
9799
long lastMaxSeqNo = between(1, 1000);
98100
long lastTranslogGen = between(1, 20);
99101
int safeIndex = 0;
@@ -156,11 +158,12 @@ public void testLegacyIndex() throws Exception {
156158
final UUID translogUUID = UUID.randomUUID();
157159

158160
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
159-
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
161+
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
162+
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
160163

161164
long legacyTranslogGen = randomNonNegativeLong();
162165
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
163-
indexPolicy.onInit(singletonList(legacyCommit));
166+
indexPolicy.onCommit(singletonList(legacyCommit));
164167
verify(legacyCommit, never()).delete();
165168
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
166169
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen));
@@ -188,7 +191,8 @@ public void testLegacyIndex() throws Exception {
188191
public void testDeleteInvalidCommits() throws Exception {
189192
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
190193
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
191-
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get);
194+
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
195+
OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
192196

193197
final int invalidCommits = between(1, 10);
194198
final List<IndexCommit> commitList = new ArrayList<>();
@@ -211,6 +215,35 @@ public void testDeleteInvalidCommits() throws Exception {
211215
}
212216
}
213217

218+
/**
219+
* Keeping existing unsafe commits can be problematic because these commits are not safe at the recovering time
220+
* but they can suddenly become safe in the future. See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)}
221+
*/
222+
public void testKeepOnlyStartingCommitOnInit() throws Exception {
223+
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
224+
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
225+
final UUID translogUUID = UUID.randomUUID();
226+
final List<IndexCommit> commitList = new ArrayList<>();
227+
int totalCommits = between(2, 20);
228+
for (int i = 0; i < totalCommits; i++) {
229+
commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong()));
230+
}
231+
final IndexCommit startingCommit = randomFrom(commitList);
232+
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
233+
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit);
234+
indexPolicy.onInit(commitList);
235+
for (IndexCommit commit : commitList) {
236+
if (commit.equals(startingCommit) == false) {
237+
verify(commit, times(1)).delete();
238+
}
239+
}
240+
verify(startingCommit, never()).delete();
241+
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(),
242+
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
243+
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(),
244+
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
245+
}
246+
214247
IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
215248
final Map<String, String> userData = new HashMap<>();
216249
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));

0 commit comments

Comments
 (0)