Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;

import java.io.IOException;

Expand All @@ -43,57 +46,81 @@
* in more than one document! It will only return the first one it
* finds. */

final class PerThreadIDAndVersionLookup {
final class PerThreadIDAndVersionSeqNoLookup {
// TODO: do we really need to store all this stuff? some if it might not speed up anything.
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff

/** terms enum for uid field */
private final TermsEnum termsEnum;
/** _version data */
private final NumericDocValues versions;
/** _seq_no data */
private final NumericDocValues seqNos;
/** _primary_term data */
private final NumericDocValues primaryTerms;
/** Reused for iteration (when the term exists) */
private PostingsEnum docsEnum;

private final Object readerKey;

/**
* Initialize lookup for the provided segment
*/
PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
TermsEnum termsEnum = null;
NumericDocValues versions = null;

PerThreadIDAndVersionSeqNoLookup(LeafReader reader) throws IOException {
Fields fields = reader.fields();
if (fields != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty readers are not possible anymore?

Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
assert versions != null;
}
}

this.versions = versions;
this.termsEnum = termsEnum;
Terms terms = fields.terms(UidFieldMapper.NAME);
termsEnum = terms.iterator();
assert termsEnum != null;
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
assert versions != null;
seqNos = reader.getNumericDocValues(SeqNoFieldMapper.NAME);
primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
readerKey = reader.getCoreCacheKey();
}

/** Return null if id is not found. */
public DocIdAndVersion lookup(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
assert context.reader().getCoreCacheKey().equals(readerKey);
int docID = getDocID(id, liveDocs);

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return new DocIdAndVersion(docID, versions.get(docID), context);
} else {
return null;
}
}

/** returns the internal lucene doc id for the given id bytes. {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found */
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
if (termsEnum.seekExact(id)) {
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}
return docID;
} else {
return DocIdSetIterator.NO_MORE_DOCS;
}
}

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return new DocIdAndVersion(docID, versions.get(docID), context);
}
/** Return null if id is not found. */
public DocIdAndSeqNo lookupSequenceNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
assert context.reader().getCoreCacheKey().equals(readerKey);
int docID = getDocID(id, liveDocs);
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return new DocIdAndSeqNo(docID, seqNos == null ? SequenceNumbersService.UNASSIGNED_SEQ_NO : seqNos.get(docID), context);
} else {
return null;
}
}

return null;
/** returns 0 if the primary term is not found */
public long lookUpPrimaryTerm(int docID) throws IOException {
return primaryTerms == null ? 0 : primaryTerms.get(docID);
}
}
231 changes: 0 additions & 231 deletions core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,6 @@

package org.elasticsearch.common.lucene.uid;

import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

/** Utility class to resolve the Lucene doc ID and version for a given uid. */
public class Versions {

/** used to indicate the write operation should succeed regardless of current version **/
Expand All @@ -59,210 +34,4 @@ public class Versions {
* i.e., not found in the index and/or found as deleted (with version) in the version map
*/
public static final long MATCH_DELETED = -4L;

// TODO: is there somewhere else we can store these?
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

// Evict this reader from lookupStates once it's closed:
private static final CoreClosedListener removeLookupState = new CoreClosedListener() {
@Override
public void onClose(Object key) {
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(key);
if (ctl != null) {
ctl.close();
}
}
};

private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) throws IOException {
Object key = reader.getCoreCacheKey();
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(key);
if (ctl == null) {
// First time we are seeing this reader's core; make a
// new CTL:
ctl = new CloseableThreadLocal<>();
CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(key, ctl);
if (other == null) {
// Our CTL won, we must remove it when the
// core is closed:
reader.addCoreClosedListener(removeLookupState);
} else {
// Another thread beat us to it: just use
// their CTL:
ctl = other;
}
}

PerThreadIDAndVersionLookup lookupState = ctl.get();
if (lookupState == null) {
lookupState = new PerThreadIDAndVersionLookup(reader);
ctl.set(lookupState);
}

return lookupState;
}

private Versions() {
}

/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final LeafReaderContext context;

public DocIdAndVersion(int docId, long version, LeafReaderContext context) {
this.docId = docId;
this.version = version;
this.context = context;
}
}

/**
* Load the internal doc ID and version for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and a version otherwise
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME);
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDAndVersionLookup lookup = getLookupState(leaf);
DocIdAndVersion result = lookup.lookup(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
}
}
return null;
}

/**
* Load the version for the uid from the reader, returning<ul>
* <li>{@link #NOT_FOUND} if no matching doc exists,
* <li>the version associated with the provided uid otherwise
* </ul>
*/
public static long loadVersion(IndexReader reader, Term term) throws IOException {
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}


/**
* Returns the sequence number for the given uid term, returning
* {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found.
*/
public static long loadSeqNo(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid";
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReader leaf = leaves.get(i).reader();
Bits liveDocs = leaf.getLiveDocs();

TermsEnum termsEnum = null;
SortedNumericDocValues dvField = null;
PostingsEnum docsEnum = null;

final Fields fields = leaf.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME);
assert dvField != null;

final BytesRef id = term.bytes();
if (termsEnum.seekExact(id)) {
// there may be more than one matching docID, in the
// case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
dvField.setDocument(docID);
assert dvField.count() == 1 : "expected only a single value for _seq_no but got " +
dvField.count();
return dvField.valueAt(0);
}
}
}
}

}
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

/**
* Returns the primary term for the given uid term, returning {@code 0} if none is found.
*/
public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid";
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return 0;
}

// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReader leaf = leaves.get(i).reader();
Bits liveDocs = leaf.getLiveDocs();

TermsEnum termsEnum = null;
NumericDocValues dvField = null;
PostingsEnum docsEnum = null;

final Fields fields = leaf.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
assert dvField != null;

final BytesRef id = term.bytes();
if (termsEnum.seekExact(id)) {
// there may be more than one matching docID, in the
// case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return dvField.get(docID);
}
}
}
}

}
return 0;
}
}
Loading