diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
index 3a037bed62b7f..edbe042d55114 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
@@ -140,16 +140,36 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
     DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
         assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
             "context's reader is not the same as the reader class was initialized on.";
-        int docID = getDocID(id, context.reader().getLiveDocs());
-        if (docID != DocIdSetIterator.NO_MORE_DOCS) {
-            NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
-            long seqNo;
-            if (seqNos != null && seqNos.advanceExact(docID)) {
-                seqNo = seqNos.longValue();
-            } else {
-                seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+        // termsEnum can possibly be null here if this leaf contains only no-ops.
+        if (termsEnum != null && termsEnum.seekExact(id)) {
+            docsEnum = termsEnum.postings(docsEnum, 0);
+            final Bits liveDocs = context.reader().getLiveDocs();
+            DocIdAndSeqNo result = null;
+            int docID = docsEnum.nextDoc();
+            if (docID != DocIdSetIterator.NO_MORE_DOCS) {
+                final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
+                for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
+                    final long seqNo;
+                    if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
+                        seqNo = seqNoDV.longValue();
+                    } else {
+                        seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+                    }
+                    final boolean isLive = (liveDocs == null || liveDocs.get(docID));
+                    if (isLive) {
+                        // The live document must always be the latest copy, thus we can early terminate here.
+                        // If a nested doc is live, we return the first doc which doesn't have the term (only the last doc has the term).
+                        // This should not be an issue since we no longer use the primary term as a tie-breaker when comparing operations.
+                        assert result == null || result.seqNo <= seqNo :
+                            "the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
+                        return new DocIdAndSeqNo(docID, seqNo, context, isLive);
+                    }
+                    if (result == null || result.seqNo < seqNo) {
+                        result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
+                    }
+                }
             }
-            return new DocIdAndSeqNo(docID, seqNo, context);
+            return result;
         } else {
             return null;
         }
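Note on the hunk above: with soft deletes enabled, the postings for a uid can contain several copies of the same document, so a single getDocID lookup is no longer enough. Below is a minimal, self-contained sketch of the same per-leaf walk; it is an illustration only, with the literals "_id" and "_seq_no" standing in for IdFieldMapper.NAME and SeqNoFieldMapper.NAME, and UNASSIGNED_SEQ_NO hard-coded to the -2 used by SequenceNumbers.UNASSIGNED_SEQ_NO.

    import java.io.IOException;

    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.NumericDocValues;
    import org.apache.lucene.index.PostingsEnum;
    import org.apache.lucene.index.Terms;
    import org.apache.lucene.index.TermsEnum;
    import org.apache.lucene.search.DocIdSetIterator;
    import org.apache.lucene.util.Bits;
    import org.apache.lucene.util.BytesRef;

    final class PerLeafSeqNoLookupSketch {
        private static final long UNASSIGNED_SEQ_NO = -2; // SequenceNumbers.UNASSIGNED_SEQ_NO

        /** Returns the seq_no of the latest copy of {@code id} in this leaf, or UNASSIGNED_SEQ_NO if absent. */
        static long latestSeqNo(LeafReader reader, BytesRef id) throws IOException {
            Terms terms = reader.terms("_id");                  // IdFieldMapper.NAME
            if (terms == null) {
                return UNASSIGNED_SEQ_NO;                       // leaf holds only no-ops: no uid terms at all
            }
            TermsEnum termsEnum = terms.iterator();
            if (termsEnum.seekExact(id) == false) {
                return UNASSIGNED_SEQ_NO;
            }
            PostingsEnum postings = termsEnum.postings(null, PostingsEnum.NONE);
            Bits liveDocs = reader.getLiveDocs();
            NumericDocValues seqNos = reader.getNumericDocValues("_seq_no"); // SeqNoFieldMapper.NAME
            long best = UNASSIGNED_SEQ_NO;
            // With soft deletes, the postings list may contain several copies of the same uid.
            for (int doc = postings.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = postings.nextDoc()) {
                long seqNo = (seqNos != null && seqNos.advanceExact(doc)) ? seqNos.longValue() : UNASSIGNED_SEQ_NO;
                if (liveDocs == null || liveDocs.get(doc)) {
                    return seqNo;                               // a live copy is always the latest: stop early
                }
                best = Math.max(best, seqNo);                   // otherwise keep the highest soft-deleted seq_no
            }
            return best;
        }
    }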
diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
index 9db7e3716d51a..7ebcb8998441d 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
@@ -22,11 +22,9 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.util.CloseableThreadLocal;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 
 import java.io.IOException;
 import java.util.List;
@@ -114,11 +112,13 @@ public static class DocIdAndSeqNo {
         public final int docId;
         public final long seqNo;
         public final LeafReaderContext context;
+        public final boolean isLive;
 
-        DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
+        DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
             this.docId = docId;
             this.seqNo = seqNo;
             this.context = context;
+            this.isLive = isLive;
         }
     }
 
@@ -146,41 +146,34 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
     }
 
     /**
-     * Load the internal doc ID and sequence number for the uid from the reader, returning<ul>
-     * <li>null if the uid wasn't found,
-     * <li>a doc ID and the associated seqNo otherwise
-     * </ul>
+     * Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
+     * The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft-)deleted.
+     * This returns {@code null} if no document matches the given uid term.
      */
     public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
-        PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
-        List<LeafReaderContext> leaves = reader.leaves();
+        final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
+        final List<LeafReaderContext> leaves = reader.leaves();
+        DocIdAndSeqNo latest = null;
         // iterate backwards to optimize for the frequently updated documents
         // which are likely to be in the last segments
         for (int i = leaves.size() - 1; i >= 0; i--) {
             final LeafReaderContext leaf = leaves.get(i);
-            PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
-            DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
-            if (result != null) {
+            final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
+            final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
+            if (result == null) {
+                continue;
+            }
+            if (result.isLive) {
+                // The live document must always be the latest copy, thus we can early terminate here.
+                assert latest == null || latest.seqNo <= result.seqNo :
+                    "the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
                 return result;
             }
+            if (latest == null || latest.seqNo < result.seqNo) {
+                latest = result;
+            }
         }
-        return null;
-    }
-
-    /**
-     * Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
-     */
-    public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
-        NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
-        long result;
-        if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) {
-            result = primaryTerms.longValue();
-        } else {
-            result = 0;
-        }
-        assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
-            + " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
-        return result;
+        return latest;
     }
 
     /**
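Read on its own, the new reader-level loop in this hunk reduces to the sketch below; Hit and PerLeafLookup are hypothetical stand-ins for DocIdAndSeqNo and PerThreadIDVersionAndSeqNoLookup, not types from the patch.

    import java.io.IOException;
    import java.util.List;

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.LeafReaderContext;

    final class CrossSegmentLookupSketch {
        static final class Hit {                                   // stand-in for DocIdAndSeqNo
            final long seqNo;
            final boolean isLive;
            Hit(long seqNo, boolean isLive) { this.seqNo = seqNo; this.isLive = isLive; }
        }

        interface PerLeafLookup {                                  // stand-in for PerThreadIDVersionAndSeqNoLookup
            Hit lookup(LeafReaderContext leaf) throws IOException; // null when the uid is absent from the leaf
        }

        static Hit latest(IndexReader reader, PerLeafLookup perLeaf) throws IOException {
            final List<LeafReaderContext> leaves = reader.leaves();
            Hit latest = null;
            // Frequently updated documents tend to sit in the most recent segments, so scan backwards.
            for (int i = leaves.size() - 1; i >= 0; i--) {
                final Hit hit = perLeaf.lookup(leaves.get(i));
                if (hit == null) {
                    continue;
                }
                if (hit.isLive) {
                    return hit;                                    // the live copy is always the latest copy
                }
                if (latest == null || latest.seqNo < hit.seqNo) {
                    latest = hit;                                  // remember the best (soft-)deleted copy so far
                }
            }
            return latest;                                         // null if the uid never existed in the reader
        }
    }

Scanning the newest segments first pairs with the early return: a live copy ends the search immediately, while deleted copies are only compared by seq_no.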
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index c58e13d65deb6..31d1cfb660f0e 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -666,31 +666,32 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
         VersionValue versionValue = getVersionFromMap(op.uid().bytes());
         assert incrementVersionLookup();
         if (versionValue != null) {
-            if (op.seqNo() > versionValue.seqNo ||
-                (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term))
+            if (op.seqNo() > versionValue.seqNo) {
                 status = OpVsLuceneDocStatus.OP_NEWER;
-            else {
+            } else if (op.seqNo() == versionValue.seqNo) {
+                assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo()
+                    + " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term;
+                status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+            } else {
                 status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
             }
         } else {
             // load from index
             assert incrementIndexVersionLookup();
             try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
-                DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
+                final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
                 if (docAndSeqNo == null) {
                     status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
                 } else if (op.seqNo() > docAndSeqNo.seqNo) {
-                    status = OpVsLuceneDocStatus.OP_NEWER;
-                } else if (op.seqNo() == docAndSeqNo.seqNo) {
-                    assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
-                        "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
-                    // load term to tie break
-                    final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
-                    if (op.primaryTerm() > existingTerm) {
+                    if (docAndSeqNo.isLive) {
                         status = OpVsLuceneDocStatus.OP_NEWER;
                     } else {
-                        status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+                        status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
                     }
+                } else if (op.seqNo() == docAndSeqNo.seqNo) {
+                    assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
+                        "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
+                    status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
                 } else {
                     status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
                 }
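Condensed, the patched Lucene-based resolution reduces to the sketch below (Status mirrors OpVsLuceneDocStatus; the helper is a hypothetical distillation, not engine code). The new case is an operation whose seq_no is above a soft-deleted copy: the tombstone proves that every copy with a lower seq_no is gone, so the op can be handled as LUCENE_DOC_NOT_FOUND, and no primary-term lookup is needed as a tie-breaker.

    final class OpVsDocSketch {
        enum Status { OP_NEWER, OP_STALE_OR_EQUAL, LUCENE_DOC_NOT_FOUND }

        /** docSeqNo is null when Lucene holds no copy (live or soft-deleted) of the uid. */
        static Status compare(long opSeqNo, Long docSeqNo, boolean docIsLive) {
            if (docSeqNo == null) {
                return Status.LUCENE_DOC_NOT_FOUND;
            }
            if (opSeqNo > docSeqNo) {
                // Above a live copy: a plain update. Above a deleted copy: the doc is simply gone.
                return docIsLive ? Status.OP_NEWER : Status.LUCENE_DOC_NOT_FOUND;
            }
            return Status.OP_STALE_OR_EQUAL; // an equal or lower seq_no was already applied
        }
    }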
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 379043fa93954..6d4ee6b77566e 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -76,6 +76,7 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
@@ -139,6 +140,7 @@
 import java.util.Base64;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -3681,6 +3683,81 @@ public void testSequenceIDs() throws Exception {
         searchResult.close();
     }
 
+    public void testLookupSeqNoByIdInLucene() throws Exception {
+        int numOps = between(10, 100);
+        long seqNo = 0;
+        List<Engine.Operation> operations = new ArrayList<>(numOps);
+        for (int i = 0; i < numOps; i++) {
+            String id = Integer.toString(between(1, 50));
+            boolean isIndexing = randomBoolean();
+            int copies = frequently() ? 1 : between(2, 4);
+            for (int c = 0; c < copies; c++) {
+                final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
+                if (isIndexing) {
+                    operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
+                        i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
+                } else {
+                    operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
+                        i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
+                }
+            }
+            seqNo++;
+            if (rarely()) {
+                seqNo++;
+            }
+        }
+        Randomness.shuffle(operations);
+        Settings.Builder settings = Settings.builder()
+            .put(defaultSettings.getSettings())
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+        final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
+        Map<String, Engine.Operation> latestOps = new HashMap<>(); // id -> latest operation
+        try (Store store = createStore();
+             InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
+            CheckedRunnable<IOException> lookupAndCheck = () -> {
+                try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
+                    for (String id : latestOps.keySet()) {
+                        String msg = "latestOps=" + latestOps + " op=" + id;
+                        DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
+                        assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
+                        assertThat(msg, docIdAndSeqNo.isLive, equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
+                    }
+                    assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
+                        searcher.reader(), newUid("any-" + between(1, 10))), nullValue());
+                    Map<String, Long> liveOps = latestOps.entrySet().stream()
+                        .filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
+                        .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
+                    assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
+                        equalTo(liveOps));
+                }
+            };
+            for (Engine.Operation op : operations) {
+                if (op instanceof Engine.Index) {
+                    engine.index((Engine.Index) op);
+                    if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
+                        latestOps.put(op.id(), op);
+                    }
+                } else if (op instanceof Engine.Delete) {
+                    engine.delete((Engine.Delete) op);
+                    if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
+                        latestOps.put(op.id(), op);
+                    }
+                }
+                if (randomInt(100) < 10) {
+                    engine.refresh("test");
+                    lookupAndCheck.run();
+                }
+                if (rarely()) {
+                    engine.flush();
+                    lookupAndCheck.run();
+                }
+            }
+            engine.refresh("test");
+            lookupAndCheck.run();
+        }
+    }
+
     /**
      * A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the
      * referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of
@@ -4059,7 +4136,11 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
                 seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
             } else {
                 seqNo = docIdAndSeqNo.seqNo;
-                primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docIdAndSeqNo, get.uid().field());
+                NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
+                if (primaryTerms == null || primaryTerms.advanceExact(docIdAndSeqNo.docId) == false) {
+                    throw new AssertionError("document does not have primary term [" + docIdAndSeqNo.docId + "]");
+                }
+                primaryTerm = primaryTerms.longValue();
             }
             return new Tuple<>(seqNo, primaryTerm);
         } catch (Exception e) {
@@ -5164,6 +5245,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
             commits.add(new ArrayList<>());
         try (Store store = createStore()) {
             EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
+            final List<DocIdSeqNoAndTerm> docs;
             try (InternalEngine engine = createEngine(config)) {
                 List<Engine.Operation> flushedOperations = new ArrayList<>();
                 for (Engine.Operation op : operations) {
@@ -5186,6 +5268,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
                 }
                 globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
                 engine.syncTranslog();
+                docs = getDocIds(engine, true);
             }
             trimUnsafeCommits(config);
             List<Engine.Operation> safeCommit = null;
@@ -5202,6 +5285,9 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
                 assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
                     tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
             }
+            engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+            assertThat(getDocIds(engine, true), equalTo(docs));
         }
     }
 }
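Since loadPrimaryTerm is removed, the test's getSequenceID now reads the primary term straight from doc values, as in the hunk above. The same read as a stand-alone sketch, with the literal "_primary_term" standing in for SeqNoFieldMapper.PRIMARY_TERM_NAME:

    import java.io.IOException;

    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.NumericDocValues;

    final class PrimaryTermReadSketch {
        /** Reads the primary term stored for docId, failing loudly if the doc has none. */
        static long primaryTerm(LeafReader reader, int docId) throws IOException {
            NumericDocValues primaryTerms = reader.getNumericDocValues("_primary_term");
            if (primaryTerms == null || primaryTerms.advanceExact(docId) == false) {
                throw new IllegalStateException("document [" + docId + "] does not have a primary term");
            }
            return primaryTerms.longValue();
        }
    }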