Skip to content

Commit 46f309d

Browse files
committed
Use IndexWriter.getFlushingBytes() rather than tracking it ourselves (#33582)
Currently we keep track of how many bytes are currently being written to disk in an AtomicLong within InternalEngine, updating it on refresh. The IndexWriter has its own accounting for this, and exposes it via a getFlushingBytes method in the latest lucene 8 snapshot. This commit removes the InternalEngine tracking in favour of just using the IndexWriter method.
1 parent b870888 commit 46f309d

File tree

3 files changed

+23
-12
lines changed

3 files changed

+23
-12
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,6 @@ public class InternalEngine extends Engine {
156156
private final SoftDeletesPolicy softDeletesPolicy;
157157
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
158158

159-
/**
160-
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
161-
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
162-
* being indexed/deleted.
163-
*/
164-
private final AtomicLong writingBytes = new AtomicLong();
165159
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
166160

167161
@Nullable
@@ -543,7 +537,7 @@ public String getHistoryUUID() {
543537
/** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
544538
@Override
545539
public long getWritingBytes() {
546-
return writingBytes.get();
540+
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
547541
}
548542

549543
/**
@@ -1517,9 +1511,6 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
15171511
// pass the new reader reference to the external reader manager.
15181512
final long localCheckpointBeforeRefresh = getLocalCheckpoint();
15191513

1520-
// this will also cause version map ram to be freed hence we always account for it.
1521-
final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
1522-
writingBytes.addAndGet(bytes);
15231514
try (ReleasableLock lock = readLock.acquire()) {
15241515
ensureOpen();
15251516
if (store.tryIncRef()) {
@@ -1545,8 +1536,6 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
15451536
e.addSuppressed(inner);
15461537
}
15471538
throw new RefreshFailedEngineException(shardId, e);
1548-
} finally {
1549-
writingBytes.addAndGet(-bytes);
15501539
}
15511540
assert lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh : "refresh checkpoint was not advanced; " +
15521541
"local_checkpoint=" + localCheckpointBeforeRefresh + " refresh_checkpoint=" + lastRefreshedCheckpoint();

server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,14 @@ long ramBytesUsedForRefresh() {
434434
return maps.current.ramBytesUsed.get();
435435
}
436436

437+
/**
438+
* Returns how much RAM is current being freed up by refreshing. This is {@link #ramBytesUsed()}
439+
* except does not include tombstones because they don't clear on refresh.
440+
*/
441+
long getRefreshingBytes() {
442+
return maps.old.ramBytesUsed.get();
443+
}
444+
437445
@Override
438446
public Collection<Accountable> getChildResources() {
439447
// TODO: useful to break down RAM usage here?

server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import static org.hamcrest.Matchers.empty;
4343
import static org.hamcrest.Matchers.equalTo;
44+
import static org.hamcrest.Matchers.greaterThan;
4445
import static org.hamcrest.Matchers.nullValue;
4546

4647
public class LiveVersionMapTests extends ESTestCase {
@@ -91,6 +92,19 @@ public void testRamBytesUsed() throws Exception {
9192
assertEquals(actualRamBytesUsed, estimatedRamBytesUsed, tolerance);
9293
}
9394

95+
public void testRefreshingBytes() throws IOException {
96+
LiveVersionMap map = new LiveVersionMap();
97+
BytesRefBuilder uid = new BytesRefBuilder();
98+
uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20));
99+
try (Releasable r = map.acquireLock(uid.toBytesRef())) {
100+
map.putIndexUnderLock(uid.toBytesRef(), randomIndexVersionValue());
101+
}
102+
map.beforeRefresh();
103+
assertThat(map.getRefreshingBytes(), greaterThan(0L));
104+
map.afterRefresh(true);
105+
assertThat(map.getRefreshingBytes(), equalTo(0L));
106+
}
107+
94108
private BytesRef uid(String string) {
95109
BytesRefBuilder builder = new BytesRefBuilder();
96110
builder.copyChars(string);

0 commit comments

Comments
 (0)