Skip to content

Commit fc1db9e

Browse files
committed
Speed up PK lookups at index time.
At index time Elasticsearch needs to look up the version associated with the `_id` of the document that is being indexed, which is often the bottleneck for indexing. While reviewing the output of the `jfr` telemetry from a Rally benchmark, I saw that significant time was spent in `ConcurrentHashMap#get` and `ThreadLocal#get`. The reason is that we cache lookup objects per thread and segment, and for every indexed document, we first need to look up the cache associated with this segment (`ConcurrentHashMap#get`) and then get a state that is local to the current thread (`ThreadLocal#get`). So if you are indexing N documents per second and have S segments, both these methods will be called N*S times per second. This commit changes version lookup to use a cache per index reader rather than per segment. While this makes cache entries remain valid for a shorter time (they are invalidated whenever the reader is reopened rather than only when a segment goes away), we now only need to do one call to `ConcurrentHashMap#get` and `ThreadLocal#get` per indexed document.
1 parent aa3134c commit fc1db9e

File tree

3 files changed

+88
-74
lines changed

3 files changed

+88
-74
lines changed

core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,21 @@
4343
* not thread safe, so it is the caller's job to create and use one
4444
* instance of this per thread. Do not use this if a term may appear
4545
* in more than one document! It will only return the first one it
46-
* finds. */
47-
46+
* finds.
47+
* This class uses live docs, so it should be cached based on the
48+
* {@link org.apache.lucene.index.IndexReader#getReaderCacheHelper() reader cache helper}
49+
* rather than the {@link LeafReader#getCoreCacheHelper() core cache helper}.
50+
*/
4851
final class PerThreadIDVersionAndSeqNoLookup {
4952
// TODO: do we really need to store all this stuff? some if it might not speed up anything.
5053
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff
5154

55+
/** The {@link LeafReaderContext} that needs to be looked up. */
56+
private final LeafReaderContext context;
57+
/** Live docs of the context, cached to avoid the cost of ensureOpen() on every
58+
* segment for every index operation. */
59+
private final Bits liveDocs;
60+
5261
/** terms enum for uid field */
5362
final String uidField;
5463
private final TermsEnum termsEnum;
@@ -62,7 +71,10 @@ final class PerThreadIDVersionAndSeqNoLookup {
6271
/**
6372
* Initialize lookup for the provided segment
6473
*/
65-
PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
74+
PerThreadIDVersionAndSeqNoLookup(LeafReaderContext context, String uidField) throws IOException {
75+
this.context = context;
76+
final LeafReader reader = context.reader();
77+
this.liveDocs = reader.getLiveDocs();
6678
this.uidField = uidField;
6779
Fields fields = reader.fields();
6880
Terms terms = fields.terms(uidField);
@@ -80,11 +92,11 @@ final class PerThreadIDVersionAndSeqNoLookup {
8092
}
8193

8294
/** Return null if id is not found. */
83-
public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context)
95+
public DocIdAndVersion lookupVersion(BytesRef id)
8496
throws IOException {
8597
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
8698
"context's reader is not the same as the reader class was initialized on.";
87-
int docID = getDocID(id, liveDocs);
99+
int docID = getDocID(id);
88100

89101
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
90102
final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
@@ -104,7 +116,7 @@ public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderConte
104116
* returns the internal lucene doc id for the given id bytes.
105117
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
106118
* */
107-
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
119+
private int getDocID(BytesRef id) throws IOException {
108120
if (termsEnum.seekExact(id)) {
109121
int docID = DocIdSetIterator.NO_MORE_DOCS;
110122
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
@@ -122,10 +134,8 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
122134
}
123135

124136
/** Return null if id is not found. */
125-
DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
126-
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
127-
"context's reader is not the same as the reader class was initialized on.";
128-
int docID = getDocID(id, liveDocs);
137+
DocIdAndSeqNo lookupSeqNo(BytesRef id) throws IOException {
138+
int docID = getDocID(id);
129139
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
130140
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
131141
long seqNo;
@@ -139,18 +149,4 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context)
139149
return null;
140150
}
141151
}
142-
143-
/**
144-
* returns 0 if the primary term is not found.
145-
*
146-
* Note that 0 is an illegal primary term. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
147-
**/
148-
long lookUpPrimaryTerm(int docID, LeafReader reader) throws IOException {
149-
NumericDocValues primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
150-
if (primaryTerms != null && primaryTerms.advanceExact(docID)) {
151-
return primaryTerms.longValue();
152-
} else {
153-
return 0;
154-
}
155-
}
156152
}

core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
package org.elasticsearch.common.lucene.uid;
2121

2222
import org.apache.lucene.index.IndexReader;
23-
import org.apache.lucene.index.LeafReader;
2423
import org.apache.lucene.index.LeafReaderContext;
24+
import org.apache.lucene.index.NumericDocValues;
2525
import org.apache.lucene.index.Term;
2626
import org.apache.lucene.util.CloseableThreadLocal;
2727
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
28+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
2829

2930
import java.io.IOException;
3031
import java.util.List;
@@ -36,40 +37,50 @@
3637
/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
3738
public final class VersionsAndSeqNoResolver {
3839

39-
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup>> lookupStates =
40+
static final ConcurrentMap<IndexReader.CacheKey, CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]>> lookupStates =
4041
ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
4142

4243
// Evict this reader from lookupStates once it's closed:
4344
private static final IndexReader.ClosedListener removeLookupState = key -> {
44-
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.remove(key);
45+
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.remove(key);
4546
if (ctl != null) {
4647
ctl.close();
4748
}
4849
};
4950

50-
private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader, String uidField) throws IOException {
51-
IndexReader.CacheHelper cacheHelper = reader.getCoreCacheHelper();
52-
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.get(cacheHelper.getKey());
51+
private static PerThreadIDVersionAndSeqNoLookup[] getLookupState(IndexReader reader, String uidField) throws IOException {
52+
// We cache on the top level
53+
IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper();
54+
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.get(cacheHelper.getKey());
5355
if (ctl == null) {
5456
// First time we are seeing this reader's core; make a new CTL:
5557
ctl = new CloseableThreadLocal<>();
56-
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
58+
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
5759
if (other == null) {
58-
// Our CTL won, we must remove it when the core is closed:
60+
// Our CTL won, we must remove it when the reader is closed:
5961
cacheHelper.addClosedListener(removeLookupState);
6062
} else {
6163
// Another thread beat us to it: just use their CTL:
6264
ctl = other;
6365
}
6466
}
6567

66-
PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get();
68+
PerThreadIDVersionAndSeqNoLookup[] lookupState = ctl.get();
6769
if (lookupState == null) {
68-
lookupState = new PerThreadIDVersionAndSeqNoLookup(reader, uidField);
70+
lookupState = new PerThreadIDVersionAndSeqNoLookup[reader.leaves().size()];
71+
for (LeafReaderContext leaf : reader.leaves()) {
72+
lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf, uidField);
73+
}
6974
ctl.set(lookupState);
70-
} else if (Objects.equals(lookupState.uidField, uidField) == false) {
75+
}
76+
77+
if (lookupState.length != reader.leaves().size()) {
78+
throw new AssertionError("Mismatched numbers of leaves: " + lookupState.length + " != " + reader.leaves().size());
79+
}
80+
81+
if (lookupState.length > 0 && Objects.equals(lookupState[0].uidField, uidField) == false) {
7182
throw new AssertionError("Index does not consistently use the same uid field: ["
72-
+ uidField + "] != [" + lookupState.uidField + "]");
83+
+ uidField + "] != [" + lookupState[0].uidField + "]");
7384
}
7485

7586
return lookupState;
@@ -112,17 +123,13 @@ public static class DocIdAndSeqNo {
112123
* </ul>
113124
*/
114125
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
126+
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
115127
List<LeafReaderContext> leaves = reader.leaves();
116-
if (leaves.isEmpty()) {
117-
return null;
118-
}
119128
// iterate backwards to optimize for the frequently updated documents
120129
// which are likely to be in the last segments
121130
for (int i = leaves.size() - 1; i >= 0; i--) {
122-
LeafReaderContext context = leaves.get(i);
123-
LeafReader leaf = context.reader();
124-
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
125-
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
131+
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord];
132+
DocIdAndVersion result = lookup.lookupVersion(term.bytes());
126133
if (result != null) {
127134
return result;
128135
}
@@ -137,17 +144,13 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
137144
* </ul>
138145
*/
139146
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
147+
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
140148
List<LeafReaderContext> leaves = reader.leaves();
141-
if (leaves.isEmpty()) {
142-
return null;
143-
}
144149
// iterate backwards to optimize for the frequently updated documents
145150
// which are likely to be in the last segments
146151
for (int i = leaves.size() - 1; i >= 0; i--) {
147-
LeafReaderContext context = leaves.get(i);
148-
LeafReader leaf = context.reader();
149-
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
150-
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context);
152+
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord];
153+
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes());
151154
if (result != null) {
152155
return result;
153156
}
@@ -159,9 +162,13 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
159162
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
160163
*/
161164
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
162-
LeafReader leaf = docIdAndSeqNo.context.reader();
163-
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, uidField);
164-
long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId, leaf);
165+
NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
166+
long result;
167+
if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) {
168+
result = primaryTerms.longValue();
169+
} else {
170+
result = 0;
171+
}
165172
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
166173
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
167174
return result;

core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
import org.apache.lucene.index.IndexWriter;
2727
import org.apache.lucene.index.IndexWriterConfig;
2828
import org.apache.lucene.index.LeafReaderContext;
29+
import org.apache.lucene.index.NoMergePolicy;
30+
import org.apache.lucene.index.Term;
2931
import org.apache.lucene.store.Directory;
30-
import org.apache.lucene.util.Bits;
3132
import org.apache.lucene.util.BytesRef;
32-
import org.apache.lucene.util.FixedBitSet;
3333
import org.elasticsearch.common.lucene.Lucene;
3434
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
3535
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -46,23 +46,31 @@ public class VersionLookupTests extends ESTestCase {
4646
*/
4747
public void testSimple() throws Exception {
4848
Directory dir = newDirectory();
49-
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
49+
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
50+
// to have deleted docs
51+
.setMergePolicy(NoMergePolicy.INSTANCE));
5052
Document doc = new Document();
5153
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
5254
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
5355
writer.addDocument(doc);
56+
writer.addDocument(new Document());
5457
DirectoryReader reader = DirectoryReader.open(writer);
5558
LeafReaderContext segment = reader.leaves().get(0);
56-
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
59+
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
5760
// found doc
58-
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
61+
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"));
5962
assertNotNull(result);
6063
assertEquals(87, result.version);
6164
assertEquals(0, result.docId);
6265
// not found doc
63-
assertNull(lookup.lookupVersion(new BytesRef("7"), null, segment));
66+
assertNull(lookup.lookupVersion(new BytesRef("7")));
6467
// deleted doc
65-
assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(1), segment));
68+
writer.deleteDocuments(new Term(IdFieldMapper.NAME, "6"));
69+
reader.close();
70+
reader = DirectoryReader.open(writer);
71+
segment = reader.leaves().get(0);
72+
lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
73+
assertNull(lookup.lookupVersion(new BytesRef("6")));
6674
reader.close();
6775
writer.close();
6876
dir.close();
@@ -73,36 +81,39 @@ public void testSimple() throws Exception {
7381
*/
7482
public void testTwoDocuments() throws Exception {
7583
Directory dir = newDirectory();
76-
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
84+
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
85+
.setMergePolicy(NoMergePolicy.INSTANCE));
7786
Document doc = new Document();
7887
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
7988
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
8089
writer.addDocument(doc);
8190
writer.addDocument(doc);
91+
writer.addDocument(new Document());
8292
DirectoryReader reader = DirectoryReader.open(writer);
8393
LeafReaderContext segment = reader.leaves().get(0);
84-
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
94+
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
8595
// return the last doc when there are duplicates
86-
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
96+
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"));
8797
assertNotNull(result);
8898
assertEquals(87, result.version);
8999
assertEquals(1, result.docId);
90100
// delete the first doc only
91-
FixedBitSet live = new FixedBitSet(2);
92-
live.set(1);
93-
result = lookup.lookupVersion(new BytesRef("6"), live, segment);
101+
assertTrue(writer.tryDeleteDocument(reader, 0) >= 0);
102+
reader.close();
103+
reader = DirectoryReader.open(writer);
104+
segment = reader.leaves().get(0);
105+
lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
106+
result = lookup.lookupVersion(new BytesRef("6"));
94107
assertNotNull(result);
95108
assertEquals(87, result.version);
96109
assertEquals(1, result.docId);
97-
// delete the second doc only
98-
live.clear(1);
99-
live.set(0);
100-
result = lookup.lookupVersion(new BytesRef("6"), live, segment);
101-
assertNotNull(result);
102-
assertEquals(87, result.version);
103-
assertEquals(0, result.docId);
104110
// delete both docs
105-
assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(2), segment));
111+
assertTrue(writer.tryDeleteDocument(reader, 1) >= 0);
112+
reader.close();
113+
reader = DirectoryReader.open(writer);
114+
segment = reader.leaves().get(0);
115+
lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
116+
assertNull(lookup.lookupVersion(new BytesRef("6")));
106117
reader.close();
107118
writer.close();
108119
dir.close();

0 commit comments

Comments
 (0)