Skip to content

Commit 8793ebc

Browse files
authored
Limit num hits when reading Lucene changes (#30908)
Today we don't limit the number of hits when reading changes from a Lucene index. If the index and the requested seq# range are both large, the searcher may consume a huge amount of memory. This commit reads the changes in fixed-size batches using search_after to avoid the problem.
1 parent f25ee25 commit 8793ebc

File tree

3 files changed

+42
-26
lines changed

3 files changed

+42
-26
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2349,6 +2349,7 @@ long getNumDocUpdates() {
23492349
return numDocUpdates.count();
23502350
}
23512351

2352+
@Override
23522353
public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
23532354
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException {
23542355
// TODO: Should we defer the refresh until we really need it?
@@ -2358,7 +2359,8 @@ public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService m
23582359
}
23592360
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
23602361
try {
2361-
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, minSeqNo, maxSeqNo, requiredFullRange);
2362+
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
2363+
searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, minSeqNo, maxSeqNo, requiredFullRange);
23622364
searcher = null;
23632365
return snapshot;
23642366
} finally {

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,10 @@
2525
import org.apache.lucene.index.NumericDocValues;
2626
import org.apache.lucene.index.ReaderUtil;
2727
import org.apache.lucene.index.Term;
28-
import org.apache.lucene.search.BooleanClause;
29-
import org.apache.lucene.search.BooleanQuery;
3028
import org.apache.lucene.search.DocIdSetIterator;
31-
import org.apache.lucene.search.DocValuesFieldExistsQuery;
3229
import org.apache.lucene.search.IndexSearcher;
3330
import org.apache.lucene.search.Query;
31+
import org.apache.lucene.search.ScoreDoc;
3432
import org.apache.lucene.search.Sort;
3533
import org.apache.lucene.search.SortField;
3634
import org.apache.lucene.search.SortedNumericSortField;
@@ -55,6 +53,9 @@
5553
* A {@link Translog.Snapshot} from changes in a Lucene index
5654
*/
5755
final class LuceneChangesSnapshot implements Translog.Snapshot {
56+
static final int DEFAULT_BATCH_SIZE = 1024;
57+
58+
private final int searchBatchSize;
5859
private final long fromSeqNo, toSeqNo;
5960
private long lastSeenSeqNo;
6061
private int skippedOperations;
@@ -63,7 +64,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
6364
private final IndexSearcher indexSearcher;
6465
private final MapperService mapperService;
6566
private int docIndex = 0;
66-
private final TopDocs topDocs;
67+
private final int totalHits;
68+
private ScoreDoc[] scoreDocs;
6769

6870
private final Closeable onClose;
6971
private final CombinedDocValues[] docValues; // Cache of DocValues
@@ -73,23 +75,30 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
7375
*
7476
* @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully
7577
* @param mapperService the mapper service which will be mainly used to resolve the document's type and uid
78+
* @param searchBatchSize the number of documents that should be returned by each search
7679
* @param fromSeqNo the min requesting seq# - inclusive
7780
* @param toSeqNo the maximum requesting seq# - inclusive
7881
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
7982
*/
80-
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService,
83+
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize,
8184
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
8285
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
8386
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
8487
}
88+
if (searchBatchSize < 0) {
89+
throw new IllegalArgumentException("Search_batch_size must not be negative [" + searchBatchSize + "]");
90+
}
8591
this.mapperService = mapperService;
92+
this.searchBatchSize = searchBatchSize;
8693
this.fromSeqNo = fromSeqNo;
8794
this.toSeqNo = toSeqNo;
8895
this.lastSeenSeqNo = fromSeqNo - 1;
8996
this.requiredFullRange = requiredFullRange;
9097
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
9198
this.indexSearcher.setQueryCache(null);
92-
this.topDocs = searchOperations(indexSearcher);
99+
final TopDocs topDocs = searchOperations(null);
100+
this.totalHits = Math.toIntExact(topDocs.totalHits);
101+
this.scoreDocs = topDocs.scoreDocs;
93102
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
94103
this.docValues = new CombinedDocValues[leaves.size()];
95104
for (LeafReaderContext leaf : leaves) {
@@ -105,7 +114,7 @@ public void close() throws IOException {
105114

106115
@Override
107116
public int totalOperations() {
108-
return Math.toIntExact(topDocs.totalHits);
117+
return totalHits;
109118
}
110119

111120
@Override
@@ -146,28 +155,28 @@ private void rangeCheck(Translog.Operation op) {
146155
}
147156
}
148157

149-
private int nextDocId() {
150-
if (docIndex < topDocs.scoreDocs.length) {
151-
final int docId = topDocs.scoreDocs[docIndex].doc;
158+
private int nextDocId() throws IOException {
159+
// we have processed all docs in the current search - fetch the next batch
160+
if (docIndex == scoreDocs.length && docIndex > 0) {
161+
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
162+
scoreDocs = searchOperations(prev).scoreDocs;
163+
docIndex = 0;
164+
}
165+
if (docIndex < scoreDocs.length) {
166+
int docId = scoreDocs[docIndex].doc;
152167
docIndex++;
153168
return docId;
154-
} else {
155-
return DocIdSetIterator.NO_MORE_DOCS;
156169
}
170+
return DocIdSetIterator.NO_MORE_DOCS;
157171
}
158172

159-
private TopDocs searchOperations(IndexSearcher searcher) throws IOException {
160-
final Query rangeQuery = new BooleanQuery.Builder()
161-
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
162-
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.FILTER)
163-
.build();
173+
private TopDocs searchOperations(ScoreDoc after) throws IOException {
174+
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
164175
final Sort sortedBySeqNoThenByTerm = new Sort(
165176
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
166177
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
167178
);
168-
// norelease - limits the number of hits
169-
final long numHits = Math.min((toSeqNo + 1 - fromSeqNo) * 2, Integer.MAX_VALUE - 1);
170-
return searcher.search(rangeQuery, Math.toIntExact(numHits), sortedBySeqNoThenByTerm);
179+
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm);
171180
}
172181

173182
private Translog.Operation readDocAsOp(int docID) throws IOException {

server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,17 @@ public void testBasics() throws Exception {
8585
toSeqNo = randomLongBetween(fromSeqNo, numOps * 2);
8686

8787
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
88-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) {
88+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
89+
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
8990
searcher = null;
9091
assertThat(snapshot, SnapshotMatchers.size(0));
9192
} finally {
9293
IOUtils.close(searcher);
9394
}
9495

9596
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
96-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
97+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
98+
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
9799
searcher = null;
98100
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
99101
assertThat(error.getMessage(),
@@ -105,14 +107,16 @@ public void testBasics() throws Exception {
105107
fromSeqNo = randomLongBetween(0, refreshedSeqNo);
106108
toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2);
107109
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
108-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) {
110+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
111+
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
109112
searcher = null;
110113
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo));
111114
}finally {
112115
IOUtils.close(searcher);
113116
}
114117
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
115-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
118+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
119+
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
116120
searcher = null;
117121
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
118122
assertThat(error.getMessage(),
@@ -122,7 +126,8 @@ public void testBasics() throws Exception {
122126
}
123127
toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo);
124128
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
125-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
129+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
130+
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
126131
searcher = null;
127132
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
128133
}finally {

0 commit comments

Comments
 (0)