Skip to content

Commit 8552c5e

Browse files
committed
Update translog policy before the next safe commit (#54839)
IndexShardIT#testMaybeFlush relies on the assumption that the safe commit and translog deletion policy have advanced after IndexShard#sync returns . This assumption does not hold if there's a race with the global checkpoint sync. Closes #52223
1 parent 2573ba9 commit 8552c5e

File tree

2 files changed

+44
-5
lines changed

2 files changed

+44
-5
lines changed

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,17 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
8484
this.safeCommitInfo = SafeCommitInfo.EMPTY;
8585
this.lastCommit = commits.get(commits.size() - 1);
8686
this.safeCommit = commits.get(keptPosition);
87-
if (keptPosition == commits.size() - 1) {
88-
this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
89-
} else {
90-
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
91-
}
9287
for (int i = 0; i < keptPosition; i++) {
9388
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
9489
deleteCommit(commits.get(i));
9590
}
9691
}
9792
updateRetentionPolicy();
93+
if (keptPosition == commits.size() - 1) {
94+
this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
95+
} else {
96+
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
97+
}
9898
safeCommit = this.safeCommit;
9999
}
100100

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,6 +1181,45 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException {
11811181
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L));
11821182
}
11831183

1184+
public void testSyncTranslogConcurrently() throws Exception {
1185+
IOUtils.close(engine, store);
1186+
final Path translogPath = createTempDir();
1187+
store = createStore();
1188+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
1189+
engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get));
1190+
List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 50), false, randomBoolean(), randomBoolean());
1191+
applyOperations(engine, ops);
1192+
engine.flush(true, true);
1193+
final CheckedRunnable<IOException> checker = () -> {
1194+
assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0));
1195+
assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get()));
1196+
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
1197+
SequenceNumbers.CommitInfo commitInfo =
1198+
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getIndexCommit().getUserData().entrySet());
1199+
assertThat(commitInfo.localCheckpoint, equalTo(engine.getProcessedLocalCheckpoint()));
1200+
}
1201+
};
1202+
final Thread[] threads = new Thread[randomIntBetween(2, 4)];
1203+
final Phaser phaser = new Phaser(threads.length);
1204+
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
1205+
for (int i = 0; i < threads.length; i++) {
1206+
threads[i] = new Thread(() -> {
1207+
phaser.arriveAndAwaitAdvance();
1208+
try {
1209+
engine.syncTranslog();
1210+
checker.run();
1211+
} catch (IOException e) {
1212+
throw new AssertionError(e);
1213+
}
1214+
});
1215+
threads[i].start();
1216+
}
1217+
for (Thread thread : threads) {
1218+
thread.join();
1219+
}
1220+
checker.run();
1221+
}
1222+
11841223
public void testSyncedFlush() throws IOException {
11851224
try (Store store = createStore();
11861225
Engine engine = createEngine(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null)) {

0 commit comments

Comments
 (0)