diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
index 99e725b153c1f..94ba9abf58fde 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
@@ -102,38 +102,20 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
         throws IOException {
         assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
             "context's reader is not the same as the reader class was initialized on.";
-        int docID = getDocID(id, context.reader().getLiveDocs());
+        int docID = getDocID(id, context);

         if (docID != DocIdSetIterator.NO_MORE_DOCS) {
-            final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
-            if (versions == null) {
-                throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
-            }
-            if (versions.advanceExact(docID) == false) {
-                throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
-            }
             final long seqNo;
             final long term;
             if (loadSeqNo) {
-                NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
-                // remove the null check in 7.0 once we can't read indices with no seq#
-                if (seqNos != null && seqNos.advanceExact(docID)) {
-                    seqNo = seqNos.longValue();
-                } else {
-                    seqNo = UNASSIGNED_SEQ_NO;
-                }
-                NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
-                if (terms != null && terms.advanceExact(docID)) {
-                    term = terms.longValue();
-                } else {
-                    term = UNASSIGNED_PRIMARY_TERM;
-                }
-
+                seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
+                term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
             } else {
                 seqNo = UNASSIGNED_SEQ_NO;
                 term = UNASSIGNED_PRIMARY_TERM;
             }
-            return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
+            final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);
+            return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);
         } else {
             return null;
         }
@@ -143,9 +125,10 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
      * returns the internal lucene doc id for the given id bytes.
      * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
      * */
-    private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
+    private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
         // termsEnum can possibly be null here if this leaf contains only no-ops.
         if (termsEnum != null && termsEnum.seekExact(id)) {
+            final Bits liveDocs = context.reader().getLiveDocs();
             int docID = DocIdSetIterator.NO_MORE_DOCS;
             // there may be more than one matching docID, in the case of nested docs, so we want the last one:
             docsEnum = termsEnum.postings(docsEnum, 0);
@@ -161,41 +144,23 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
         }
     }

+    private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {
+        final NumericDocValues dv = reader.getNumericDocValues(field);
+        if (dv == null || dv.advanceExact(docId) == false) {
+            assert false : "document [" + docId + "] does not have docValues for [" + field + "]";
+            throw new IllegalStateException("document [" + docId + "] does not have docValues for [" + field + "]");
+        }
+        return dv.longValue();
+    }
+
     /** Return null if id is not found. */
     DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
         assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
             "context's reader is not the same as the reader class was initialized on.";
-        // termsEnum can possibly be null here if this leaf contains only no-ops.
-        if (termsEnum != null && termsEnum.seekExact(id)) {
-            docsEnum = termsEnum.postings(docsEnum, 0);
-            final Bits liveDocs = context.reader().getLiveDocs();
-            DocIdAndSeqNo result = null;
-            int docID = docsEnum.nextDoc();
-            if (docID != DocIdSetIterator.NO_MORE_DOCS) {
-                final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
-                for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
-                    final long seqNo;
-                    // remove the null check in 7.0 once we can't read indices with no seq#
-                    if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
-                        seqNo = seqNoDV.longValue();
-                    } else {
-                        seqNo = UNASSIGNED_SEQ_NO;
-                    }
-                    final boolean isLive = (liveDocs == null || liveDocs.get(docID));
-                    if (isLive) {
-                        // The live document must always be the latest copy, thus we can early terminate here.
-                        // If a nested docs is live, we return the first doc which doesn't have term (only the last doc has term).
-                        // This should not be an issue since we no longer use primary term as tier breaker when comparing operations.
-                        assert result == null || result.seqNo <= seqNo :
-                            "the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
-                        return new DocIdAndSeqNo(docID, seqNo, context, isLive);
-                    }
-                    if (result == null || result.seqNo < seqNo) {
-                        result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
-                    }
-                }
-            }
-            return result;
+        final int docID = getDocID(id, context);
+        if (docID != DocIdSetIterator.NO_MORE_DOCS) {
+            final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
+            return new DocIdAndSeqNo(docID, seqNo, context);
         } else {
             return null;
         }
diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
index 611887342adca..bfd859c115fe2 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
@@ -114,13 +114,11 @@ public static class DocIdAndSeqNo {
         public final int docId;
         public final long seqNo;
         public final LeafReaderContext context;
-        public final boolean isLive;

-        DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
+        DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
             this.docId = docId;
             this.seqNo = seqNo;
             this.context = context;
-            this.isLive = isLive;
         }
     }

@@ -149,32 +147,21 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term,

     /**
      * Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
-     * The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
-     * This returns {@code null} if no such document matching the given term uid.
+     * The result is either null or the live and latest version of the given uid.
      */
     public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
         final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
         final List<LeafReaderContext> leaves = reader.leaves();
-        DocIdAndSeqNo latest = null;
         // iterate backwards to optimize for the frequently updated documents
         // which are likely to be in the last segments
         for (int i = leaves.size() - 1; i >= 0; i--) {
             final LeafReaderContext leaf = leaves.get(i);
             final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
             final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
-            if (result == null) {
-                continue;
-            }
-            if (result.isLive) {
-                // The live document must always be the latest copy, thus we can early terminate here.
-                assert latest == null || latest.seqNo <= result.seqNo :
-                    "the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
+            if (result != null) {
                 return result;
             }
-            if (latest == null || latest.seqNo < result.seqNo) {
-                latest = result;
-            }
         }
-        return latest;
+        return null;
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index cdd6cd86e6afb..6249dee2f7ca4 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -705,11 +705,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
         if (docAndSeqNo == null) {
             status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
         } else if (op.seqNo() > docAndSeqNo.seqNo) {
-            if (docAndSeqNo.isLive) {
-                status = OpVsLuceneDocStatus.OP_NEWER;
-            } else {
-                status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
-            }
+            status = OpVsLuceneDocStatus.OP_NEWER;
         } else if (op.seqNo() == docAndSeqNo.seqNo) {
             assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
                 "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
diff --git a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
index 28b02de80f63c..1c55848cb04b2 100644
--- a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
+++ b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
@@ -33,6 +33,7 @@
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.VersionFieldMapper;
 import org.elasticsearch.test.ESTestCase;

@@ -52,6 +53,8 @@ public void testSimple() throws Exception {
         Document doc = new Document();
         doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
         doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
         writer.addDocument(doc);
         writer.addDocument(new Document());
         DirectoryReader reader = DirectoryReader.open(writer);
@@ -86,6 +89,8 @@ public void testTwoDocuments() throws Exception {
         Document doc = new Document();
         doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
         doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
         writer.addDocument(doc);
         writer.addDocument(doc);
         writer.addDocument(new Document());
diff --git a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
index 94945dc92c952..e3e558395ca78 100644
--- a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
+++ b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
@@ -30,6 +30,7 @@
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.VersionFieldMapper;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
@@ -68,6 +69,8 @@ public void testVersions() throws Exception {
         Document doc = new Document();
         doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
         doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
         writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
         directoryReader = reopen(directoryReader);
         assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(1L));
@@ -77,6 +80,8 @@ public void testVersions() throws Exception {
         Field version = new NumericDocValuesField(VersionFieldMapper.NAME, 2);
         doc.add(uid);
         doc.add(version);
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
         writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
         directoryReader = reopen(directoryReader);
         assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(2L));
@@ -86,6 +91,8 @@ public void testVersions() throws Exception {
         version.setLongValue(3);
         doc.add(uid);
         doc.add(version);
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
         writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);

         directoryReader = reopen(directoryReader);
@@ -115,6 +122,8 @@ public void testNestedDocuments() throws IOException {
         doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
         NumericDocValuesField version = new NumericDocValuesField(VersionFieldMapper.NAME, 5L);
         doc.add(version);
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
         docs.add(doc);

         writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
@@ -145,6 +154,8 @@ public void testCache() throws Exception {
         Document doc = new Document();
         doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
         doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
         writer.addDocument(doc);
         DirectoryReader reader = DirectoryReader.open(writer);
         // should increase cache size by 1
@@ -170,6 +181,8 @@ public void testCacheFilterReader() throws Exception {
         Document doc = new Document();
         doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
         doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
+        doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
         writer.addDocument(doc);
         DirectoryReader reader = DirectoryReader.open(writer);
         assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 9aaf6c704beae..b0e4fb102ea42 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -4027,7 +4027,6 @@ public void testSequenceIDs() throws Exception {
         searchResult.close();
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42979")
     public void testLookupSeqNoByIdInLucene() throws Exception {
         int numOps = between(10, 100);
         long seqNo = 0;
@@ -4062,20 +4061,23 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
              InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
             CheckedRunnable<Exception> lookupAndCheck = () -> {
                 try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
-                    for (String id : latestOps.keySet()) {
-                        String msg = "latestOps=" + latestOps + " op=" + id;
-                        DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
-                        assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
-                        assertThat(msg, docIdAndSeqNo.isLive,
-                            equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
-                    }
-                    assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
-                        searcher.reader(), newUid("any-" + between(1, 10)), randomBoolean()), nullValue());
                     Map<String, Long> liveOps = latestOps.entrySet().stream()
                         .filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
                         .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
                     assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
                         equalTo(liveOps));
+                    for (String id : latestOps.keySet()) {
+                        String msg = "latestOps=" + latestOps + " op=" + id;
+                        DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
+                        if (liveOps.containsKey(id) == false) {
+                            assertNull(msg, docIdAndSeqNo);
+                        } else {
+                            assertNotNull(msg, docIdAndSeqNo);
+                            assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
+                        }
+                    }
+                    String notFoundId = randomValueOtherThanMany(liveOps::containsKey, () -> Long.toString(randomNonNegativeLong()));
+                    assertNull(VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(notFoundId)));
                 }
             };
             for (Engine.Operation op : operations) {