PerThreadIDVersionAndSeqNoLookup.java
@@ -43,12 +43,21 @@
  * not thread safe, so it is the caller's job to create and use one
  * instance of this per thread. Do not use this if a term may appear
  * in more than one document! It will only return the first one it
- * finds. */
+ * finds.
+ * This class uses live docs, so it should be cached based on the
+ * {@link org.apache.lucene.index.IndexReader#getReaderCacheHelper() reader cache helper}
+ * rather than the {@link LeafReader#getCoreCacheHelper() core cache helper}.
+ */
 final class PerThreadIDVersionAndSeqNoLookup {
     // TODO: do we really need to store all this stuff? some of it might not speed up anything.
     // we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff
 
+    /** The {@link LeafReaderContext} that needs to be looked up. */
+    private final LeafReaderContext context;
+    /** Live docs of the context, cached to avoid the cost of ensureOpen() on every
+     * segment for every index operation. */
+    private final Bits liveDocs;
+
     /** terms enum for uid field */
     final String uidField;
     private final TermsEnum termsEnum;
@@ -62,7 +71,10 @@ final class PerThreadIDVersionAndSeqNoLookup {
     /**
      * Initialize lookup for the provided segment
      */
-    PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
+    PerThreadIDVersionAndSeqNoLookup(LeafReaderContext context, String uidField) throws IOException {
+        this.context = context;
+        final LeafReader reader = context.reader();
+        this.liveDocs = reader.getLiveDocs();
         this.uidField = uidField;
         Fields fields = reader.fields();
         Terms terms = fields.terms(uidField);
@@ -80,11 +92,11 @@ final class PerThreadIDVersionAndSeqNoLookup {
     }
 
     /** Return null if id is not found. */
-    public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context)
+    public DocIdAndVersion lookupVersion(BytesRef id)
         throws IOException {
         assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
             "context's reader is not the same as the reader class was initialized on.";
-        int docID = getDocID(id, liveDocs);
+        int docID = getDocID(id);
 
         if (docID != DocIdSetIterator.NO_MORE_DOCS) {
             final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
@@ -104,7 +116,7 @@ public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderConte
      * returns the internal lucene doc id for the given id bytes.
      * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
      * */
-    private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
+    private int getDocID(BytesRef id) throws IOException {
         if (termsEnum.seekExact(id)) {
             int docID = DocIdSetIterator.NO_MORE_DOCS;
             // there may be more than one matching docID, in the case of nested docs, so we want the last one:
@@ -122,10 +134,8 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
     }
 
     /** Return null if id is not found. */
-    DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
-        assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
-            "context's reader is not the same as the reader class was initialized on.";
-        int docID = getDocID(id, liveDocs);
+    DocIdAndSeqNo lookupSeqNo(BytesRef id) throws IOException {
+        int docID = getDocID(id);
         if (docID != DocIdSetIterator.NO_MORE_DOCS) {
             NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
             long seqNo;
@@ -139,18 +149,4 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context)
             return null;
         }
     }
-
-    /**
-     * returns 0 if the primary term is not found.
-     *
-     * Note that 0 is an illegal primary term. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
-     **/
-    long lookUpPrimaryTerm(int docID, LeafReader reader) throws IOException {
-        NumericDocValues primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
-        if (primaryTerms != null && primaryTerms.advanceExact(docID)) {
-            return primaryTerms.longValue();
-        } else {
-            return 0;
-        }
-    }
 }
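
Pulled out of diff form, here is a condensed, self-contained sketch of what one of these per-segment lookups now does. The class name and simplified error handling are hypothetical, not the actual Elasticsearch class; it mirrors the constructor, getDocID and lookupVersion logic above against PR-era Lucene (where NumericDocValues.advanceExact exists):

import java.io.IOException;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;

final class SegmentIdLookupSketch {
    private final LeafReaderContext context;
    private final Bits liveDocs; // cached once per instance, as in the PR, instead of being passed per call
    private final TermsEnum termsEnum;

    SegmentIdLookupSketch(LeafReaderContext context, String uidField) throws IOException {
        this.context = context;
        LeafReader reader = context.reader();
        this.liveDocs = reader.getLiveDocs();
        Terms terms = reader.terms(uidField); // the PR-era code reaches this via reader.fields()
        this.termsEnum = terms.iterator();    // sketch assumes the uid field exists in this segment
    }

    /** Version of the last live doc carrying this id, or -1 if there is none. */
    long lookupVersion(BytesRef id, String versionField) throws IOException {
        int docID = getDocID(id);
        if (docID == DocIdSetIterator.NO_MORE_DOCS) {
            return -1;
        }
        NumericDocValues versions = context.reader().getNumericDocValues(versionField);
        if (versions == null || versions.advanceExact(docID) == false) {
            return -1;
        }
        return versions.longValue();
    }

    /** Last live doc matching id: nested docs share the uid, and the root doc comes last. */
    private int getDocID(BytesRef id) throws IOException {
        if (termsEnum.seekExact(id) == false) {
            return DocIdSetIterator.NO_MORE_DOCS;
        }
        int docID = DocIdSetIterator.NO_MORE_DOCS;
        PostingsEnum docs = termsEnum.postings(null, PostingsEnum.NONE);
        for (int d = docs.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docs.nextDoc()) {
            if (liveDocs == null || liveDocs.get(d)) {
                docID = d;
            }
        }
        return docID;
    }
}

The cached context and liveDocs fields are what tie an instance to a single point-in-time reader, which is why the updated javadoc directs callers to cache per reader rather than per segment core.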
VersionsAndSeqNoResolver.java
@@ -20,11 +20,12 @@
 package org.elasticsearch.common.lucene.uid;
 
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.util.CloseableThreadLocal;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 
 import java.io.IOException;
 import java.util.List;
@@ -36,40 +37,54 @@
 /** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
 public final class VersionsAndSeqNoResolver {
 
-    static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup>> lookupStates =
+    static final ConcurrentMap<IndexReader.CacheKey, CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]>> lookupStates =
         ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
 
     // Evict this reader from lookupStates once it's closed:
     private static final IndexReader.ClosedListener removeLookupState = key -> {
-        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.remove(key);
+        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.remove(key);
         if (ctl != null) {
             ctl.close();
         }
     };
 
-    private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader, String uidField) throws IOException {
-        IndexReader.CacheHelper cacheHelper = reader.getCoreCacheHelper();
-        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.get(cacheHelper.getKey());
+    private static PerThreadIDVersionAndSeqNoLookup[] getLookupState(IndexReader reader, String uidField) throws IOException {
+        // We cache on the top level
 
    [inline review comment, Contributor] can you leave a comment why we moved to this? you can also reference this PR for documentation purposes
    [reply, Contributor (PR author)] +1 I had more comments initially but they apparently got lost with the rebase

+        // This means cache entries have a shorter lifetime, maybe as low as 1s with the
+        // default refresh interval and a steady indexing rate, but on the other hand it
+        // proved to be cheaper than having to perform a CHM and a TL get for every segment.
+        // See https://github.com/elastic/elasticsearch/pull/19856.
+        IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper();
+        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.get(cacheHelper.getKey());
         if (ctl == null) {
             // First time we are seeing this reader's core; make a new CTL:
             ctl = new CloseableThreadLocal<>();
-            CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
+            CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
             if (other == null) {
-                // Our CTL won, we must remove it when the core is closed:
+                // Our CTL won, we must remove it when the reader is closed:
                 cacheHelper.addClosedListener(removeLookupState);
             } else {
                 // Another thread beat us to it: just use their CTL:
                 ctl = other;
             }
         }
 
-        PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get();
+        PerThreadIDVersionAndSeqNoLookup[] lookupState = ctl.get();
         if (lookupState == null) {
-            lookupState = new PerThreadIDVersionAndSeqNoLookup(reader, uidField);
+            lookupState = new PerThreadIDVersionAndSeqNoLookup[reader.leaves().size()];
+            for (LeafReaderContext leaf : reader.leaves()) {
+                lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf, uidField);
+            }
             ctl.set(lookupState);
-        } else if (Objects.equals(lookupState.uidField, uidField) == false) {
+        }
+
+        if (lookupState.length != reader.leaves().size()) {
+            throw new AssertionError("Mismatched numbers of leaves: " + lookupState.length + " != " + reader.leaves().size());
+        }
+
+        if (lookupState.length > 0 && Objects.equals(lookupState[0].uidField, uidField) == false) {
             throw new AssertionError("Index does not consistently use the same uid field: ["
-                + uidField + "] != [" + lookupState.uidField + "]");
+                + uidField + "] != [" + lookupState[0].uidField + "]");
         }
 
         return lookupState;
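
The net effect of this hunk: lookup state is now cached once per top-level reader, keyed by its reader cache key and evicted by a closed listener, and each cached entry is an array holding one per-segment lookup per leaf, indexed by leaf.ord. A minimal generic sketch of the same caching pattern, with hypothetical names (the real method also validates leaf counts and the uid field, as above):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.CloseableThreadLocal;

final class ReaderKeyedStatesSketch<T> {
    private final ConcurrentMap<IndexReader.CacheKey, CloseableThreadLocal<T>> states =
            new ConcurrentHashMap<>();

    T get(IndexReader reader, Function<IndexReader, T> factory) {
        IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper();
        CloseableThreadLocal<T> ctl = states.get(cacheHelper.getKey());
        if (ctl == null) {
            ctl = new CloseableThreadLocal<>();
            CloseableThreadLocal<T> other = states.putIfAbsent(cacheHelper.getKey(), ctl);
            if (other == null) {
                // our entry won: remove it (and free its thread-locals) when the reader closes
                cacheHelper.addClosedListener(key -> {
                    CloseableThreadLocal<T> removed = states.remove(key);
                    if (removed != null) {
                        removed.close();
                    }
                });
            } else {
                ctl = other; // another thread beat us to it
            }
        }
        T state = ctl.get();
        if (state == null) {
            // e.g. build one PerThreadIDVersionAndSeqNoLookup per leaf, indexed by ord
            state = factory.apply(reader);
            ctl.set(state);
        }
        return state;
    }
}

As the added comment says, this trades cache-entry lifetime for hot-path cost: entries die on every refresh instead of surviving as long as a segment core, but each index operation saves one ConcurrentHashMap get plus one ThreadLocal get per segment.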
@@ -112,17 +127,13 @@ public static class DocIdAndSeqNo {
      * </ul>
      */
     public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
+        PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
         List<LeafReaderContext> leaves = reader.leaves();
         if (leaves.isEmpty()) {
             return null;
         }
         // iterate backwards to optimize for the frequently updated documents
         // which are likely to be in the last segments
         for (int i = leaves.size() - 1; i >= 0; i--) {
-            LeafReaderContext context = leaves.get(i);
-            LeafReader leaf = context.reader();
-            PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
-            DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
+            PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord];
+            DocIdAndVersion result = lookup.lookupVersion(term.bytes());
             if (result != null) {
                 return result;
             }
@@ -137,17 +148,13 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
      * </ul>
      */
     public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
+        PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
         List<LeafReaderContext> leaves = reader.leaves();
         if (leaves.isEmpty()) {
             return null;
         }
         // iterate backwards to optimize for the frequently updated documents
         // which are likely to be in the last segments
         for (int i = leaves.size() - 1; i >= 0; i--) {
-            LeafReaderContext context = leaves.get(i);
-            LeafReader leaf = context.reader();
-            PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
-            DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context);
+            PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord];
+            DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes());
             if (result != null) {
                 return result;
             }
@@ -159,9 +166,13 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
      * Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
      */
     public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
-        LeafReader leaf = docIdAndSeqNo.context.reader();
-        PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, uidField);
-        long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId, leaf);
+        NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
+        long result;
+        if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) {
+            result = primaryTerms.longValue();
+        } else {
+            result = 0;
+        }
         assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
             + " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
         return result;
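For orientation, a sketch of how a caller drives the three public entry points after this change. The "_id" field name and the printing are illustrative assumptions; the engine wiring around these calls is not part of this diff:

import java.io.IOException;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;

final class ResolverUsageSketch {
    /** Resolve version, seqNo and primary term for one uid against a top-level reader. */
    static void resolve(DirectoryReader reader, String uid) throws IOException {
        Term idTerm = new Term("_id", uid); // assumed uid field name

        // walks segments last-to-first; lookup state is cached per reader and per thread
        DocIdAndVersion v = VersionsAndSeqNoResolver.loadDocIdAndVersion(reader, idTerm);
        if (v != null) {
            System.out.println("docId=" + v.docId + " version=" + v.version);
        }

        DocIdAndSeqNo s = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(reader, idTerm);
        if (s != null) {
            // the primary term is now read straight from doc values of the resolved segment
            long primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(s, idTerm.field());
            System.out.println("seqNo=" + s.seqNo + " primaryTerm=" + primaryTerm);
        }
    }
}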
VersionLookupTests.java
@@ -26,10 +26,10 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.Term;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -46,23 +46,31 @@ public class VersionLookupTests extends ESTestCase {
     */
    public void testSimple() throws Exception {
        Directory dir = newDirectory();
-       IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
+       IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+           // to have deleted docs
+           .setMergePolicy(NoMergePolicy.INSTANCE));
        Document doc = new Document();
        doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
        doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
        writer.addDocument(doc);
+       writer.addDocument(new Document());
        DirectoryReader reader = DirectoryReader.open(writer);
        LeafReaderContext segment = reader.leaves().get(0);
-       PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
+       PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
        // found doc
-       DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
+       DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"));
        assertNotNull(result);
        assertEquals(87, result.version);
        assertEquals(0, result.docId);
        // not found doc
-       assertNull(lookup.lookupVersion(new BytesRef("7"), null, segment));
+       assertNull(lookup.lookupVersion(new BytesRef("7")));
        // deleted doc
-       assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(1), segment));
+       writer.deleteDocuments(new Term(IdFieldMapper.NAME, "6"));
+       reader.close();
+       reader = DirectoryReader.open(writer);
+       segment = reader.leaves().get(0);
+       lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
+       assertNull(lookup.lookupVersion(new BytesRef("6")));
        reader.close();
        writer.close();
        dir.close();
@@ -73,36 +81,39 @@ public void testSimple() throws Exception {
     */
    public void testTwoDocuments() throws Exception {
        Directory dir = newDirectory();
-       IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
+       IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+           .setMergePolicy(NoMergePolicy.INSTANCE));
        Document doc = new Document();
        doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
        doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
        writer.addDocument(doc);
        writer.addDocument(doc);
+       writer.addDocument(new Document());
        DirectoryReader reader = DirectoryReader.open(writer);
        LeafReaderContext segment = reader.leaves().get(0);
-       PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
+       PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
        // return the last doc when there are duplicates
-       DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
+       DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"));
        assertNotNull(result);
        assertEquals(87, result.version);
        assertEquals(1, result.docId);
        // delete the first doc only
-       FixedBitSet live = new FixedBitSet(2);
-       live.set(1);
-       result = lookup.lookupVersion(new BytesRef("6"), live, segment);
+       assertTrue(writer.tryDeleteDocument(reader, 0) >= 0);
+       reader.close();
+       reader = DirectoryReader.open(writer);
+       segment = reader.leaves().get(0);
+       lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
+       result = lookup.lookupVersion(new BytesRef("6"));
        assertNotNull(result);
        assertEquals(87, result.version);
        assertEquals(1, result.docId);
-       // delete the second doc only
-       live.clear(1);
-       live.set(0);
-       result = lookup.lookupVersion(new BytesRef("6"), live, segment);
-       assertNotNull(result);
-       assertEquals(87, result.version);
-       assertEquals(0, result.docId);
        // delete both docs
-       assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(2), segment));
+       assertTrue(writer.tryDeleteDocument(reader, 1) >= 0);
+       reader.close();
+       reader = DirectoryReader.open(writer);
+       segment = reader.leaves().get(0);
+       lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
+       assertNull(lookup.lookupVersion(new BytesRef("6")));
        reader.close();
        writer.close();
        dir.close();
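
One note on the test changes: the switch from fake live-docs bitsets to real deletes relies on NoMergePolicy. The reasoning here is an assumption drawn from the "// to have deleted docs" comment in the diff: with merging enabled, a delete could trigger a merge that rewrites the segment with the document expunged, leaving getLiveDocs() null and the deleted-doc path untested. The tests also lean on tryDeleteDocument returning a value >= 0 only when the delete could be applied directly by segment and doc id. A sketch of that writer setup under those assumptions:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

final class DeletesVisibleWriterSketch {
    static IndexWriter newWriter() throws Exception {
        Directory dir = new RAMDirectory(); // the tests use the test framework's newDirectory()
        return new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer())
                // keep deletes visible as live-docs bits instead of merging them away
                .setMergePolicy(NoMergePolicy.INSTANCE));
    }
}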