Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f2211c1
initial version
bleskes Mar 7, 2017
baa7e51
fix testAppendWhileRecovering
bleskes Mar 7, 2017
14b4b4b
lingering reference to SortedNumericDocValuesField
bleskes Mar 7, 2017
2b835f7
currectly load sequence numbers if the index doesn't have the right d…
bleskes Mar 8, 2017
4a2f350
Merge remote-tracking branch 'upstream/master' into seq_no_as_version
bleskes Mar 8, 2017
6b9e3ca
share more version look up code
bleskes Mar 8, 2017
cde619c
remove no commit
bleskes Mar 8, 2017
d81bc8c
minor tweaks
bleskes Mar 9, 2017
a27352d
internal versioning test for primary. Fix found flag on version confl…
bleskes Mar 10, 2017
7b58f1d
fix delete error handling
bleskes Mar 10, 2017
b626e95
add primary external versioning test
bleskes Mar 11, 2017
eca7872
added testVersioningPromotedReplica
bleskes Mar 11, 2017
0e4a2f6
add concurrency tests
bleskes Mar 11, 2017
ca138f1
clean duplicate tests
bleskes Mar 11, 2017
c0343ea
added testConcurrentGetAndSetOnPrimary
bleskes Mar 11, 2017
a63c2c2
Merge remote-tracking branch 'upstream/master' into seq_no_as_version
bleskes Mar 22, 2017
a4f2335
rollback seq no based out of order resolving and leave it at refactor…
bleskes Mar 22, 2017
3328c94
fix line length
bleskes Mar 22, 2017
baa5032
Merge branch 'master' into engine_clearer_flow
bleskes Mar 23, 2017
b640c4f
Merge remote-tracking branch 'upstream/master' into engine_clearer_flow
bleskes Mar 29, 2017
e775faa
feedback
bleskes Mar 29, 2017
7cc73a1
some more minor cleanup
bleskes Mar 29, 2017
9eba7a0
Merge remote-tracking branch 'upstream/master' into engine_clearer_flow
bleskes Mar 30, 2017
7c0a050
IndexingPlan
bleskes Mar 30, 2017
99866b9
DeletionPlan
bleskes Mar 30, 2017
3ba080b
lint
bleskes Mar 30, 2017
f88cdb6
Merge remote-tracking branch 'upstream/master' into engine_clearer_flow
bleskes Mar 31, 2017
8cdac43
feedback
bleskes Mar 31, 2017
ffc9a85
flip Lucene/Op comparision
bleskes Mar 31, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion;
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;

Expand All @@ -51,49 +51,66 @@ final class PerThreadIDAndVersionLookup {
private final TermsEnum termsEnum;
/** _version data */
private final NumericDocValues versions;

/** Reused for iteration (when the term exists) */
private PostingsEnum docsEnum;

/** used for assertions to make sure class usage meets assumptions */
private final Object readerKey;

/**
* Initialize lookup for the provided segment
*/
PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
TermsEnum termsEnum = null;
NumericDocValues versions = null;

Fields fields = reader.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
assert versions != null;
}
Terms terms = fields.terms(UidFieldMapper.NAME);
termsEnum = terms.iterator();
if (termsEnum == null) {
throw new IllegalArgumentException("reader misses the [" + UidFieldMapper.NAME +
"] field");
}

this.versions = versions;
this.termsEnum = termsEnum;
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
if (versions == null) {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME +
"] field");
}
Object readerKey = null;
assert (readerKey = reader.getCoreCacheKey()) != null;
this.readerKey = readerKey;
}

/** Return null if id is not found. */
public DocIdAndVersion lookup(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context)
throws IOException {
assert context.reader().getCoreCacheKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
int docID = getDocID(id, liveDocs);

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return new DocIdAndVersion(docID, versions.get(docID), context);
} else {
return null;
}
}

/**
* returns the internal lucene doc id for the given id bytes.
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
* */
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
if (termsEnum.seekExact(id)) {
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return new DocIdAndVersion(docID, versions.get(docID), context);
}
return docID;
} else {
return DocIdSetIterator.NO_MORE_DOCS;
}

return null;
}
}
233 changes: 1 addition & 232 deletions core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,7 @@

package org.elasticsearch.common.lucene.uid;

import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

/** Utility class to resolve the Lucene doc ID and version for a given uid. */
public class Versions {
public final class Versions {

/** used to indicate the write operation should succeed regardless of current version **/
public static final long MATCH_ANY = -3L;
Expand All @@ -59,210 +34,4 @@ public class Versions {
* i.e., not found in the index and/or found as deleted (with version) in the version map
*/
public static final long MATCH_DELETED = -4L;

// TODO: is there somewhere else we can store these?
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

// Evict this reader from lookupStates once it's closed:
private static final CoreClosedListener removeLookupState = new CoreClosedListener() {
@Override
public void onClose(Object key) {
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(key);
if (ctl != null) {
ctl.close();
}
}
};

private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) throws IOException {
Object key = reader.getCoreCacheKey();
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(key);
if (ctl == null) {
// First time we are seeing this reader's core; make a
// new CTL:
ctl = new CloseableThreadLocal<>();
CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(key, ctl);
if (other == null) {
// Our CTL won, we must remove it when the
// core is closed:
reader.addCoreClosedListener(removeLookupState);
} else {
// Another thread beat us to it: just use
// their CTL:
ctl = other;
}
}

PerThreadIDAndVersionLookup lookupState = ctl.get();
if (lookupState == null) {
lookupState = new PerThreadIDAndVersionLookup(reader);
ctl.set(lookupState);
}

return lookupState;
}

private Versions() {
}

/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final LeafReaderContext context;

public DocIdAndVersion(int docId, long version, LeafReaderContext context) {
this.docId = docId;
this.version = version;
this.context = context;
}
}

/**
* Load the internal doc ID and version for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and a version otherwise
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME);
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDAndVersionLookup lookup = getLookupState(leaf);
DocIdAndVersion result = lookup.lookup(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
}
}
return null;
}

/**
* Load the version for the uid from the reader, returning<ul>
* <li>{@link #NOT_FOUND} if no matching doc exists,
* <li>the version associated with the provided uid otherwise
* </ul>
*/
public static long loadVersion(IndexReader reader, Term term) throws IOException {
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}


/**
* Returns the sequence number for the given uid term, returning
* {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found.
*/
public static long loadSeqNo(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid";
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReader leaf = leaves.get(i).reader();
Bits liveDocs = leaf.getLiveDocs();

TermsEnum termsEnum = null;
SortedNumericDocValues dvField = null;
PostingsEnum docsEnum = null;

final Fields fields = leaf.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME);
assert dvField != null;

final BytesRef id = term.bytes();
if (termsEnum.seekExact(id)) {
// there may be more than one matching docID, in the
// case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
dvField.setDocument(docID);
assert dvField.count() == 1 : "expected only a single value for _seq_no but got " +
dvField.count();
return dvField.valueAt(0);
}
}
}
}

}
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

/**
* Returns the primary term for the given uid term, returning {@code 0} if none is found.
*/
public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid";
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return 0;
}

// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReader leaf = leaves.get(i).reader();
Bits liveDocs = leaf.getLiveDocs();

TermsEnum termsEnum = null;
NumericDocValues dvField = null;
PostingsEnum docsEnum = null;

final Fields fields = leaf.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
assert dvField != null;

final BytesRef id = term.bytes();
if (termsEnum.seekExact(id)) {
// there may be more than one matching docID, in the
// case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return dvField.get(docID);
}
}
}
}

}
return 0;
}
}
Loading