@@ -102,38 +102,20 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
int docID = getDocID(id, context.reader().getLiveDocs());
int docID = getDocID(id, context);

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
if (versions == null) {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
}
if (versions.advanceExact(docID) == false) {
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
}
final long seqNo;
final long term;
if (loadSeqNo) {
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
// remove the null check in 7.0 once we can't read indices with no seq#
if (seqNos != null && seqNos.advanceExact(docID)) {
seqNo = seqNos.longValue();
} else {
seqNo = UNASSIGNED_SEQ_NO;
}
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (terms != null && terms.advanceExact(docID)) {
term = terms.longValue();
} else {
term = UNASSIGNED_PRIMARY_TERM;
}

seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
} else {
seqNo = UNASSIGNED_SEQ_NO;
term = UNASSIGNED_PRIMARY_TERM;
}
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);
return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);
} else {
return null;
}
@@ -143,9 +125,10 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
* Returns the internal Lucene doc id for the given id bytes.
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found.
*/
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
// termsEnum can possibly be null here if this leaf contains only no-ops.
if (termsEnum != null && termsEnum.seekExact(id)) {
final Bits liveDocs = context.reader().getLiveDocs();
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
@@ -161,41 +144,23 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
}
}

private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {
final NumericDocValues dv = reader.getNumericDocValues(field);
if (dv == null || dv.advanceExact(docId) == false) {
assert false : "document [" + docId + "] does not have docValues for [" + field + "]";
throw new IllegalStateException("document [" + docId + "] does not have docValues for [" + field + "]");
}
return dv.longValue();
}
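
Note: the helper above leans on Lucene's two-step doc-values protocol: a per-field lookup that may return null, then advanceExact to position the iterator before longValue() is read. A minimal, self-contained sketch of that contract (not part of this PR; it assumes Lucene 8.x on the classpath, and "_seq_no" is used purely as an example field name):

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.NumericDocValuesField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.NumericDocValues;
    import org.apache.lucene.store.ByteBuffersDirectory;

    public class DocValuesContractSketch {
        public static void main(String[] args) throws Exception {
            try (ByteBuffersDirectory dir = new ByteBuffersDirectory();
                 IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                Document doc = new Document();
                doc.add(new NumericDocValuesField("_seq_no", 42L));
                writer.addDocument(doc);
                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    LeafReader leaf = reader.leaves().get(0).reader();
                    // null means the segment has no values at all for this field
                    NumericDocValues dv = leaf.getNumericDocValues("_seq_no");
                    // advanceExact returns false if this particular doc has no value
                    if (dv == null || dv.advanceExact(0) == false) {
                        throw new IllegalStateException("document [0] does not have docValues for [_seq_no]");
                    }
                    System.out.println("seq_no=" + dv.longValue()); // prints: seq_no=42
                }
            }
        }
    }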

/** Return null if id is not found. */
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
// termsEnum can possibly be null here if this leaf contains only no-ops.
if (termsEnum != null && termsEnum.seekExact(id)) {
docsEnum = termsEnum.postings(docsEnum, 0);
final Bits liveDocs = context.reader().getLiveDocs();
DocIdAndSeqNo result = null;
int docID = docsEnum.nextDoc();
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
final long seqNo;
// remove the null check in 7.0 once we can't read indices with no seq#
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
seqNo = seqNoDV.longValue();
} else {
seqNo = UNASSIGNED_SEQ_NO;
}
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
if (isLive) {
// The live document must always be the latest copy, thus we can terminate early here.
// If a nested doc is live, we return the first doc, which doesn't have a term (only the last doc has a term).
// This should not be an issue since we no longer use the primary term as a tie-breaker when comparing operations.
assert result == null || result.seqNo <= seqNo :
"the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
return new DocIdAndSeqNo(docID, seqNo, context, isLive);
}
if (result == null || result.seqNo < seqNo) {
result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
}
}
}
return result;
final int docID = getDocID(id, context);
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
return new DocIdAndSeqNo(docID, seqNo, context);
} else {
return null;
}
@@ -114,13 +114,11 @@ public static class DocIdAndSeqNo {
public final int docId;
public final long seqNo;
public final LeafReaderContext context;
public final boolean isLive;

DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
this.docId = docId;
this.seqNo = seqNo;
this.context = context;
this.isLive = isLive;
}
}

@@ -149,32 +147,21 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term,

/**
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
* The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
* This returns {@code null} if there is no document matching the given term uid.
* The result is either {@code null} or the live, latest copy of the given uid.
*/
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
final List<LeafReaderContext> leaves = reader.leaves();
DocIdAndSeqNo latest = null;
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
if (result == null) {
continue;
}
if (result.isLive) {
// The live document must always be the latest copy, thus we can terminate early here.
assert latest == null || latest.seqNo <= result.seqNo :
"the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
if (result != null) {
return result;
}
if (latest == null || latest.seqNo < result.seqNo) {
latest = result;
}
}
return latest;
return null;
}
}
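
With the tightened contract (a non-null result is always the live, latest copy), callers can drop any isLive branching. A hedged caller sketch: the wrapper class and method below are hypothetical, while the loadDocIdAndSeqNo signature, the DocIdAndSeqNo fields, and the imports match the diff:

    import java.io.IOException;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.Term;
    import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
    import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
    import org.elasticsearch.index.mapper.IdFieldMapper;

    final class SeqNoLookupSketch {
        /** Returns the seq_no of the live copy of the given id, or the fallback if none exists. */
        static long latestLiveSeqNo(IndexReader reader, String id, long fallback) throws IOException {
            DocIdAndSeqNo result = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(reader, new Term(IdFieldMapper.NAME, id));
            // null now covers both "never indexed" and "soft-deleted": there is no live copy
            return result == null ? fallback : result.seqNo;
        }
    }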
@@ -705,11 +705,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
if (docAndSeqNo == null) {
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else if (op.seqNo() > docAndSeqNo.seqNo) {
if (docAndSeqNo.isLive) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else {
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
}
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
@@ -33,6 +33,7 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.test.ESTestCase;

@@ -52,6 +53,8 @@ public void testSimple() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.addDocument(doc);
writer.addDocument(new Document());
DirectoryReader reader = DirectoryReader.open(writer);
@@ -86,6 +89,8 @@ public void testTwoDocuments() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.addDocument(doc);
writer.addDocument(doc);
writer.addDocument(new Document());
@@ -30,6 +30,7 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
@@ -68,6 +69,8 @@ public void testVersions() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
directoryReader = reopen(directoryReader);
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(1L));
@@ -77,6 +80,8 @@
Field version = new NumericDocValuesField(VersionFieldMapper.NAME, 2);
doc.add(uid);
doc.add(version);
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
directoryReader = reopen(directoryReader);
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(2L));
@@ -86,6 +91,8 @@
version.setLongValue(3);
doc.add(uid);
doc.add(version);
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);

directoryReader = reopen(directoryReader);
@@ -115,6 +122,8 @@ public void testNestedDocuments() throws IOException {
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
NumericDocValuesField version = new NumericDocValuesField(VersionFieldMapper.NAME, 5L);
doc.add(version);
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
docs.add(doc);

writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
@@ -145,6 +154,8 @@ public void testCache() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
// should increase cache size by 1
@@ -170,6 +181,8 @@ public void testCacheFilterReader() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);
@@ -4027,7 +4027,6 @@ public void testSequenceIDs() throws Exception {
searchResult.close();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42979")
public void testLookupSeqNoByIdInLucene() throws Exception {
int numOps = between(10, 100);
long seqNo = 0;
@@ -4062,20 +4061,23 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
CheckedRunnable<IOException> lookupAndCheck = () -> {
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
for (String id : latestOps.keySet()) {
String msg = "latestOps=" + latestOps + " op=" + id;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
assertThat(msg, docIdAndSeqNo.isLive,
equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
}
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
searcher.reader(), newUid("any-" + between(1, 10)), randomBoolean()), nullValue());
Map<String, Long> liveOps = latestOps.entrySet().stream()
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
equalTo(liveOps));
for (String id : latestOps.keySet()) {
String msg = "latestOps=" + latestOps + " op=" + id;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
if (liveOps.containsKey(id) == false) {
assertNull(msg, docIdAndSeqNo);
} else {
assertNotNull(msg, docIdAndSeqNo);
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
}
}
String notFoundId = randomValueOtherThanMany(liveOps::containsKey, () -> Long.toString(randomNonNegativeLong()));
assertNull(VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(notFoundId)));
}
};
for (Engine.Operation op : operations) {
Expand Down