Skip to content

Commit 4ed7907

Browse files
committed
fix refresh
1 parent 4c73e52 commit 4ed7907

File tree

2 files changed

+42
-3
lines changed

2 files changed

+42
-3
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,6 +1420,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
14201420
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
14211421
// both refresh types will result in an internal refresh but only the external will also
14221422
// pass the new reader reference to the external reader manager.
1423+
final long localCheckpointBeforeRefresh = getLocalCheckpoint();
14231424

14241425
// this will also cause version map ram to be freed hence we always account for it.
14251426
final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
@@ -1445,6 +1446,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
14451446
} finally {
14461447
store.decRef();
14471448
}
1449+
lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);
14481450
}
14491451
} catch (AlreadyClosedException e) {
14501452
failOnTragicEvent(e);
@@ -1459,7 +1461,8 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
14591461
} finally {
14601462
writingBytes.addAndGet(-bytes);
14611463
}
1462-
1464+
assert lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh : "refresh checkpoint was not advanced; " +
1465+
"local_checkpoint=" + localCheckpointBeforeRefresh + " refresh_checkpoint=" + lastRefreshedCheckpoint();
14631466
// TODO: maybe we should just put a scheduled job in threadPool?
14641467
// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
14651468
// for a long time:
@@ -2526,21 +2529,31 @@ public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends
25262529
final long lastRefreshedCheckpoint() {
25272530
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
25282531
}
2532+
25292533
private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
25302534
final AtomicLong refreshedCheckpoint;
25312535
private long pendingCheckpoint;
2536+
25322537
LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
25332538
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
25342539
}
2540+
25352541
@Override
25362542
public void beforeRefresh() {
2537-
pendingCheckpoint = localCheckpointTracker.getCheckpoint(); // All change until this point should be visible after refresh
2543+
// all changes until this point should be visible after refresh
2544+
pendingCheckpoint = localCheckpointTracker.getCheckpoint();
25382545
}
2546+
25392547
@Override
25402548
public void afterRefresh(boolean didRefresh) {
25412549
if (didRefresh) {
2542-
refreshedCheckpoint.set(pendingCheckpoint);
2550+
updateRefreshedCheckpoint(pendingCheckpoint);
25432551
}
25442552
}
2553+
2554+
void updateRefreshedCheckpoint(long checkpoint) {
2555+
refreshedCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));
2556+
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
2557+
}
25452558
}
25462559
}

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4994,6 +4994,32 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
49944994
}
49954995
}
49964996

4997+
public void testLastRefreshCheckpoint() throws Exception {
4998+
AtomicBoolean done = new AtomicBoolean();
4999+
Thread[] refreshThreads = new Thread[between(1, 8)];
5000+
CountDownLatch latch = new CountDownLatch(refreshThreads.length);
5001+
for (int i = 0; i < refreshThreads.length; i++) {
5002+
latch.countDown();
5003+
refreshThreads[i] = new Thread(() -> {
5004+
while (done.get() == false) {
5005+
long checkPointBeforeRefresh = engine.getLocalCheckpoint();
5006+
engine.refresh("test", randomFrom(Engine.SearcherScope.values()));
5007+
assertThat(engine.lastRefreshedCheckpoint(), greaterThanOrEqualTo(checkPointBeforeRefresh));
5008+
}
5009+
});
5010+
refreshThreads[i].start();
5011+
}
5012+
latch.await();
5013+
List<Engine.Operation> ops = generateSingleDocHistory(true, VersionType.EXTERNAL, 1, 10, 1000, "1");
5014+
concurrentlyApplyOps(ops, engine);
5015+
done.set(true);
5016+
for (Thread thread : refreshThreads) {
5017+
thread.join();
5018+
}
5019+
engine.refresh("test");
5020+
assertThat(engine.lastRefreshedCheckpoint(), equalTo(engine.getLocalCheckpoint()));
5021+
}
5022+
49975023
private static void trimUnsafeCommits(EngineConfig config) throws IOException {
49985024
final Store store = config.getStore();
49995025
final TranslogConfig translogConfig = config.getTranslogConfig();

0 commit comments

Comments
 (0)