@@ -62,8 +62,8 @@ protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOExc
}

/**
* Wraps the given reader in a {@link org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader} as
* well as all it's sub-readers in {@link org.elasticsearch.common.lucene.index.ElasticsearchLeafReader} to
* Wraps the given reader in a {@link ElasticsearchDirectoryReader} as
* well as all its sub-readers in {@link ElasticsearchLeafReader} to
* expose the given shard Id.
*
* @param reader the reader to wrap
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.common.lucene.index;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.elasticsearch.index.shard.ShardId;
@@ -26,7 +27,7 @@
* A {@link org.apache.lucene.index.FilterLeafReader} that exposes
* Elasticsearch internal per shard / index information like the shard ID.
*/
public final class ElasticsearchLeafReader extends FilterLeafReader {
public final class ElasticsearchLeafReader extends SequentialStoredFieldsLeafReader {

private final ShardId shardId;

@@ -72,4 +73,9 @@ public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader read
}
return null;
}

@Override
protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
return reader;
}
}
@@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.lucene.index;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;

import java.io.IOException;

/**
* A {@link FilterLeafReader} that exposes a {@link StoredFieldsReader}
* optimized for sequential access. This class should be used by custom
* {@link FilterLeafReader} that are used at search time in order to
* leverage sequential access when retrieving stored fields in queries,
* aggregations or during the fetch phase.
*/
public abstract class SequentialStoredFieldsLeafReader extends FilterLeafReader {
/**
* <p>Construct a StoredFieldsFilterLeafReader based on the specified base reader.
* <p>Note that base reader is closed if this FilterLeafReader is closed.</p>
*
* @param in specified base reader.
*/
public SequentialStoredFieldsLeafReader(LeafReader in) {
super(in);
}

/**
* Implementations should return a {@link StoredFieldsReader} that wraps the provided <code>reader</code>
* that is optimized for sequential access (adjacent doc ids).
*/
protected abstract StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader);

/**
* Returns a {@link StoredFieldsReader} optimized for sequential access (adjacent doc ids).
*/
public StoredFieldsReader getSequentialStoredFieldsReader() throws IOException {
if (in instanceof CodecReader) {
CodecReader reader = (CodecReader) in;
return doGetSequentialStoredFieldsReader(reader.getFieldsReader().getMergeInstance());
} else if (in instanceof SequentialStoredFieldsLeafReader) {
SequentialStoredFieldsLeafReader reader = (SequentialStoredFieldsLeafReader) in;
return doGetSequentialStoredFieldsReader(reader.getSequentialStoredFieldsReader());
} else {
throw new IOException("requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass());
}
}

}
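For illustration, a minimal sketch of a custom search-time wrapper built on this base class (the class name MyFilterLeafReader and its pass-through cache helpers are hypothetical, not part of this change): the subclass only decides how to decorate the StoredFieldsReader it is handed, while the base class resolves the merge-optimized instance from the underlying CodecReader.

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.LeafReader;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;

// Hypothetical subclass, for illustration only.
public final class MyFilterLeafReader extends SequentialStoredFieldsLeafReader {

    public MyFilterLeafReader(LeafReader in) {
        super(in);
    }

    @Override
    protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
        // Nothing to decorate here; a wrapper that needs to intercept stored-field
        // reads would wrap `reader` before returning it.
        return reader;
    }

    // FilterLeafReader leaves the cache helpers abstract; delegating to `in`
    // mirrors the delegation used by the wrappers in this change.
    @Override
    public CacheHelper getCoreCacheHelper() {
        return in.getCoreCacheHelper();
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        return in.getReaderCacheHelper();
    }
}

ElasticsearchLeafReader above and ExitableLeafReader further down follow exactly this shape, returning the reader unchanged.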
@@ -31,9 +31,11 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.Version;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -104,6 +106,7 @@ public void execute(SearchContext context) {
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
docs[index] = new DocIdToIndex(context.docIdsToLoad()[context.docIdsToLoadFrom() + index], index);
}
// make sure that we iterate in doc id order
Arrays.sort(docs);

Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
@@ -118,6 +121,8 @@ public void execute(SearchContext context) {

int currentReaderIndex = -1;
LeafReaderContext currentReaderContext = null;
CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader = null;
boolean hasSequentialDocs = hasSequentialDocs(docs);
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
if (context.isCancelled()) {
throw new TaskCancelledException("cancelled");
@@ -128,6 +133,17 @@
if (currentReaderIndex != readerIndex) {
currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
currentReaderIndex = readerIndex;
if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader
&& hasSequentialDocs && docs.length >= 10) {
// All the docs to fetch are adjacent but Lucene stored fields are optimized
// for random access and don't optimize for sequential access - except for merging.
// So we do a little hack here and pretend we're going to do merges in order to
// get better sequential access.
SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) currentReaderContext.reader();
fieldReader = lf.getSequentialStoredFieldsReader()::visitDocument;
} else {
fieldReader = currentReaderContext.reader()::document;
}
for (FetchSubPhaseProcessor processor : processors) {
processor.setNextReader(currentReaderContext);
}
@@ -140,6 +156,7 @@
docId,
storedToRequestedFields,
currentReaderContext,
fieldReader,
sharedCache
);
for (FetchSubPhaseProcessor processor : processors) {
@@ -253,9 +270,14 @@ private int findRootDocumentIfNested(SearchContext context, LeafReaderContext su
return -1;
}

private HitContext prepareHitContext(SearchContext context, SearchLookup lookup, FieldsVisitor fieldsVisitor, int docId,
private HitContext prepareHitContext(SearchContext context,
SearchLookup lookup,
FieldsVisitor fieldsVisitor,
int docId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext, Map<String, Object> sharedCache) throws IOException {
LeafReaderContext subReaderContext,
CheckedBiConsumer<Integer, FieldsVisitor, IOException> storedFieldReader,
Map<String, Object> sharedCache) throws IOException {
int rootDocId = findRootDocumentIfNested(context, subReaderContext, docId - subReaderContext.docBase);
if (rootDocId == -1) {
return prepareNonNestedHitContext(
@@ -265,10 +287,12 @@ private HitContext prepareHitContext(SearchContext context, SearchLookup lookup,
docId,
storedToRequestedFields,
subReaderContext,
storedFieldReader,
sharedCache
);
} else {
return prepareNestedHitContext(context, docId, rootDocId, storedToRequestedFields, subReaderContext, sharedCache);
return prepareNestedHitContext(context, docId, rootDocId, storedToRequestedFields,
subReaderContext, storedFieldReader, sharedCache);
}
}

@@ -285,6 +309,7 @@ private HitContext prepareNonNestedHitContext(SearchContext context,
int docId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext,
CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader,
Map<String, Object> sharedCache) throws IOException {
int subDocId = docId - subReaderContext.docBase;
DocumentMapper documentMapper = context.mapperService().documentMapper();
@@ -295,7 +320,7 @@ private HitContext prepareNonNestedHitContext(SearchContext context,
return new HitContext(hit, subReaderContext, subDocId, lookup.source(), sharedCache);
} else {
SearchHit hit;
loadStoredFields(context.mapperService(), subReaderContext, fieldsVisitor, subDocId);
loadStoredFields(context.mapperService(), fieldReader, fieldsVisitor, subDocId);
Uid uid = fieldsVisitor.uid();
if (fieldsVisitor.fields().isEmpty() == false) {
Map<String, DocumentField> docFields = new HashMap<>();
@@ -328,6 +353,7 @@ private HitContext prepareNestedHitContext(SearchContext context,
int rootDocId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext,
CheckedBiConsumer<Integer, FieldsVisitor, IOException> storedFieldReader,
Map<String, Object> sharedCache) throws IOException {
// Also if highlighting is requested on nested documents we need to fetch the _source from the root document,
// otherwise highlighting will attempt to fetch the _source from the nested doc, which will fail,
@@ -351,7 +377,7 @@ private HitContext prepareNestedHitContext(SearchContext context,
}
} else {
FieldsVisitor rootFieldsVisitor = new FieldsVisitor(needSource);
loadStoredFields(context.mapperService(), subReaderContext, rootFieldsVisitor, rootDocId);
loadStoredFields(context.mapperService(), storedFieldReader, rootFieldsVisitor, rootDocId);
rootFieldsVisitor.postProcess(context.mapperService());
rootId = rootFieldsVisitor.uid();

@@ -367,7 +393,7 @@ private HitContext prepareNestedHitContext(SearchContext context,
Map<String, DocumentField> metaFields = emptyMap();
if (context.hasStoredFields() && !context.storedFieldsContext().fieldNames().isEmpty()) {
FieldsVisitor nestedFieldsVisitor = new CustomFieldsVisitor(storedToRequestedFields.keySet(), false);
loadStoredFields(context.mapperService(), subReaderContext, nestedFieldsVisitor, nestedDocId);
loadStoredFields(context.mapperService(), storedFieldReader, nestedFieldsVisitor, nestedDocId);
if (nestedFieldsVisitor.fields().isEmpty() == false) {
docFields = new HashMap<>();
metaFields = new HashMap<>();
@@ -518,10 +544,10 @@ private SearchHit.NestedIdentity getInternalNestedIdentity(SearchContext context
}

private void loadStoredFields(MapperService mapperService,
LeafReaderContext readerContext,
CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader,
FieldsVisitor fieldVisitor, int docId) throws IOException {
fieldVisitor.reset();
readerContext.reader().document(docId, fieldVisitor);
fieldReader.accept(docId, fieldVisitor);
fieldVisitor.postProcess(mapperService);
}

@@ -548,4 +574,12 @@ private static void fillDocAndMetaFields(SearchContext context, FieldsVisitor fi
}
}
}

/**
* Returns <code>true</code> if the provided <code>docs</code> are
* stored sequentially (Dn = Dn-1 + 1).
*/
static boolean hasSequentialDocs(DocIdToIndex[] docs) {
return docs.length > 0 && docs[docs.length-1].docId - docs[0].docId == docs.length - 1;
}
}
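To show the effect of the branch added to execute() in isolation, here is a self-contained sketch (the helper class and method are hypothetical, and a plain Lucene DocumentStoredFieldVisitor stands in for Elasticsearch's FieldsVisitor): sequential batches of at least ten docs go through the merge-optimized StoredFieldsReader, everything else falls back to the usual LeafReader#document path.

import java.io.IOException;

import org.apache.lucene.document.DocumentStoredFieldVisitor;
import org.apache.lucene.index.LeafReader;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;

// Illustrative only; mirrors the per-leaf decision made in FetchPhase#execute.
final class StoredFieldsFetchSketch {

    static void fetchStoredFields(LeafReader leaf, int[] sortedDocIds) throws IOException {
        // Same check as FetchPhase#hasSequentialDocs: the doc ids are adjacent.
        boolean sequential = sortedDocIds.length > 0
            && sortedDocIds[sortedDocIds.length - 1] - sortedDocIds[0] == sortedDocIds.length - 1;

        CheckedBiConsumer<Integer, DocumentStoredFieldVisitor, IOException> fieldReader;
        if (leaf instanceof SequentialStoredFieldsLeafReader && sequential && sortedDocIds.length >= 10) {
            // Adjacent doc ids: borrow the merge instance, which is optimized
            // for reading doc ids in order.
            SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) leaf;
            fieldReader = lf.getSequentialStoredFieldsReader()::visitDocument;
        } else {
            // Scattered doc ids: the regular random-access stored fields path.
            fieldReader = leaf::document;
        }

        for (int docId : sortedDocIds) {
            DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
            fieldReader.accept(docId, visitor);
            // visitor.getDocument() now holds the stored fields of docId.
        }
    }
}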
@@ -19,6 +19,7 @@

package org.elasticsearch.search.internal;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
@@ -30,6 +31,7 @@
import org.apache.lucene.search.suggest.document.CompletionTerms;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.automaton.CompiledAutomaton;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;

import java.io.IOException;

@@ -79,7 +81,7 @@ public CacheHelper getReaderCacheHelper() {
/**
* Wraps a {@link FilterLeafReader} with a {@link QueryCancellation}.
*/
static class ExitableLeafReader extends FilterLeafReader {
static class ExitableLeafReader extends SequentialStoredFieldsLeafReader {

private final QueryCancellation queryCancellation;

@@ -119,6 +121,11 @@ public CacheHelper getCoreCacheHelper() {
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}

@Override
protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
return reader;
}
}

/**
@@ -91,6 +91,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
@@ -6299,4 +6300,25 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
}
}
}

public void testProducesStoredFieldsReader() throws Exception {
// Make sure that the engine produces a SequentialStoredFieldsLeafReader.
// This is required for optimizations on SourceLookup to work, which is in turn useful for runtime fields.
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField("test"),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = randomBoolean() ?
appendOnlyPrimary(doc, false, 1)
: appendOnlyReplica(doc, false, 1, randomIntBetween(0, 5));
engine.index(operation);
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
IndexReader reader = searcher.getIndexReader();
assertThat(reader.leaves().size(), Matchers.greaterThanOrEqualTo(1));
for (LeafReaderContext context: reader.leaves()) {
assertThat(context.reader(), Matchers.instanceOf(SequentialStoredFieldsLeafReader.class));
SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) context.reader();
assertNotNull(lf.getSequentialStoredFieldsReader());
}
}
}
}
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.fetch;

import org.elasticsearch.test.ESTestCase;

public class FetchPhaseTests extends ESTestCase {
public void testSequentialDocs() {
FetchPhase.DocIdToIndex[] docs = new FetchPhase.DocIdToIndex[10];
int start = randomIntBetween(0, Short.MAX_VALUE);
for (int i = 0; i < 10; i++) {
docs[i] = new FetchPhase.DocIdToIndex(start, i);
++ start;
}
assertTrue(FetchPhase.hasSequentialDocs(docs));

int from = randomIntBetween(0, 9);
start = docs[from].docId;
for (int i = from; i < 10; i++) {
start += randomIntBetween(2, 10);
docs[i] = new FetchPhase.DocIdToIndex(start, i);
}
assertFalse(FetchPhase.hasSequentialDocs(docs));
}
}