@@ -140,16 +140,36 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
 DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
     assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
         "context's reader is not the same as the reader class was initialized on.";
-    int docID = getDocID(id, context.reader().getLiveDocs());
-    if (docID != DocIdSetIterator.NO_MORE_DOCS) {
-        NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
-        long seqNo;
-        if (seqNos != null && seqNos.advanceExact(docID)) {
-            seqNo = seqNos.longValue();
-        } else {
-            seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+    // termsEnum can possibly be null here if this leaf contains only no-ops.
+    if (termsEnum != null && termsEnum.seekExact(id)) {
+        docsEnum = termsEnum.postings(docsEnum, 0);
+        final Bits liveDocs = context.reader().getLiveDocs();
+        DocIdAndSeqNo result = null;
+        int docID = docsEnum.nextDoc();
+        if (docID != DocIdSetIterator.NO_MORE_DOCS) {
+            final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
+            for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
+                final long seqNo;
+                if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
+                    seqNo = seqNoDV.longValue();
+                } else {
+                    seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+                }
+                final boolean isLive = (liveDocs == null || liveDocs.get(docID));
+                if (isLive) {
+                    // The live document must always be the latest copy, thus we can terminate early here.
+                    // If a nested doc is live, we return the first doc, which doesn't have the term (only the last doc has the term).
+                    // This should not be an issue since we no longer use the primary term as a tie breaker when comparing operations.
+                    assert result == null || result.seqNo <= seqNo :
+                        "the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
+                    return new DocIdAndSeqNo(docID, seqNo, context, isLive);

[Contributor] I liked the assertion you had there that if you already have a result, this one has a higher seq no.

[Member Author] Yep, I pushed 33675c4.

+                }
+                if (result == null || result.seqNo < seqNo) {
+                    result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
+                }
+            }
+        }
-        return new DocIdAndSeqNo(docID, seqNo, context);
+        return result;
     } else {
         return null;
     }
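
To make the new per-segment scan easier to follow in isolation, here is a self-contained sketch of the same pattern against plain Lucene. The class name, method shape, and the UNASSIGNED_SEQ_NO sentinel are stand-ins for Elasticsearch's PerThreadIDVersionAndSeqNoLookup internals, not the actual implementation:

```java
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;

import java.io.IOException;

final class SeqNoScanSketch {
    private static final long UNASSIGNED_SEQ_NO = -2; // stand-in for SequenceNumbers.UNASSIGNED_SEQ_NO

    /** Returns the seq_no of the latest copy of {@code id} in one segment, or null if the id is absent. */
    static Long latestSeqNo(LeafReader reader, String uidField, String seqNoField, BytesRef id) throws IOException {
        Terms terms = reader.terms(uidField);
        if (terms == null) {
            return null; // segment contains only no-ops: no uid field at all
        }
        TermsEnum termsEnum = terms.iterator();
        if (termsEnum.seekExact(id) == false) {
            return null;
        }
        PostingsEnum postings = termsEnum.postings(null, PostingsEnum.NONE);
        NumericDocValues seqNos = reader.getNumericDocValues(seqNoField);
        Bits liveDocs = reader.getLiveDocs();
        Long best = null;
        for (int docID = postings.nextDoc(); docID != DocIdSetIterator.NO_MORE_DOCS; docID = postings.nextDoc()) {
            long seqNo = (seqNos != null && seqNos.advanceExact(docID)) ? seqNos.longValue() : UNASSIGNED_SEQ_NO;
            if (liveDocs == null || liveDocs.get(docID)) {
                return seqNo; // the live copy is always the latest one: terminate early
            }
            if (best == null || best < seqNo) {
                best = seqNo; // otherwise keep the deleted copy with the highest seq_no
            }
        }
        return best;
    }
}
```

The early return on the first live document makes the common case (the id exists and is live) cheap; the max-tracking fallback only matters when every copy of the id has been (soft-)deleted.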
@@ -22,11 +22,9 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.util.CloseableThreadLocal;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.index.mapper.SeqNoFieldMapper;

 import java.io.IOException;
 import java.util.List;
@@ -114,11 +112,13 @@ public static class DocIdAndSeqNo {
     public final int docId;
     public final long seqNo;
     public final LeafReaderContext context;
+    public final boolean isLive;

-    DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
+    DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
         this.docId = docId;
         this.seqNo = seqNo;
         this.context = context;
+        this.isLive = isLive;
     }
 }

@@ -146,41 +146,34 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
 }

 /**
- * Load the internal doc ID and sequence number for the uid from the reader, returning<ul>
- * <li>null if the uid wasn't found,
- * <li>a doc ID and the associated seqNo otherwise
- * </ul>
+ * Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
+ * The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft-)deleted.
+ * Returns {@code null} if no document matches the given uid.
  */
 public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
-    PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
-    List<LeafReaderContext> leaves = reader.leaves();
+    final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
+    final List<LeafReaderContext> leaves = reader.leaves();
+    DocIdAndSeqNo latest = null;
     // iterate backwards to optimize for the frequently updated documents
     // which are likely to be in the last segments
     for (int i = leaves.size() - 1; i >= 0; i--) {
         final LeafReaderContext leaf = leaves.get(i);
-        PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
-        DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
-        if (result != null) {
+        final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
+        final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
+        if (result == null) {
+            continue;
+        }
+        if (result.isLive) {
+            // The live document must always be the latest copy, thus we can terminate early here.
+            assert latest == null || latest.seqNo <= result.seqNo :
+                "the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
             return result;
         }
+        if (latest == null || latest.seqNo < result.seqNo) {
+            latest = result;
+        }
     }
-    return null;
-}
-
-/**
- * Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
- */
-public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
-    NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
-    long result;
-    if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) {
-        result = primaryTerms.longValue();
-    } else {
-        result = 0;
-    }
-    assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
-        + " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
-    return result;
+    return latest;
 }
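
The revised contract is easiest to read from the caller's side. A minimal sketch of consuming the new return value (the standalone helper shape is illustrative only, not part of the PR):

```java
// Hypothetical helper showing the three possible outcomes of the new API.
static String describe(VersionsAndSeqNoResolver.DocIdAndSeqNo docAndSeqNo) {
    if (docAndSeqNo == null) {
        return "uid not found in any segment";
    } else if (docAndSeqNo.isLive) {
        return "live copy, seq_no=" + docAndSeqNo.seqNo; // always the highest seq_no of all copies
    } else {
        return "all copies deleted; highest seq_no=" + docAndSeqNo.seqNo;
    }
}
```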

@@ -666,31 +666,32 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
 VersionValue versionValue = getVersionFromMap(op.uid().bytes());
 assert incrementVersionLookup();
 if (versionValue != null) {
-    if (op.seqNo() > versionValue.seqNo ||
-        (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term))
+    if (op.seqNo() > versionValue.seqNo) {
         status = OpVsLuceneDocStatus.OP_NEWER;
-    else {
+    } else if (op.seqNo() == versionValue.seqNo) {
+        assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo()
+            + " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term;
+        status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+    } else {
         status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
     }
 } else {
     // load from index
     assert incrementIndexVersionLookup();
     try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
-        DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
+        final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
         if (docAndSeqNo == null) {
             status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
         } else if (op.seqNo() > docAndSeqNo.seqNo) {
-            status = OpVsLuceneDocStatus.OP_NEWER;
-        } else if (op.seqNo() == docAndSeqNo.seqNo) {
-            assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
-                "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
-            // load term to tie break
-            final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
-            if (op.primaryTerm() > existingTerm) {
+            if (docAndSeqNo.isLive) {
                 status = OpVsLuceneDocStatus.OP_NEWER;
             } else {
-                status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+                status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
             }
+        } else if (op.seqNo() == docAndSeqNo.seqNo) {
+            assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
+                "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
+            status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
         } else {
             status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
         }
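
Stripped of the assertions and checkpoint bookkeeping, the Lucene-backed branch above reduces to a small decision table. A sketch (the standalone method shape is illustrative; the engine keeps this logic inline):

```java
// seq_no alone orders operations; the primary term is only asserted, never compared.
static OpVsLuceneDocStatus compare(long opSeqNo, VersionsAndSeqNoResolver.DocIdAndSeqNo latestCopy) {
    if (latestCopy == null) {
        return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; // no copy of the id is indexed
    } else if (opSeqNo > latestCopy.seqNo) {
        // strictly newer than every indexed copy; if the latest copy is deleted, treat the doc as absent
        return latestCopy.isLive ? OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
    } else {
        return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; // equal or lower seq_no is stale
    }
}
```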
@@ -76,6 +76,7 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
@@ -139,6 +140,7 @@
 import java.util.Base64;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -3681,6 +3683,81 @@ public void testSequenceIDs() throws Exception {
     searchResult.close();
 }

+public void testLookupSeqNoByIdInLucene() throws Exception {
+    int numOps = between(10, 100);
+    long seqNo = 0;
+    List<Engine.Operation> operations = new ArrayList<>(numOps);
+    for (int i = 0; i < numOps; i++) {
+        String id = Integer.toString(between(1, 50));
+        boolean isIndexing = randomBoolean();
+        int copies = frequently() ? 1 : between(2, 4);
+        for (int c = 0; c < copies; c++) {
+            final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
+            if (isIndexing) {
+                operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
+                    i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
+            } else {
+                operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
+                    i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
+            }
+        }
+        seqNo++;
+        if (rarely()) {
+            seqNo++;
+        }
+    }
+    Randomness.shuffle(operations);
+    Settings.Builder settings = Settings.builder()
+        .put(defaultSettings.getSettings())
+        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+    final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
+    final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
+    Map<String, Engine.Operation> latestOps = new HashMap<>(); // id -> latest operation
+    try (Store store = createStore();
+         InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
+        CheckedRunnable<IOException> lookupAndCheck = () -> {
+            try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
+                for (String id : latestOps.keySet()) {
+                    String msg = "latestOps=" + latestOps + " op=" + id;
+                    DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
+                    assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
+                    assertThat(msg, docIdAndSeqNo.isLive, equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
+                }
+                assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
+                    searcher.reader(), newUid("any-" + between(1, 10))), nullValue());
+                Map<String, Long> liveOps = latestOps.entrySet().stream()
+                    .filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
+                    .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
+                assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
+                    equalTo(liveOps));
+            }
+        };
+        for (Engine.Operation op : operations) {
+            if (op instanceof Engine.Index) {
+                engine.index((Engine.Index) op);
+                if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
+                    latestOps.put(op.id(), op);
+                }
+            } else if (op instanceof Engine.Delete) {
+                engine.delete((Engine.Delete) op);
+                if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
+                    latestOps.put(op.id(), op);
+                }
+            }
+            if (randomInt(100) < 10) {
+                engine.refresh("test");
+                lookupAndCheck.run();
+            }
+            if (rarely()) {
+                engine.flush();
+                lookupAndCheck.run();
+            }
+        }
+        engine.refresh("test");
+        lookupAndCheck.run();
+    }
+}

 /**
  * A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the
  * referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of
@@ -4059,7 +4136,11 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
         seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
     } else {
         seqNo = docIdAndSeqNo.seqNo;
-        primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docIdAndSeqNo, get.uid().field());
+        NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
+        if (primaryTerms == null || primaryTerms.advanceExact(docIdAndSeqNo.docId) == false) {
+            throw new AssertionError("document does not have primary term [" + docIdAndSeqNo.docId + "]");
+        }
+        primaryTerm = primaryTerms.longValue();
     }
     return new Tuple<>(seqNo, primaryTerm);
 } catch (Exception e) {
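
Both the removed loadPrimaryTerm helper and this inlined replacement rely on the same Lucene doc-values access pattern. A hedged sketch of that pattern on its own (the class name, field name, and missing-value sentinel are the caller's choice, not part of the PR):

```java
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;

import java.io.IOException;

final class DocValuesReadSketch {
    /** Reads a per-document numeric doc value, or {@code missing} when the document has none. */
    static long readLongDocValue(LeafReader reader, String field, int docId, long missing) throws IOException {
        NumericDocValues dv = reader.getNumericDocValues(field);
        // advanceExact positions the iterator on docId and returns true only if that document
        // actually has a value; a given iterator must be advanced in non-decreasing docId order.
        if (dv != null && dv.advanceExact(docId)) {
            return dv.longValue();
        }
        return missing;
    }
}
```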
@@ -5164,6 +5245,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
 commits.add(new ArrayList<>());
 try (Store store = createStore()) {
     EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
+    final List<DocIdSeqNoAndTerm> docs;
     try (InternalEngine engine = createEngine(config)) {
         List<Engine.Operation> flushedOperations = new ArrayList<>();
         for (Engine.Operation op : operations) {
@@ -5186,6 +5268,7 @@
         }
         globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
         engine.syncTranslog();
+        docs = getDocIds(engine, true);
     }
     trimUnsafeCommits(config);
     List<Engine.Operation> safeCommit = null;
Expand All @@ -5202,6 +5285,9 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
             assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
                 tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
         }
+        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+        assertThat(getDocIds(engine, true), equalTo(docs));
     }
 }
 }