From e24b4e6de49fa2224a51666964724f89ece71108 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 23 Apr 2018 10:24:15 -0400 Subject: [PATCH 1/4] Index stale operations to Lucene to have a complete history Today, when processing out of order operations, we only add it into translog but skip adding into Lucene. Translog, therefore, has a complete history of sequence numbers while Lucene does not. Since we would like to have a complete history in Lucene, this change makes sure that stale operations will be added to Lucene as soft-deleted documents if required. --- .../elasticsearch/common/lucene/Lucene.java | 53 ++++++ .../uid/PerThreadIDVersionAndSeqNoLookup.java | 11 ++ .../lucene/uid/VersionsAndSeqNoResolver.java | 30 +++ .../index/engine/InternalEngine.java | 176 +++++++++++++----- .../index/engine/InternalEngineTests.java | 71 ++++++- .../index/shard/IndexShardTests.java | 27 +++ .../index/engine/EngineTestCase.java | 37 +++- 7 files changed, 346 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index f707ecc1fe65c..63df96919b164 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -30,6 +30,7 @@ import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; @@ -833,6 +834,58 @@ public int length() { }; } + /** + * Wraps a directory reader to include soft-deleted documents. + * This should be only used to query the history of documents rather than the documents. + * + * @param in the input directory reader + * @return the wrapped reader including soft-deleted documents. + */ + public static DirectoryReader includeSoftDeletes(DirectoryReader in) throws IOException { + return new DirectoryReaderWithSoftDeletes(in); + } + + private static final class DirectoryReaderWithSoftDeletes extends FilterDirectoryReader { + static final class SubReaderWithSoftDeletes extends FilterLeafReader { + SubReaderWithSoftDeletes(LeafReader in) { + super(in); + } + @Override + public Bits getLiveDocs() { + return null; + } + @Override + public int numDocs() { + return maxDoc(); + } + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + @Override + public CacheHelper getReaderCacheHelper() { + return null; // Modifying liveDocs + } + } + DirectoryReaderWithSoftDeletes(DirectoryReader in) throws IOException { + super(in, new FilterDirectoryReader.SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader leaf) { + return new SubReaderWithSoftDeletes(leaf); + } + }); + } + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return includeSoftDeletes(in); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; // Modifying liveDocs + } + } + /** * Returns a numeric docvalues which can be used to soft-delete documents. 
*/ diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index 38fcdfe5f1b62..d032dbf3917e9 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -145,4 +145,15 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOExcep return null; } } + + /** + * Returns an internal posting list of the given uid + */ + PostingsEnum getPostingsOrNull(BytesRef id) throws IOException { + if (termsEnum.seekExact(id)) { + docsEnum = termsEnum.postings(docsEnum, 0); + return docsEnum; + } + return null; + } } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 9db7e3716d51a..1e82e8810dfba 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -23,7 +23,9 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.mapper.SeqNoFieldMapper; @@ -193,4 +195,32 @@ public static long loadVersion(IndexReader reader, Term term) throws IOException final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; } + + /** + * Checks for the existence of the history of a pair SeqNo/PrimaryTerm in Lucene. The checking pair is considered as existed + * if there is a pair such as the seqNo equals to the checking seqNo and the primary term is at least the checking term. 
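A usage sketch of the new check: the engine is only expected to consult it after an operation has already been classified as stale, and a miss means the stale operation still has to be indexed as a soft-deleted document. This mirrors the compareToLuceneHistory helper added to InternalEngine later in this patch; searcher acquisition and error handling are omitted.

    // 'op' is the stale operation, 'searcher' an internal searcher on the same engine
    if (VersionsAndSeqNoResolver.hasHistoryInLucene(searcher.reader(), op.uid(), op.seqNo(), op.primaryTerm())) {
        // the (seqNo, primaryTerm) pair is already present, so Lucene can be skipped for this stale op
    } else {
        // history not found, so the stale op is indexed as a soft-deleted document
    }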
+ */ + public static boolean hasHistoryInLucene(IndexReader reader, Term idTerm, long seqNo, long primaryTerm) throws IOException { + final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, idTerm.field()); + final List leaves = reader.leaves(); + // iterate backwards to optimize for the frequently updated documents which are likely to be in the last segments + for (int i = leaves.size() - 1; i >= 0; i--) { + final LeafReaderContext leaf = leaves.get(i); + final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord]; + final PostingsEnum postingsEnum = lookup.getPostingsOrNull(idTerm.bytes()); + if (postingsEnum == null) { + continue; + } + final NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + for (int docId = postingsEnum.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = postingsEnum.nextDoc()) { + if (seqNoDV != null && seqNoDV.advanceExact(docId) && primaryTermDV != null && primaryTermDV.advanceExact(docId)) { + if (seqNoDV.longValue() == seqNo && primaryTermDV.longValue() >= primaryTerm) { + return true; + } + } + } + } + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 28e4101a2217d..d8cfe11b7275e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -21,12 +21,14 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; @@ -43,6 +45,7 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.InfoStream; +import org.elasticsearch.Assertions; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; @@ -577,23 +580,35 @@ public GetResult get(Get get, BiFunction search enum OpVsLuceneDocStatus { /** the op is more recent than the one that last modified the doc found in lucene*/ OP_NEWER, - /** the op is older or the same as the one that last modified the doc found in lucene*/ - OP_STALE_OR_EQUAL, + /** the op is stale but its history is existed in Lucene */ + OP_STALE_HISTORY_EXISTED, + /** the op is stale and its history is not found in Lucene */ + OP_STALE_HISTORY_NOT_FOUND, /** no doc was found in lucene */ LUCENE_DOC_NOT_FOUND } + private OpVsLuceneDocStatus compareToLuceneHistory(final Operation op, final Searcher searcher) throws IOException { + if (VersionsAndSeqNoResolver.hasHistoryInLucene(searcher.reader(), op.uid(), op.seqNo(), op.primaryTerm())) { + return OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED; + } else { + return OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND; + } + } + private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final 
Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo || - (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) + if (op.seqNo() > versionValue.seqNo || + (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) { status = OpVsLuceneDocStatus.OP_NEWER; - else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } else if (op.seqNo() == versionValue.seqNo && op.primaryTerm() == versionValue.term) { + status = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED; + } else { + status = OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND; } } else { // load from index @@ -610,10 +625,10 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) if (op.primaryTerm() > existingTerm) { status = OpVsLuceneDocStatus.OP_NEWER; } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + status = compareToLuceneHistory(op, searcher); } } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + status = compareToLuceneHistory(op, searcher); } } } @@ -769,10 +784,9 @@ public IndexResult index(Index index) throws IOException { if (plan.earlyResultOnPreFlightError.isPresent()) { indexResult = plan.earlyResultOnPreFlightError.get(); assert indexResult.hasFailure(); - } else if (plan.indexIntoLucene) { + } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) { indexResult = indexIntoLucene(index, plan); } else { - // TODO: We need to index stale documents to have a full history in Lucene. indexResult = new IndexResult( plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } @@ -847,12 +861,14 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED; } else { opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); } - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED) { plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); + } else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) { + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); } else { plan = IndexingStrategy.processNormally( opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version() @@ -914,7 +930,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException { assert plan.seqNoForIndexing >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing; - assert plan.indexIntoLucene; + assert plan.indexIntoLucene || plan.addStaleOpToLucene; /* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence * number service if this is on the primary, or the existing document's sequence number if this is on the replica. 
The * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. @@ -922,7 +938,9 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) index.parsedDoc().updateSeqID(plan.seqNoForIndexing, index.primaryTerm()); index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { - if (plan.useLuceneUpdateDocument) { + if (plan.addStaleOpToLucene) { + addStaleDocs(index.docs(), indexWriter); + } else if (plan.useLuceneUpdateDocument) { updateDocs(index.uid(), index.docs(), indexWriter); } else { // document does not exists, we can optimize for create, but double check if assertions are running @@ -986,16 +1004,27 @@ private void addDocs(final List docs, final IndexWriter i numDocAppends.inc(docs.size()); } + private void addStaleDocs(final List docs, final IndexWriter indexWriter) throws IOException { + assert softDeleteEnabled : "Add history documents but soft-deletes is disabled"; + docs.forEach(d -> d.add(softDeleteField)); + if (docs.size() > 1) { + indexWriter.addDocuments(docs); + } else { + indexWriter.addDocument(docs.get(0)); + } + } + protected static final class IndexingStrategy { final boolean currentNotFoundOrDeleted; final boolean useLuceneUpdateDocument; final long seqNoForIndexing; final long versionForIndexing; final boolean indexIntoLucene; + final boolean addStaleOpToLucene; final Optional earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, - boolean indexIntoLucene, long seqNoForIndexing, + boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; @@ -1008,37 +1037,40 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda this.seqNoForIndexing = seqNoForIndexing; this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; + this.addStaleOpToLucene = addStaleOpToLucene; this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null ? 
Optional.empty() : Optional.of(earlyResultOnPreFlightError); } static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) { - return new IndexingStrategy(true, false, true, seqNoForIndexing, 1, null); + return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null); } static IndexingStrategy skipDueToVersionConflict( VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) { final IndexResult result = new IndexResult(e, currentVersion); return new IndexingStrategy( - currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); + currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); } static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, - true, seqNoForIndexing, versionForIndexing, null); + true, false, seqNoForIndexing, versionForIndexing, null); } static IndexingStrategy overrideExistingAsIfNotThere( long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(true, true, true, seqNoForIndexing, versionForIndexing, null); + return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null); } - static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, - long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, - false, seqNoForIndexing, versionForIndexing, null); + static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null); + } + + static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing) { + return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, null); } } @@ -1096,7 +1128,7 @@ public DeleteResult delete(Delete delete) throws IOException { if (plan.earlyResultOnPreflightError.isPresent()) { deleteResult = plan.earlyResultOnPreflightError.get(); - } else if (plan.deleteFromLucene) { + } else if (plan.deleteFromLucene || plan.addStaleOpToLucene) { deleteResult = deleteInLucene(delete, plan); } else { deleteResult = new DeleteResult( @@ -1161,14 +1193,17 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // question may have been deleted in an out of order op that is not replayed. 
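    // Illustration of the local-checkpoint shortcut applied in this branch (example values hypothetical):
    // with a local checkpoint of 10, every operation with seqNo <= 10 has already been applied to Lucene,
    // so a replayed delete with seqNo 7 is treated as OP_STALE_HISTORY_EXISTED and skips Lucene entirely;
    // only the translog and sequence-number bookkeeping are updated for it.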
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED; } else { opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); } final DeletionStrategy plan; - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED) { plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); + } else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) { + plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); } else { plan = DeletionStrategy.processNormally( opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, @@ -1212,25 +1247,29 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { try { - if (plan.currentlyDeleted == false) { - // any exception that comes from this is a either an ACE or a fatal exception there - // can't be any document failures coming from this - if (softDeleteEnabled) { - final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id()); - assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; - tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm()); - tombstone.version().setLongValue(plan.versionOfDeletion); - final ParseContext.Document doc = tombstone.docs().get(0); - doc.add(softDeleteField); - indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField); + if (softDeleteEnabled) { + final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id()); + assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; + tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm()); + tombstone.version().setLongValue(plan.versionOfDeletion); + final ParseContext.Document doc = tombstone.docs().get(0); + doc.add(softDeleteField); + if (plan.addStaleOpToLucene || plan.currentlyDeleted) { + indexWriter.addDocument(doc); } else { - indexWriter.deleteDocuments(delete.uid()); + indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField); } + } else if (plan.currentlyDeleted == false) { + // any exception that comes from this is a either an ACE or a fatal exception there + // can't be any document failures coming from this + indexWriter.deleteDocuments(delete.uid()); + } + if (plan.deleteFromLucene) { numDocDeletes.inc(); + versionMap.putDeleteUnderLock(delete.uid().bytes(), + new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), + engineConfig.getThreadPool().relativeTimeInMillis())); } - versionMap.putDeleteUnderLock(delete.uid().bytes(), - new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), - engineConfig.getThreadPool().relativeTimeInMillis())); return new DeleteResult( plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); } catch (Exception ex) { @@ -1247,12 +1286,13 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) protected static final class DeletionStrategy { // of a rare double delete final boolean deleteFromLucene; + final boolean addStaleOpToLucene; final boolean 
currentlyDeleted; final long seqNoOfDeletion; final long versionOfDeletion; final Optional earlyResultOnPreflightError; - private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted, + private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion, DeleteResult earlyResultOnPreflightError) { assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : @@ -1260,6 +1300,7 @@ private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted, "deleteFromLucene: " + deleteFromLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreflightError; this.deleteFromLucene = deleteFromLucene; + this.addStaleOpToLucene = addStaleOpToLucene; this.currentlyDeleted = currentlyDeleted; this.seqNoOfDeletion = seqNoOfDeletion; this.versionOfDeletion = versionOfDeletion; @@ -1271,16 +1312,22 @@ static DeletionStrategy skipDueToVersionConflict( VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) { final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false); - return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult); + return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult); } static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); } - public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, + long seqNoOfDeletion, long versionOfDeletion) { + return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + } + + static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, + long seqNoOfDeletion, long versionOfDeletion) { + return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); } } @@ -1928,7 +1975,11 @@ private IndexWriter createWriter() throws IOException { // pkg-private for testing IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return new IndexWriter(directory, iwc); + if (Assertions.ENABLED) { + return new AssertingIndexWriter(directory, iwc); + } else { + return new IndexWriter(directory, iwc); + } } private IndexWriterConfig getIndexWriterConfig() { @@ -2286,4 +2337,35 @@ private static Map commitDataAsMap(final IndexWriter indexWriter } return commitData; } + + private final class AssertingIndexWriter extends IndexWriter { + AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { + super(d, conf); + } + @Override + public long updateDocument(Term term, Iterable doc) throws IOException { + assert softDeleteEnabled == false : "Call #updateDocument but soft-deletes is enabled"; + return super.updateDocument(term, doc); + } + @Override + public long updateDocuments(Term delTerm, Iterable> docs) throws IOException { + assert softDeleteEnabled 
== false : "Call #updateDocuments but soft-deletes is enabled"; + return super.updateDocuments(delTerm, docs); + } + @Override + public long deleteDocuments(Term... terms) throws IOException { + assert softDeleteEnabled == false : "Call #deleteDocuments but soft-deletes is enabled"; + return super.deleteDocuments(terms); + } + @Override + public long softUpdateDocument(Term term, Iterable doc, Field... softDeletes) throws IOException { + assert softDeleteEnabled : "Call #softUpdateDocument but soft-deletes is disabled"; + return super.softUpdateDocument(term, doc, softDeletes); + } + @Override + public long softUpdateDocuments(Term term, Iterable> docs, Field... softDeletes) throws IOException { + assert softDeleteEnabled : "Call #softUpdateDocuments but soft-deletes is disabled"; + return super.softUpdateDocuments(term, docs, softDeletes); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 3b6582c6e852d..020c8dd254b72 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -44,10 +44,12 @@ import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.LogDocMergePolicy; +import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; @@ -81,6 +83,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; @@ -2935,10 +2938,10 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException } public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { - final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), + final Supplier doc = () -> testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - Engine.Index operation = appendOnlyReplica(doc, false, 1, randomIntBetween(0, 5)); - Engine.Index retry = appendOnlyReplica(doc, true, 1, randomIntBetween(0, 5)); + Engine.Index operation = appendOnlyReplica(doc.get(), false, 1, randomIntBetween(0, 5)); + Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5)); // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene // and the version lookup is skipped final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0; @@ -2977,8 +2980,8 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } - operation = randomAppendOnly(doc, false, 1); - retry = randomAppendOnly(doc, true, 1); + operation = 
randomAppendOnly(doc.get(), false, 1); + retry = randomAppendOnly(doc.get(), true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertNotNull(indexResult.getTranslogLocation()); @@ -3043,6 +3046,7 @@ public void testDoubleDeliveryReplica() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } + assertThat(getOperationSeqNoInLucene(engine), contains(20L)); } public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException { @@ -3511,20 +3515,22 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio final List operations = new ArrayList<>(); final int numberOfOperations = randomIntBetween(16, 32); - final Document document = testDocumentWithTextField(); final AtomicLong sequenceNumber = new AtomicLong(); final Engine.Operation.Origin origin = randomFrom(LOCAL_TRANSLOG_RECOVERY, PEER_RECOVERY, PRIMARY, REPLICA); final LongSupplier sequenceNumberSupplier = origin == PRIMARY ? () -> SequenceNumbers.UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement; - document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); - final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null); - final Term uid = newUid(doc); + final Supplier doc = () -> { + final Document document = testDocumentWithTextField(); + document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); + return testParsedDocument("1", null, document, B_1, null); + }; + final Term uid = newUid("1"); final BiFunction searcherFactory = engine::acquireSearcher; for (int i = 0; i < numberOfOperations; i++) { if (randomBoolean()) { final Engine.Index index = new Engine.Index( uid, - doc, + doc.get(), sequenceNumberSupplier.getAsLong(), 1, i, @@ -4560,6 +4566,51 @@ public void testTrimUnsafeCommits() throws Exception { } } + public void testLuceneHistoryOnPrimary() throws Exception { + final List operations = generateSingleDocHistory(false, + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300); + assertOperationHistoryInLucene(operations); + } + + public void testLuceneHistoryOnReplica() throws Exception { + final List operations = generateSingleDocHistory(true, + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300); + Randomness.shuffle(operations); + assertOperationHistoryInLucene(operations); + } + + private void assertOperationHistoryInLucene(List operations) throws IOException { + final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy( + Lucene.SOFT_DELETE_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy()); + Set expectedSeqNos = new HashSet<>(); + try (Store store = createStore(); + Engine engine = createEngine(config(defaultSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) { + for (Engine.Operation op : operations) { + if (op instanceof Engine.Index) { + Engine.IndexResult indexResult = engine.index((Engine.Index) op); + assertThat(indexResult.getFailure(), nullValue()); + expectedSeqNos.add(indexResult.getSeqNo()); + } else { + Engine.DeleteResult deleteResult = engine.delete((Engine.Delete) op); + assertThat(deleteResult.getFailure(), nullValue()); + expectedSeqNos.add(deleteResult.getSeqNo()); + } + if (rarely()) { + engine.refresh("test"); + } + if (rarely()) { + engine.flush(); + } + if (rarely()) { + engine.forceMerge(true); + } + } + List 
operationsInLucene = getOperationSeqNoInLucene(engine); + assertThat(operationsInLucene, hasSize(expectedSeqNos.size())); + assertThat(operationsInLucene, containsInAnyOrder(expectedSeqNos.toArray())); + } + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 45e507cfa5344..936013e1d1e00 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -3079,4 +3080,30 @@ public void testSupplyTombstoneDoc() throws Exception { assertThat(doc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id))); closeShards(shard); } + + public void testSearcherIncludesSoftDeletes() throws Exception { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(shard); + indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); + indexDoc(shard, "test", "1", "{\"foo\" : \"baz\"}"); + deleteDoc(shard, "test", "0"); + shard.refresh("test"); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.includeSoftDeletes(searcher.getDirectoryReader())); + assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(0L)); + assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(1L)); + assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); + assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); + } + closeShards(shard); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 9cb3c7e98bfa5..5be4bb8ee6740 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -28,13 +28,18 @@ import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; 
+import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; @@ -551,6 +556,7 @@ public static List generateSingleDocHistory( } else { startWithSeqNo = 0; } + final int seqNoGap = randomBoolean() ? 1 : 2; final String valuePrefix = forReplica ? "r_" : "p_"; final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); for (int i = 0; i < numOfOps; i++) { @@ -574,7 +580,7 @@ public static List generateSingleDocHistory( } if (randomBoolean()) { op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null), - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, @@ -583,7 +589,7 @@ public static List generateSingleDocHistory( ); } else { op = new Engine.Delete("test", "1", id, - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, @@ -664,6 +670,33 @@ public static void assertOpsOnReplica( } } + /** + * Returns a list of sequence numbers of all existing documents including soft-deleted documents in Lucene. + */ + public static List getOperationSeqNoInLucene(Engine engine) throws IOException { + engine.refresh("test"); + final List seqNos = new ArrayList<>(); + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + IndexSearcher indexSearcher = new IndexSearcher(Lucene.includeSoftDeletes(searcher.getDirectoryReader())); + List leaves = indexSearcher.getIndexReader().leaves(); + NumericDocValues[] seqNoDocValues = new NumericDocValues[leaves.size()]; + for (int i = 0; i < leaves.size(); i++) { + seqNoDocValues[i] = leaves.get(i).reader().getNumericDocValues(SeqNoFieldMapper.NAME); + } + TopDocs allDocs = indexSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); + for (ScoreDoc scoreDoc : allDocs.scoreDocs) { + int leafIndex = ReaderUtil.subIndex(scoreDoc.doc, leaves); + int segmentDocId = scoreDoc.doc - leaves.get(leafIndex).docBase; + if (seqNoDocValues[leafIndex] != null && seqNoDocValues[leafIndex].advanceExact(segmentDocId)) { + seqNos.add(seqNoDocValues[leafIndex].longValue()); + } else { + throw new AssertionError("Segment without seqno DocValues"); + } + } + } + return seqNos; + } + /** * Exposes a translog associated with the given engine for testing purpose. 
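Taken together, the test plumbing in this patch makes the full operation history observable: the engine keeps soft-deleted documents around via a retention merge policy, and the reader is wrapped so those documents become searchable again. A condensed sketch of that combination, mirroring assertOperationHistoryInLucene and getOperationSeqNoInLucene above (includeSoftDeletes is the wrapper introduced in this patch):

    // test-only merge policy that retains every soft-deleted document
    MergePolicy keepSoftDeletedDocs = new SoftDeletesRetentionMergePolicy(
        Lucene.SOFT_DELETE_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy());
    // (passed into the engine config when the test engine is created)

    try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
        // the wrapped reader reports no liveDocs, so soft-deleted documents are visible to queries
        IndexSearcher historySearcher = new IndexSearcher(Lucene.includeSoftDeletes(searcher.getDirectoryReader()));
        TopDocs allOps = historySearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
    }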
*/ From 5f6e8511d20b09691858ad9d3d883a4d3343d241 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 25 Apr 2018 10:27:37 -0400 Subject: [PATCH 2/4] =?UTF-8?q?Simon=E2=80=99s=20feedbacks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../elasticsearch/common/lucene/Lucene.java | 22 +++++++++---------- .../uid/PerThreadIDVersionAndSeqNoLookup.java | 10 ++++----- .../lucene/uid/VersionsAndSeqNoResolver.java | 4 +++- .../index/engine/InternalEngine.java | 14 ++++++------ .../index/shard/IndexShardTests.java | 2 +- .../index/engine/EngineTestCase.java | 2 +- 6 files changed, 27 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 63df96919b164..676ff568b766c 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -835,19 +835,19 @@ public int length() { } /** - * Wraps a directory reader to include soft-deleted documents. - * This should be only used to query the history of documents rather than the documents. + * Wraps a directory reader to include all live docs. + * The wrapped reader can be used to query documents which are soft-deleted. * * @param in the input directory reader - * @return the wrapped reader including soft-deleted documents. + * @return the wrapped reader */ - public static DirectoryReader includeSoftDeletes(DirectoryReader in) throws IOException { - return new DirectoryReaderWithSoftDeletes(in); + public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOException { + return new DirectoryReaderWithAllLiveDocs(in); } - private static final class DirectoryReaderWithSoftDeletes extends FilterDirectoryReader { - static final class SubReaderWithSoftDeletes extends FilterLeafReader { - SubReaderWithSoftDeletes(LeafReader in) { + private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader { + static final class SubReaderWithAllLiveDocs extends FilterLeafReader { + SubReaderWithAllLiveDocs(LeafReader in) { super(in); } @Override @@ -867,17 +867,17 @@ public CacheHelper getReaderCacheHelper() { return null; // Modifying liveDocs } } - DirectoryReaderWithSoftDeletes(DirectoryReader in) throws IOException { + DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException { super(in, new FilterDirectoryReader.SubReaderWrapper() { @Override public LeafReader wrap(LeafReader leaf) { - return new SubReaderWithSoftDeletes(leaf); + return new SubReaderWithAllLiveDocs(leaf); } }); } @Override protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return includeSoftDeletes(in); + return wrapAllDocsLive(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index d032dbf3917e9..1156f005d5ba6 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -111,20 +111,18 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context) * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found * */ private int getDocID(BytesRef id, Bits liveDocs) throws IOException { - if (termsEnum.seekExact(id)) { - int 
docID = DocIdSetIterator.NO_MORE_DOCS; + int docID = DocIdSetIterator.NO_MORE_DOCS; + final PostingsEnum docsEnum = getPostingsOrNull(id); + if (docsEnum != null) { // there may be more than one matching docID, in the case of nested docs, so we want the last one: - docsEnum = termsEnum.postings(docsEnum, 0); for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { if (liveDocs != null && liveDocs.get(d) == false) { continue; } docID = d; } - return docID; - } else { - return DocIdSetIterator.NO_MORE_DOCS; } + return docID; } /** Return null if id is not found. */ diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 1e82e8810dfba..21609e2926c61 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -212,9 +212,11 @@ public static boolean hasHistoryInLucene(IndexReader reader, Term idTerm, long s continue; } final NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + assert seqNoDV != null : "SeqNoDV does not exist"; final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + assert primaryTermDV != null : "PrimaryTermDV does not exist"; for (int docId = postingsEnum.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = postingsEnum.nextDoc()) { - if (seqNoDV != null && seqNoDV.advanceExact(docId) && primaryTermDV != null && primaryTermDV.advanceExact(docId)) { + if (seqNoDV.advanceExact(docId) && primaryTermDV.advanceExact(docId)) { if (seqNoDV.longValue() == seqNo && primaryTermDV.longValue() >= primaryTerm) { return true; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d8cfe11b7275e..216c0c7eae7d8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -581,7 +581,7 @@ enum OpVsLuceneDocStatus { /** the op is more recent than the one that last modified the doc found in lucene*/ OP_NEWER, /** the op is stale but its history is existed in Lucene */ - OP_STALE_HISTORY_EXISTED, + OP_STALE_HISTORY_EXISTS, /** the op is stale and its history is not found in Lucene */ OP_STALE_HISTORY_NOT_FOUND, /** no doc was found in lucene */ @@ -590,7 +590,7 @@ enum OpVsLuceneDocStatus { private OpVsLuceneDocStatus compareToLuceneHistory(final Operation op, final Searcher searcher) throws IOException { if (VersionsAndSeqNoResolver.hasHistoryInLucene(searcher.reader(), op.uid(), op.seqNo(), op.primaryTerm())) { - return OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED; + return OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; } else { return OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND; } @@ -606,7 +606,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) { status = OpVsLuceneDocStatus.OP_NEWER; } else if (op.seqNo() == versionValue.seqNo && op.primaryTerm() == versionValue.term) { - status = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED; + status = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; } else { status = OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND; } @@ -861,11 +861,11 @@ 
protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED; + opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; } else { opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); } - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED) { + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS) { plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) { plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); @@ -1193,14 +1193,14 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED; + opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; } else { opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); } final DeletionStrategy plan; - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTED) { + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS) { plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); } else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) { plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 936013e1d1e00..233707e655063 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3098,7 +3098,7 @@ public void testSearcherIncludesSoftDeletes() throws Exception { deleteDoc(shard, "test", "0"); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { - IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.includeSoftDeletes(searcher.getDirectoryReader())); + IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(0L)); assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(1L)); assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 5be4bb8ee6740..bcb73fe8b32dc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -677,7 +677,7 @@ public static List getOperationSeqNoInLucene(Engine engine) throws IOExcep engine.refresh("test"); final List seqNos = new ArrayList<>(); try (Engine.Searcher searcher = 
engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { - IndexSearcher indexSearcher = new IndexSearcher(Lucene.includeSoftDeletes(searcher.getDirectoryReader())); + IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); List leaves = indexSearcher.getIndexReader().leaves(); NumericDocValues[] seqNoDocValues = new NumericDocValues[leaves.size()]; for (int i = 0; i < leaves.size(); i++) { From 70d5359b871ca7f37bff01e885de8d6f78fed98f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 25 Apr 2018 16:30:24 -0400 Subject: [PATCH 3/4] Accept duplicate stale docs --- .../uid/PerThreadIDVersionAndSeqNoLookup.java | 21 ++---- .../lucene/uid/VersionsAndSeqNoResolver.java | 32 --------- .../index/engine/InternalEngine.java | 68 +++++++------------ .../index/engine/InternalEngineTests.java | 4 +- .../index/engine/EngineTestCase.java | 10 +-- 5 files changed, 36 insertions(+), 99 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index 1156f005d5ba6..38fcdfe5f1b62 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -111,18 +111,20 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context) * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found * */ private int getDocID(BytesRef id, Bits liveDocs) throws IOException { - int docID = DocIdSetIterator.NO_MORE_DOCS; - final PostingsEnum docsEnum = getPostingsOrNull(id); - if (docsEnum != null) { + if (termsEnum.seekExact(id)) { + int docID = DocIdSetIterator.NO_MORE_DOCS; // there may be more than one matching docID, in the case of nested docs, so we want the last one: + docsEnum = termsEnum.postings(docsEnum, 0); for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { if (liveDocs != null && liveDocs.get(d) == false) { continue; } docID = d; } + return docID; + } else { + return DocIdSetIterator.NO_MORE_DOCS; } - return docID; } /** Return null if id is not found. 
*/ @@ -143,15 +145,4 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOExcep return null; } } - - /** - * Returns an internal posting list of the given uid - */ - PostingsEnum getPostingsOrNull(BytesRef id) throws IOException { - if (termsEnum.seekExact(id)) { - docsEnum = termsEnum.postings(docsEnum, 0); - return docsEnum; - } - return null; - } } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 21609e2926c61..9db7e3716d51a 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -23,9 +23,7 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.Term; -import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.mapper.SeqNoFieldMapper; @@ -195,34 +193,4 @@ public static long loadVersion(IndexReader reader, Term term) throws IOException final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; } - - /** - * Checks for the existence of the history of a pair SeqNo/PrimaryTerm in Lucene. The checking pair is considered as existed - * if there is a pair such as the seqNo equals to the checking seqNo and the primary term is at least the checking term. - */ - public static boolean hasHistoryInLucene(IndexReader reader, Term idTerm, long seqNo, long primaryTerm) throws IOException { - final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, idTerm.field()); - final List leaves = reader.leaves(); - // iterate backwards to optimize for the frequently updated documents which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - final LeafReaderContext leaf = leaves.get(i); - final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord]; - final PostingsEnum postingsEnum = lookup.getPostingsOrNull(idTerm.bytes()); - if (postingsEnum == null) { - continue; - } - final NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - assert seqNoDV != null : "SeqNoDV does not exist"; - final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - assert primaryTermDV != null : "PrimaryTermDV does not exist"; - for (int docId = postingsEnum.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = postingsEnum.nextDoc()) { - if (seqNoDV.advanceExact(docId) && primaryTermDV.advanceExact(docId)) { - if (seqNoDV.longValue() == seqNo && primaryTermDV.longValue() >= primaryTerm) { - return true; - } - } - } - } - return false; - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 216c0c7eae7d8..93b5e1bc8f9e8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -580,35 +580,23 @@ public GetResult get(Get get, BiFunction search enum OpVsLuceneDocStatus { /** the 
op is more recent than the one that last modified the doc found in lucene*/ OP_NEWER, - /** the op is stale but its history is existed in Lucene */ - OP_STALE_HISTORY_EXISTS, - /** the op is stale and its history is not found in Lucene */ - OP_STALE_HISTORY_NOT_FOUND, + /** the op is older or the same as the one that last modified the doc found in lucene*/ + OP_STALE_OR_EQUAL, /** no doc was found in lucene */ LUCENE_DOC_NOT_FOUND } - private OpVsLuceneDocStatus compareToLuceneHistory(final Operation op, final Searcher searcher) throws IOException { - if (VersionsAndSeqNoResolver.hasHistoryInLucene(searcher.reader(), op.uid(), op.seqNo(), op.primaryTerm())) { - return OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; - } else { - return OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND; - } - } - private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo || - (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) { + if (op.seqNo() > versionValue.seqNo || + (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) status = OpVsLuceneDocStatus.OP_NEWER; - } else if (op.seqNo() == versionValue.seqNo && op.primaryTerm() == versionValue.term) { - status = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; - } else { - status = OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND; + else { + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } else { // load from index @@ -625,10 +613,10 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) if (op.primaryTerm() > existingTerm) { status = OpVsLuceneDocStatus.OP_NEWER; } else { - status = compareToLuceneHistory(op, searcher); + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } else { - status = compareToLuceneHistory(op, searcher); + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } } @@ -852,7 +840,6 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene; if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){ // the operation seq# is lower then the current local checkpoint and thus was already put into lucene // this can happen during recovery where older operations are sent from the translog that are already @@ -861,18 +848,15 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // question may have been deleted in an out of order op that is not replayed. 
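    // Illustration of the simplified replica-side handling (example values hypothetical): for a document
    // whose latest copy in Lucene carries seqNo 5, a late-arriving index op with seqNo 4 is classified as
    // OP_STALE_OR_EQUAL and planned via IndexingStrategy.processAsStaleOp(softDeleteEnabled, ...), i.e. it
    // is appended as a soft-deleted document without first looking up whether that (seqNo, term) pair is
    // already present; duplicate stale documents are accepted, which is why the per-op history lookup
    // (hasHistoryInLucene) could be removed in this revision.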
@@ -852,7 +840,6 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
             // unlike the primary, replicas don't really care to about creation status of documents
             // this allows to ignore the case where a document was found in the live version maps in
             // a delete state and return false for the created flag in favor of code simplicity
-            final OpVsLuceneDocStatus opVsLucene;
             if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){
                 // the operation seq# is lower then the current local checkpoint and thus was already put into lucene
                 // this can happen during recovery where older operations are sent from the translog that are already
@@ -861,18 +848,15 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
                 // question may have been deleted in an out of order op that is not replayed.
                 // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
                 // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
-                opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS;
-            } else {
-                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
-            }
-            if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS) {
                 plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
-            } else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) {
-                plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version());
             } else {
-                plan = IndexingStrategy.processNormally(
-                    opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version()
-                );
+                final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
+                if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
+                    plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version());
+                } else {
+                    plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
+                        index.seqNo(), index.version());
+                }
             }
         }
         return plan;
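Read without the diff markers, the replica-side indexing plan now looks roughly as follows (a condensed sketch: the surrounding method, its assertions and the append-only optimization are omitted; the IndexingStrategy factories and status values are the real names used above):

    // Condensed sketch of the new planIndexingAsNonPrimary branch shown in the hunks above.
    if (index.seqNo() <= localCheckpointTracker.getCheckpoint()) {
        // already covered by the local checkpoint: record the op but do not touch Lucene
        plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
    } else {
        final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
        if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
            // keep a complete history: index the stale op as a soft-deleted document
            plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version());
        } else {
            plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
                index.seqNo(), index.version());
        }
    }

The deletion path in the hunks that follow has the same shape, with DeletionStrategy in place of IndexingStrategy.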
@@ -1184,7 +1168,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
         // unlike the primary, replicas don't really care to about found status of documents
         // this allows to ignore the case where a document was found in the live version maps in
         // a delete state and return true for the found flag in favor of code simplicity
-        final OpVsLuceneDocStatus opVsLucene;
+        final DeletionStrategy plan;
         if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) {
             // the operation seq# is lower then the current local checkpoint and thus was already put into lucene
             // this can happen during recovery where older operations are sent from the translog that are already
@@ -1193,21 +1177,15 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
             // question may have been deleted in an out of order op that is not replayed.
             // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
             // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
-            opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS;
-        } else {
-            opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
-        }
-
-        final DeletionStrategy plan;
-
-        if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS) {
             plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version());
-        } else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) {
-            plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
         } else {
-            plan = DeletionStrategy.processNormally(
-                opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
-                delete.seqNo(), delete.version());
+            final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
+            if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
+                plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
+            } else {
+                plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
+                    delete.seqNo(), delete.version());
+            }
         }
         return plan;
     }
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 020c8dd254b72..439e1fd89be23 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -4605,9 +4605,7 @@ private void assertOperationHistoryInLucene(List operations) t
                 engine.forceMerge(true);
             }
         }
-        List operationsInLucene = getOperationSeqNoInLucene(engine);
-        assertThat(operationsInLucene, hasSize(expectedSeqNos.size()));
-        assertThat(operationsInLucene, containsInAnyOrder(expectedSeqNos.toArray()));
+        assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray()));
     }
 }
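The test changes that follow rely on Lucene.wrapAllDocsLive, the wrapper added to Lucene.java earlier in this series, to make soft-deleted documents visible again. A minimal sketch of what the wrapping buys, assuming an engine field as in EngineTestCase:

    // Sketch: the same match-all query counts only live documents through the raw reader,
    // but every indexed operation, including stale ops kept as soft-deleted documents,
    // through the wrapped reader.
    try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
        IndexSearcher liveOnly = new IndexSearcher(searcher.getDirectoryReader());
        IndexSearcher withHistory = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
        int liveDocs = liveOnly.count(new MatchAllDocsQuery());
        int allOps = withHistory.count(new MatchAllDocsQuery());
        assert allOps >= liveDocs; // the difference is the soft-deleted history
    }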
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index bcb73fe8b32dc..57ebf90efcf12 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -39,12 +39,11 @@
 import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TotalHitCountCollector;
-import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -58,6 +57,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
@@ -93,7 +93,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
@@ -673,9 +675,9 @@ public static void assertOpsOnReplica(
     /**
      * Returns a list of sequence numbers of all existing documents including soft-deleted documents in Lucene.
      */
-    public static List getOperationSeqNoInLucene(Engine engine) throws IOException {
+    public static Set getOperationSeqNoInLucene(Engine engine) throws IOException {
         engine.refresh("test");
-        final List seqNos = new ArrayList<>();
+        final Set seqNos = new HashSet<>();
         try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
             IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
             List leaves = indexSearcher.getIndexReader().leaves();

From 687adc7c14c01b32f3f8438537ca8a6fdd38e90b Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Fri, 27 Apr 2018 09:55:31 -0400
Subject: [PATCH 4/4] comment for wrapped reader

---
 .../src/main/java/org/elasticsearch/common/lucene/Lucene.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
index 676ff568b766c..f50995048eadd 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -836,7 +836,7 @@ public int length() {

     /**
      * Wraps a directory reader to include all live docs.
-     * The wrapped reader can be used to query documents which are soft-deleted.
+     * The wrapped reader can be used to query all documents.
      *
      * @param in the input directory reader
      * @return the wrapped reader