Commit df93b31

Faster sequential access for stored fields (#62509) (#62573)
Spinoff of #61806.

Today, retrieving stored fields at search time is optimized for random access: we make no effort to keep state between retrievals, so the same compressed block may be decompressed multiple times when two documents fall into it. This strategy is acceptable when retrieving a top N sorted by score, since there is no guarantee that documents will be in the same block. However, we have some use cases where the documents to retrieve are completely sequential:

- Scrolls or normal searches sorted by document id.
- Queries on runtime fields that extract from `_source`.

This commit exposes a sequential stored fields reader in the custom leaf reader that we use at search time. That allows us to leverage the merge instances of stored fields readers, which are optimized for sequential access. This change focuses on the fetch phase for now and uses the merge instances for stored fields only if all documents to retrieve are adjacent. Applying the same logic in the source lookup of runtime fields should be trivial but will be done in a follow-up.

The speedup on queries sorted by doc id is significant. I played with the scroll task of the http_logs Rally track on my laptop and got the following result:

| Metric | Task | Baseline | Contender | Diff | Unit |
|------------------------------:|-------:|------------:|------------:|---------:|--------:|
| Total Young Gen GC | | 0.199 | 0.231 | 0.032 | s |
| Total Old Gen GC | | 0 | 0 | 0 | s |
| Store size | | 17.9704 | 17.9704 | 0 | GB |
| Translog size | | 2.04891e-06 | 2.04891e-06 | 0 | GB |
| Heap used for segments | | 0.820332 | 0.820332 | 0 | MB |
| Heap used for doc values | | 0.113979 | 0.113979 | 0 | MB |
| Heap used for terms | | 0.37973 | 0.37973 | 0 | MB |
| Heap used for norms | | 0.03302 | 0.03302 | 0 | MB |
| Heap used for points | | 0 | 0 | 0 | MB |
| Heap used for stored fields | | 0.293602 | 0.293602 | 0 | MB |
| Segment count | | 541 | 541 | 0 | |
| Min Throughput | scroll | 12.7872 | 12.8747 | 0.08758 | pages/s |
| Median Throughput | scroll | 12.9679 | 13.0556 | 0.08776 | pages/s |
| Max Throughput | scroll | 13.4001 | 13.5705 | 0.17046 | pages/s |
| 50th percentile latency | scroll | 524.966 | 251.396 | -273.57 | ms |
| 90th percentile latency | scroll | 577.593 | 271.066 | -306.527 | ms |
| 100th percentile latency | scroll | 664.73 | 272.734 | -391.997 | ms |
| 50th percentile service time | scroll | 522.387 | 248.776 | -273.612 | ms |
| 90th percentile service time | scroll | 573.118 | 267.79 | -305.328 | ms |
| 100th percentile service time | scroll | 660.642 | 268.963 | -391.678 | ms |
| error rate | scroll | 0 | 0 | 0 | % |

Closes #62024
1 parent c4d80ab commit df93b31
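To make the merge-instance trick from the commit message concrete, here is a minimal sketch, not code from this commit, of reading an adjacent range of doc ids through a merge instance of Lucene's stored fields reader; the class and method names (`SequentialStoredFieldsSketch`, `readSequentially`) are hypothetical:

```java
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.StoredFieldVisitor;

import java.io.IOException;

class SequentialStoredFieldsSketch {
    // Visit the adjacent range [firstDoc, lastDoc] through a merge instance of the
    // stored fields reader. Merge instances keep decompression state between calls,
    // so each compressed block is decompressed at most once instead of once per doc.
    static void readSequentially(CodecReader reader, int firstDoc, int lastDoc,
                                 StoredFieldVisitor visitor) throws IOException {
        StoredFieldsReader sequential = reader.getFieldsReader().getMergeInstance();
        for (int docId = firstDoc; docId <= lastDoc; docId++) {
            sequential.visitDocument(docId, visitor);
        }
    }
}
```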

File tree

13 files changed (+398 additions, -63 deletions)

server/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -62,8 +62,8 @@ protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOExc
     }
 
     /**
-     * Wraps the given reader in a {@link org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader} as
-     * well as all it's sub-readers in {@link org.elasticsearch.common.lucene.index.ElasticsearchLeafReader} to
+     * Wraps the given reader in a {@link ElasticsearchDirectoryReader} as
+     * well as all it's sub-readers in {@link ElasticsearchLeafReader} to
      * expose the given shard Id.
      *
      * @param reader the reader to wrap
```

server/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java

Lines changed: 7 additions & 1 deletion
```diff
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.common.lucene.index;
 
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.index.FilterLeafReader;
 import org.apache.lucene.index.LeafReader;
 import org.elasticsearch.index.shard.ShardId;
@@ -26,7 +27,7 @@
  * A {@link org.apache.lucene.index.FilterLeafReader} that exposes
  * Elasticsearch internal per shard / index information like the shard ID.
  */
-public final class ElasticsearchLeafReader extends FilterLeafReader {
+public final class ElasticsearchLeafReader extends SequentialStoredFieldsLeafReader {
 
     private final ShardId shardId;
 
@@ -72,4 +73,9 @@ public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader read
         }
         return null;
     }
+
+    @Override
+    protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
+        return reader;
+    }
 }
```
server/src/main/java/org/elasticsearch/common/lucene/index/SequentialStoredFieldsLeafReader.java

Lines changed: 68 additions & 0 deletions

```java
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.lucene.index;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;

import java.io.IOException;

/**
 * A {@link FilterLeafReader} that exposes a {@link StoredFieldsReader}
 * optimized for sequential access. This class should be used by custom
 * {@link FilterLeafReader} that are used at search time in order to
 * leverage sequential access when retrieving stored fields in queries,
 * aggregations or during the fetch phase.
 */
public abstract class SequentialStoredFieldsLeafReader extends FilterLeafReader {
    /**
     * <p>Construct a StoredFieldsFilterLeafReader based on the specified base reader.
     * <p>Note that base reader is closed if this FilterLeafReader is closed.</p>
     *
     * @param in specified base reader.
     */
    public SequentialStoredFieldsLeafReader(LeafReader in) {
        super(in);
    }

    /**
     * Implementations should return a {@link StoredFieldsReader} that wraps the provided <code>reader</code>
     * that is optimized for sequential access (adjacent doc ids).
     */
    protected abstract StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader);

    /**
     * Returns a {@link StoredFieldsReader} optimized for sequential access (adjacent doc ids).
     */
    public StoredFieldsReader getSequentialStoredFieldsReader() throws IOException {
        if (in instanceof CodecReader) {
            CodecReader reader = (CodecReader) in;
            return doGetSequentialStoredFieldsReader(reader.getFieldsReader().getMergeInstance());
        } else if (in instanceof SequentialStoredFieldsLeafReader) {
            SequentialStoredFieldsLeafReader reader = (SequentialStoredFieldsLeafReader) in;
            return doGetSequentialStoredFieldsReader(reader.getSequentialStoredFieldsReader());
        } else {
            throw new IOException("requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass());
        }
    }

}
```
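For reference, a minimal sketch of what a subclass needs to provide, under the hypothetical name `MyLeafReader`; it mirrors the `ElasticsearchLeafReader` and `ExitableLeafReader` changes elsewhere in this commit, which likewise return the merge-instance reader unchanged:

```java
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.LeafReader;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;

// A hypothetical filter reader: subclasses only decide how to wrap the
// merge-instance StoredFieldsReader; returning it as-is is the common case.
final class MyLeafReader extends SequentialStoredFieldsLeafReader {
    MyLeafReader(LeafReader in) {
        super(in);
    }

    @Override
    protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
        return reader; // no extra wrapping needed
    }

    // FilterLeafReader leaves the cache helpers abstract; delegate to the wrapped reader.
    @Override
    public CacheHelper getCoreCacheHelper() {
        return in.getCoreCacheHelper();
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        return in.getReaderCacheHelper();
    }
}
```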

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 42 additions & 8 deletions
```diff
@@ -31,9 +31,11 @@
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.BitSet;
 import org.elasticsearch.Version;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.common.text.Text;
 import org.elasticsearch.common.xcontent.XContentHelper;
@@ -104,6 +106,7 @@ public void execute(SearchContext context) {
         for (int index = 0; index < context.docIdsToLoadSize(); index++) {
             docs[index] = new DocIdToIndex(context.docIdsToLoad()[context.docIdsToLoadFrom() + index], index);
         }
+        // make sure that we iterate in doc id order
         Arrays.sort(docs);
 
         Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
@@ -118,6 +121,8 @@ public void execute(SearchContext context) {
 
         int currentReaderIndex = -1;
         LeafReaderContext currentReaderContext = null;
+        CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader = null;
+        boolean hasSequentialDocs = hasSequentialDocs(docs);
         for (int index = 0; index < context.docIdsToLoadSize(); index++) {
             if (context.isCancelled()) {
                 throw new TaskCancelledException("cancelled");
@@ -128,6 +133,17 @@ public void execute(SearchContext context) {
             if (currentReaderIndex != readerIndex) {
                 currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
                 currentReaderIndex = readerIndex;
+                if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader
+                        && hasSequentialDocs && docs.length >= 10) {
+                    // All the docs to fetch are adjacent but Lucene stored fields are optimized
+                    // for random access and don't optimize for sequential access - except for merging.
+                    // So we do a little hack here and pretend we're going to do merges in order to
+                    // get better sequential access.
+                    SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) currentReaderContext.reader();
+                    fieldReader = lf.getSequentialStoredFieldsReader()::visitDocument;
+                } else {
+                    fieldReader = currentReaderContext.reader()::document;
+                }
                 for (FetchSubPhaseProcessor processor : processors) {
                     processor.setNextReader(currentReaderContext);
                 }
@@ -140,6 +156,7 @@ public void execute(SearchContext context) {
                     docId,
                     storedToRequestedFields,
                     currentReaderContext,
+                    fieldReader,
                     sharedCache
                 );
                 for (FetchSubPhaseProcessor processor : processors) {
@@ -253,9 +270,14 @@ private int findRootDocumentIfNested(SearchContext context, LeafReaderContext su
         return -1;
     }
 
-    private HitContext prepareHitContext(SearchContext context, SearchLookup lookup, FieldsVisitor fieldsVisitor, int docId,
+    private HitContext prepareHitContext(SearchContext context,
+                                         SearchLookup lookup,
+                                         FieldsVisitor fieldsVisitor,
+                                         int docId,
                                          Map<String, Set<String>> storedToRequestedFields,
-                                         LeafReaderContext subReaderContext, Map<String, Object> sharedCache) throws IOException {
+                                         LeafReaderContext subReaderContext,
+                                         CheckedBiConsumer<Integer, FieldsVisitor, IOException> storedFieldReader,
+                                         Map<String, Object> sharedCache) throws IOException {
         int rootDocId = findRootDocumentIfNested(context, subReaderContext, docId - subReaderContext.docBase);
         if (rootDocId == -1) {
             return prepareNonNestedHitContext(
@@ -265,10 +287,12 @@ private HitContext prepareHitContext(SearchContext context, SearchLookup lookup,
                 docId,
                 storedToRequestedFields,
                 subReaderContext,
+                storedFieldReader,
                 sharedCache
             );
         } else {
-            return prepareNestedHitContext(context, docId, rootDocId, storedToRequestedFields, subReaderContext, sharedCache);
+            return prepareNestedHitContext(context, docId, rootDocId, storedToRequestedFields,
+                subReaderContext, storedFieldReader, sharedCache);
         }
     }
 
@@ -285,6 +309,7 @@ private HitContext prepareNonNestedHitContext(SearchContext context,
                                                   int docId,
                                                   Map<String, Set<String>> storedToRequestedFields,
                                                   LeafReaderContext subReaderContext,
+                                                  CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader,
                                                   Map<String, Object> sharedCache) throws IOException {
         int subDocId = docId - subReaderContext.docBase;
         DocumentMapper documentMapper = context.mapperService().documentMapper();
@@ -295,7 +320,7 @@ private HitContext prepareNonNestedHitContext(SearchContext context,
             return new HitContext(hit, subReaderContext, subDocId, lookup.source(), sharedCache);
         } else {
             SearchHit hit;
-            loadStoredFields(context.mapperService(), subReaderContext, fieldsVisitor, subDocId);
+            loadStoredFields(context.mapperService(), fieldReader, fieldsVisitor, subDocId);
             Uid uid = fieldsVisitor.uid();
             if (fieldsVisitor.fields().isEmpty() == false) {
                 Map<String, DocumentField> docFields = new HashMap<>();
@@ -328,6 +353,7 @@ private HitContext prepareNestedHitContext(SearchContext context,
                                                int rootDocId,
                                                Map<String, Set<String>> storedToRequestedFields,
                                                LeafReaderContext subReaderContext,
+                                               CheckedBiConsumer<Integer, FieldsVisitor, IOException> storedFieldReader,
                                                Map<String, Object> sharedCache) throws IOException {
         // Also if highlighting is requested on nested documents we need to fetch the _source from the root document,
         // otherwise highlighting will attempt to fetch the _source from the nested doc, which will fail,
@@ -351,7 +377,7 @@ private HitContext prepareNestedHitContext(SearchContext context,
             }
         } else {
             FieldsVisitor rootFieldsVisitor = new FieldsVisitor(needSource);
-            loadStoredFields(context.mapperService(), subReaderContext, rootFieldsVisitor, rootDocId);
+            loadStoredFields(context.mapperService(), storedFieldReader, rootFieldsVisitor, rootDocId);
             rootFieldsVisitor.postProcess(context.mapperService());
             rootId = rootFieldsVisitor.uid();
 
@@ -367,7 +393,7 @@ private HitContext prepareNestedHitContext(SearchContext context,
         Map<String, DocumentField> metaFields = emptyMap();
         if (context.hasStoredFields() && !context.storedFieldsContext().fieldNames().isEmpty()) {
             FieldsVisitor nestedFieldsVisitor = new CustomFieldsVisitor(storedToRequestedFields.keySet(), false);
-            loadStoredFields(context.mapperService(), subReaderContext, nestedFieldsVisitor, nestedDocId);
+            loadStoredFields(context.mapperService(), storedFieldReader, nestedFieldsVisitor, nestedDocId);
             if (nestedFieldsVisitor.fields().isEmpty() == false) {
                 docFields = new HashMap<>();
                 metaFields = new HashMap<>();
@@ -518,10 +544,10 @@ private SearchHit.NestedIdentity getInternalNestedIdentity(SearchContext contex
     }
 
     private void loadStoredFields(MapperService mapperService,
-                                  LeafReaderContext readerContext,
+                                  CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader,
                                   FieldsVisitor fieldVisitor, int docId) throws IOException {
         fieldVisitor.reset();
-        readerContext.reader().document(docId, fieldVisitor);
+        fieldReader.accept(docId, fieldVisitor);
         fieldVisitor.postProcess(mapperService);
     }
 
@@ -548,4 +574,12 @@ private static void fillDocAndMetaFields(SearchContext context, FieldsVisitor fi
             }
         }
     }
+
+    /**
+     * Returns <code>true</code> if the provided <code>docs</code> are
+     * stored sequentially (Dn = Dn-1 + 1).
+     */
+    static boolean hasSequentialDocs(DocIdToIndex[] docs) {
+        return docs.length > 0 && docs[docs.length-1].docId - docs[0].docId == docs.length - 1;
+    }
 }
```
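A note on why `hasSequentialDocs` above only needs to inspect the endpoints: `docs` has just been sorted and doc ids are distinct, so n ids span exactly n - 1 iff they are consecutive. For example, [42, 43, 44, 45] gives 45 - 42 = 3 = 4 - 1, while [42, 44, 45, 46] gives 46 - 42 = 4. A standalone restatement of the predicate (hypothetical helper, not part of the commit):

```java
final class AdjacencyCheck {
    // n distinct sorted doc ids are consecutive iff last - first == n - 1 (pigeonhole).
    static boolean isAdjacentRange(int[] sortedDistinctDocIds) {
        int n = sortedDistinctDocIds.length;
        return n > 0 && sortedDistinctDocIds[n - 1] - sortedDistinctDocIds[0] == n - 1;
    }

    public static void main(String[] args) {
        System.out.println(isAdjacentRange(new int[] {42, 43, 44, 45})); // true
        System.out.println(isAdjacentRange(new int[] {42, 44, 45, 46})); // false
    }
}
```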

server/src/main/java/org/elasticsearch/search/internal/ExitableDirectoryReader.java

Lines changed: 8 additions & 1 deletion
```diff
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.search.internal;
 
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterDirectoryReader;
 import org.apache.lucene.index.FilterLeafReader;
@@ -30,6 +31,7 @@
 import org.apache.lucene.search.suggest.document.CompletionTerms;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.automaton.CompiledAutomaton;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 
 import java.io.IOException;
 
@@ -79,7 +81,7 @@ public CacheHelper getReaderCacheHelper() {
     /**
      * Wraps a {@link FilterLeafReader} with a {@link QueryCancellation}.
      */
-    static class ExitableLeafReader extends FilterLeafReader {
+    static class ExitableLeafReader extends SequentialStoredFieldsLeafReader {
 
         private final QueryCancellation queryCancellation;
 
@@ -119,6 +121,11 @@ public CacheHelper getCoreCacheHelper() {
         public CacheHelper getReaderCacheHelper() {
             return in.getReaderCacheHelper();
         }
+
+        @Override
+        protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
+            return reader;
+        }
     }
 
     /**
```

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 22 additions & 0 deletions
```diff
@@ -91,6 +91,7 @@
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
@@ -6299,4 +6300,25 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
             }
         }
     }
+
+    public void testProducesStoredFieldsReader() throws Exception {
+        // Make sure that the engine produces a SequentialStoredFieldsLeafReader.
+        // This is required for optimizations on SourceLookup to work, which is in-turn useful for runtime fields.
+        ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField("test"),
+            new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+        Engine.Index operation = randomBoolean() ?
+            appendOnlyPrimary(doc, false, 1)
+            : appendOnlyReplica(doc, false, 1, randomIntBetween(0, 5));
+        engine.index(operation);
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            IndexReader reader = searcher.getIndexReader();
+            assertThat(reader.leaves().size(), Matchers.greaterThanOrEqualTo(1));
+            for (LeafReaderContext context : reader.leaves()) {
+                assertThat(context.reader(), Matchers.instanceOf(SequentialStoredFieldsLeafReader.class));
+                SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) context.reader();
+                assertNotNull(lf.getSequentialStoredFieldsReader());
+            }
+        }
+    }
 }
```
server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseTests.java

Lines changed: 42 additions & 0 deletions

```java
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.fetch;

import org.elasticsearch.test.ESTestCase;

public class FetchPhaseTests extends ESTestCase {
    public void testSequentialDocs() {
        FetchPhase.DocIdToIndex[] docs = new FetchPhase.DocIdToIndex[10];
        int start = randomIntBetween(0, Short.MAX_VALUE);
        for (int i = 0; i < 10; i++) {
            docs[i] = new FetchPhase.DocIdToIndex(start, i);
            ++start;
        }
        assertTrue(FetchPhase.hasSequentialDocs(docs));

        int from = randomIntBetween(0, 9);
        start = docs[from].docId;
        for (int i = from; i < 10; i++) {
            start += randomIntBetween(2, 10);
            docs[i] = new FetchPhase.DocIdToIndex(start, i);
        }
        assertFalse(FetchPhase.hasSequentialDocs(docs));
    }
}
```
