diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1427509a2ec7c..0aad58d69e1ea 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -87,6 +87,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -648,54 +649,38 @@ public GetResult get(Get get, BiFunction search } } - /** - * the status of the current doc version in lucene, compared to the version in an incoming - * operation - */ - enum OpVsLuceneDocStatus { - /** the op is more recent than the one that last modified the doc found in lucene*/ - OP_NEWER, - /** the op is older or the same as the one that last modified the doc found in lucene*/ - OP_STALE_OR_EQUAL, - /** no doc was found in lucene */ - LUCENE_DOC_NOT_FOUND + private static final class SeqNoAndVersionValue { + final long seqNo; + final VersionValue versionValue; + SeqNoAndVersionValue(long seqNo, VersionValue versionValue) { + assert versionValue == null || versionValue.seqNo == seqNo : "version_value=" + versionValue + ", seqno=" + seqNo; + this.seqNo = seqNo; + this.versionValue = versionValue; + } + boolean isDelete() { + return versionValue != null && versionValue.isDelete(); + } } - private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { + /** resolves the last seqno and version of the document from the version map (if present), returning null if not found */ + private SeqNoAndVersionValue resolveDocSeqNo(final Operation op) throws IOException { + // TODO: Assert that if prev_seqno equals the op's seqno, they must have the same term, once rollback is implemented.
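A note on the refactoring above: the removed OpVsLuceneDocStatus classification is still derivable from the new return value, since callers now compare sequence numbers directly. Below is a minimal sketch of the mapping (a hypothetical helper, not part of this change); the primary-term tie-break is intentionally dropped, pending the rollback work flagged in the TODO.

// Hypothetical illustration only: how the old enum maps onto resolveDocSeqNo's result.
// resolved == null          <=> LUCENE_DOC_NOT_FOUND
// opSeqNo > resolved.seqNo  <=> OP_NEWER (the primary-term tie-break no longer applies)
// otherwise                 <=> OP_STALE_OR_EQUAL
static String classifyAgainstLucene(SeqNoAndVersionValue resolved, long opSeqNo) {
    if (resolved == null) {
        return "LUCENE_DOC_NOT_FOUND";
    }
    return opSeqNo > resolved.seqNo ? "OP_NEWER" : "OP_STALE_OR_EQUAL";
}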
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; - final OpVsLuceneDocStatus status; - VersionValue versionValue = getVersionFromMap(op.uid().bytes()); + final VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo || - (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) - status = OpVsLuceneDocStatus.OP_NEWER; - else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } + return new SeqNoAndVersionValue(versionValue.seqNo, versionValue); } else { - // load from index assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { - DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); + final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); if (docAndSeqNo == null) { - status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; - } else if (op.seqNo() > docAndSeqNo.seqNo) { - status = OpVsLuceneDocStatus.OP_NEWER; - } else if (op.seqNo() == docAndSeqNo.seqNo) { - // load term to tie break - final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field()); - if (op.primaryTerm() > existingTerm) { - status = OpVsLuceneDocStatus.OP_NEWER; - } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } + return null; } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + return new SeqNoAndVersionValue(docAndSeqNo.seqNo, null); } } } - return status; } /** resolves the current version of the document, returning null if not found */ @@ -909,12 +894,13 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); + final SeqNoAndVersionValue currentVersion = resolveDocSeqNo(index); + if (currentVersion == null || currentVersion.seqNo < index.seqNo()) { + plan = IndexingStrategy.processNormally(currentVersion == null, index.seqNo(), index.version(), currentVersion); } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - index.seqNo(), index.version()); + final boolean addStaleOpToLucene = this.softDeleteEnabled; + plan = IndexingStrategy.processAsStaleOp(addStaleOpToLucene, index.seqNo(), index.version(), + addStaleOpToLucene ? currentVersion : null); } } } @@ -962,8 +948,8 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc } else { plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, generateSeqNoForOperation(index), - index.versionType().updateVersion(currentVersion, index.version()) - ); + index.versionType().updateVersion(currentVersion, index.version()), + versionValue == null ? 
null : new SeqNoAndVersionValue(versionValue.seqNo, versionValue)); } } return plan; } @@ -982,13 +968,28 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { if (plan.addStaleOpToLucene) { - addStaleDocs(index.docs(), indexWriter); + addStaleDocs(index.docs(), plan.currentVersionValue.seqNo); } else if (plan.useLuceneUpdateDocument) { - updateDocs(index.uid(), index.docs(), indexWriter); + updateDocs(index.uid(), plan, index.docs()); } else { // document does not exists, we can optimize for create, but double check if assertions are running assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); - addDocs(index.docs(), indexWriter); + final List<ParseContext.Document> docs; + if (softDeleteEnabled && plan.currentVersionValue != null && plan.currentVersionValue.isDelete()) { + docs = new ArrayList<>(index.docs().size() + 1); + docs.addAll(index.docs()); + // Since the updated_by_seqno of a delete tombstone is never updated, we need to reindex another tombstone for the + // previous delete operation with updated_by_seqno equal to the seqno of this index op. This new tombstone ensures + // that we will always have a tombstone of the previous delete operation if we have to roll back this index operation. + final VersionValue currentVersion = plan.currentVersionValue.versionValue; + ParseContext.Document tombstone = createDeleteTombstone( + index.id(), index.type(), currentVersion.seqNo, currentVersion.term, currentVersion.version).docs().get(0); + tombstone.add(new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoForIndexing)); + docs.add(tombstone); + } else { + docs = index.docs(); + } + addDocs(docs, indexWriter); } return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } catch (Exception ex) { @@ -1047,9 +1048,14 @@ private void addDocs(final List docs, final IndexWriter i numDocAppends.inc(docs.size()); } - private void addStaleDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException { + private void addStaleDocs(final List<ParseContext.Document> docs, final long seqNoOfNewerVersion) throws IOException { assert softDeleteEnabled : "Add history documents but soft-deletes is disabled"; - docs.forEach(d -> d.add(softDeleteField)); + assert seqNoOfNewerVersion >= 0 : "Invalid newer version; seqno=" + seqNoOfNewerVersion; + NumericDocValuesField updatedBySeqNoField = new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, seqNoOfNewerVersion); + for (ParseContext.Document doc : docs) { + doc.add(softDeleteField); + doc.add(updatedBySeqNoField); + } if (docs.size() > 1) { indexWriter.addDocuments(docs); } else { @@ -1064,56 +1070,62 @@ protected static final class IndexingStrategy { final long versionForIndexing; final boolean indexIntoLucene; final boolean addStaleOpToLucene; + final SeqNoAndVersionValue currentVersionValue; final Optional earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, - boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, - long versionForIndexing, IndexResult earlyResultOnPreFlightError) { + boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing, + SeqNoAndVersionValue currentVersionValue, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; assert
(indexIntoLucene && earlyResultOnPreFlightError != null) == false : "can only index into lucene or have a preflight result but not both." + "indexIntoLucene: " + indexIntoLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; + // TODO: assert strictly greater (i.e. drop the 'or equal' case) once rollback is implemented + assert addStaleOpToLucene == false || currentVersionValue.seqNo >= seqNoForIndexing : + "stale=" + addStaleOpToLucene + " seqno_for_indexing=" + seqNoForIndexing + " current_seqno=" + currentVersionValue.seqNo; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.useLuceneUpdateDocument = useLuceneUpdateDocument; this.seqNoForIndexing = seqNoForIndexing; this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; this.addStaleOpToLucene = addStaleOpToLucene; + this.currentVersionValue = currentVersionValue; this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null ? Optional.empty() : Optional.of(earlyResultOnPreFlightError); } static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) { - return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null); + return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null, null); } static IndexingStrategy skipDueToVersionConflict( VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) { final IndexResult result = new IndexResult(e, currentVersion); return new IndexingStrategy( - currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); + currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, null, result); } - static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, - long seqNoForIndexing, long versionForIndexing) { + static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long seqNoForIndexing, + long versionForIndexing, SeqNoAndVersionValue currentVersionValue) { return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, - true, false, seqNoForIndexing, versionForIndexing, null); + true, false, seqNoForIndexing, versionForIndexing, currentVersionValue, null); } static IndexingStrategy overrideExistingAsIfNotThere( long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null); + return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null, null); } static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null); + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null, null); } - static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, null); + static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, + long versionForIndexing, SeqNoAndVersionValue currentVersionValue) { + return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, currentVersionValue, null); } } @@ -1139,12 +1151,13 @@ private boolean
assertDocDoesNotExist(final Index index, final boolean allowDele return true; } - private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException { + private void updateDocs(Term uid, IndexingStrategy plan, List<ParseContext.Document> docs) throws IOException { if (softDeleteEnabled) { + NumericDocValuesField updatedBySeqNoField = new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoForIndexing); if (docs.size() > 1) { - indexWriter.softUpdateDocuments(uid, docs, softDeleteField); + indexWriter.softUpdateDocuments(uid, docs, softDeleteField, updatedBySeqNoField); } else { - indexWriter.softUpdateDocument(uid, docs.get(0), softDeleteField); + indexWriter.softUpdateDocument(uid, docs.get(0), softDeleteField, updatedBySeqNoField); } } else { if (docs.size() > 1) { @@ -1233,12 +1246,13 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); } else { - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); + final SeqNoAndVersionValue currentVersion = resolveDocSeqNo(delete); + if (currentVersion == null || currentVersion.seqNo < delete.seqNo()) { + plan = DeletionStrategy.processNormally(currentVersion == null, delete.seqNo(), delete.version()); } else { - plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - delete.seqNo(), delete.version()); + boolean addStaleOpToLucene = this.softDeleteEnabled; + plan = DeletionStrategy.processAsStaleOp(addStaleOpToLucene, false, delete.seqNo(), delete.version(), + addStaleOpToLucene ?
currentVersion.seqNo : -1); } } return plan; } @@ -1276,22 +1290,32 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE return plan; } + private ParsedDocument createDeleteTombstone(String id, String type, long seqNo, long primaryTerm, long version) { + final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(type, id); + assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; + tombstone.updateSeqID(seqNo, primaryTerm); + tombstone.version().setLongValue(version); + final ParseContext.Document doc = tombstone.docs().get(0); + assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : + "Delete tombstone document but _tombstone field is not set [" + doc + " ]"; + doc.add(softDeleteField); + return tombstone; + } + private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { try { if (softDeleteEnabled) { - final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id()); - assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; - tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm()); - tombstone.version().setLongValue(plan.versionOfDeletion); - final ParseContext.Document doc = tombstone.docs().get(0); - assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : - "Delete tombstone document but _tombstone field is not set [" + doc + " ]"; - doc.add(softDeleteField); + final ParseContext.Document tombstone = createDeleteTombstone( + delete.id(), delete.type(), plan.seqNoOfDeletion, delete.primaryTerm(), plan.versionOfDeletion).docs().get(0); + if (plan.addStaleOpToLucene) { + tombstone.add(new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoOfNewerDocIfStale)); + } if (plan.addStaleOpToLucene || plan.currentlyDeleted) { - indexWriter.addDocument(doc); + indexWriter.addDocument(tombstone); } else { - indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField); + NumericDocValuesField updatedBySeqNoField = new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoOfDeletion); + indexWriter.softUpdateDocument(delete.uid(), tombstone, softDeleteField, updatedBySeqNoField); } } else if (plan.currentlyDeleted == false) { // any exception that comes from this is a either an ACE or a fatal exception there @@ -1324,20 +1348,24 @@ protected static final class DeletionStrategy { final boolean currentlyDeleted; final long seqNoOfDeletion; final long versionOfDeletion; + final long seqNoOfNewerDocIfStale; // the seqno of the newer copy of this _uid if one exists; otherwise -1 final Optional earlyResultOnPreflightError; - private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, - long seqNoOfDeletion, long versionOfDeletion, - DeleteResult earlyResultOnPreflightError) { + private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, long seqNoOfDeletion, + long versionOfDeletion, long seqNoOfNewerDocIfStale, DeleteResult earlyResultOnPreflightError) { assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : "can only delete from lucene or have a preflight result but not both." + "deleteFromLucene: " + deleteFromLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreflightError; + // TODO: assert strictly greater (i.e. drop the 'or equal' case) once rollback is implemented + assert addStaleOpToLucene == false || seqNoOfNewerDocIfStale >= seqNoOfDeletion : + "stale=" + addStaleOpToLucene + " seqno_for_deletion=" + seqNoOfDeletion + " seqno_of_newer_doc=" + seqNoOfNewerDocIfStale; this.deleteFromLucene = deleteFromLucene; this.addStaleOpToLucene = addStaleOpToLucene; this.currentlyDeleted = currentlyDeleted; this.seqNoOfDeletion = seqNoOfDeletion; this.versionOfDeletion = versionOfDeletion; + this.seqNoOfNewerDocIfStale = seqNoOfNewerDocIfStale; this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ? Optional.empty() : Optional.of(earlyResultOnPreflightError); } @@ -1346,22 +1374,22 @@ static DeletionStrategy skipDueToVersionConflict( VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) { final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false); - return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult); + return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, -1, deleteResult); } static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, -1, null); } public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, -1, null); } - static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, - long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, long seqNoOfDeletion, + long versionOfDeletion, long seqNoOfNewerDocIfStale) { + return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, seqNoOfNewerDocIfStale, null); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index af2ded8c46620..7d9bd67d2abfa 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -20,6 +20,9 @@ package org.elasticsearch.index.engine; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.mapper.SeqNoFieldMapper; @@ -115,6 +118,11 @@ synchronized long getMinRetainedSeqNo() { * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
*/ Query getRetentionQuery() { - return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE); + return new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE), BooleanClause.Occur.SHOULD) + // Since updated_by_seqno is an updatable DV, we have to do a linear scan to find matches of its range query. + .add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, + globalCheckpointSupplier.getAsLong() + 1, Long.MAX_VALUE), BooleanClause.Occur.SHOULD) + .build(); } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java index 5a0db4163bf28..9e3842183002c 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java @@ -92,6 +92,7 @@ public static SequenceIDFields emptySeqID() { public static final String CONTENT_TYPE = "_seq_no"; public static final String PRIMARY_TERM_NAME = "_primary_term"; public static final String TOMBSTONE_NAME = "_tombstone"; + public static final String UPDATED_BY_SEQNO_NAME = "_updated_by_seqno"; public static class SeqNoDefaults { public static final String NAME = SeqNoFieldMapper.NAME; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d1a6fc858d358..ec3e781bbcff4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -30,6 +30,7 @@ import java.util.Base64; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -68,6 +69,7 @@ import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; @@ -82,14 +84,23 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.SortedSetSortField; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; @@ -101,6 +112,7 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; +import 
org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; @@ -110,6 +122,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -127,6 +140,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -158,6 +172,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -1343,24 +1358,31 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final MapperService mapperService = createMapperService("test"); - final Set<String> liveDocs = new HashSet<>(); + final Map<String, Long> liveDocs = new HashMap<>(); + final Map<Long, Long> updatedBy = new HashMap<>(); try (Store store = createStore(); - InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) { + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), + newMergePolicy(), null, null, globalCheckpoint::get))) { int numDocs = scaledRandomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null); - engine.index(indexForDoc(doc)); - liveDocs.add(doc.id()); + ParsedDocument doc = testParsedDocument(Integer.toString(between(i, i + 5)), null, testDocument(), B_1, null); + liveDocs.put(doc.id(), engine.index(indexForDoc(doc)).getSeqNo()); } for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null); + ParsedDocument doc = testParsedDocument(Integer.toString(between(i, i + 5)), null, testDocument(), B_1, null); if (randomBoolean()) { - engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + Engine.DeleteResult delete = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + if (liveDocs.containsKey(doc.id())) { + updatedBy.put(liveDocs.get(doc.id()), delete.getSeqNo()); + } liveDocs.remove(doc.id()); } if (randomBoolean()) { - engine.index(indexForDoc(doc)); - liveDocs.add(doc.id()); + Engine.IndexResult index = engine.index(indexForDoc(doc)); + if (liveDocs.containsKey(doc.id())) { + updatedBy.put(liveDocs.get(doc.id()), index.getSeqNo()); + } + liveDocs.put(doc.id(), index.getSeqNo()); } if (randomBoolean()) { engine.flush(randomBoolean(), true); @@ -1377,20 +1399,36 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { } engine.forceMerge(true,
1, false, false, false); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); - Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService) - .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); - for (long seqno = 0; seqno <= localCheckpoint; seqno++) { - long minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitCheckpoint + 1); - String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]"; - if (seqno < minSeqNoToRetain) { - Translog.Operation op = ops.get(seqno); - if (op != null) { - assertThat(op, instanceOf(Translog.Index.class)); - assertThat(msg, ((Translog.Index) op).id(), isIn(liveDocs)); - assertEquals(msg, ((Translog.Index) op).source(), B_1); + engine.refresh("test"); + long minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitCheckpoint + 1); + try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader()); + Set<Long> keptSeqNos = new HashSet<>(); + for (LeafReaderContext leaf : reader.leaves()) { + DocIdSetIterator docIterator = new IndexSearcher(reader).createWeight(new MatchAllDocsQuery(), false, 1.0f) + .scorer(leaf).iterator(); + NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + NumericDocValues updatedByDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME); + int doc; + while ((doc = docIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertTrue(seqNoDV.advanceExact(doc)); + keptSeqNos.add(seqNoDV.longValue()); + // documents whose seqno < min_seqno_to_retain are not merged away because + // they're live or updated by an op whose seqno is greater than global checkpoint.
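The retention rule this loop asserts can be stated as a single predicate. A hedged sketch (an illustrative helper, not engine API) mirroring the two SHOULD clauses of SoftDeletesPolicy#getRetentionQuery from this change:

// A soft-deleted document must be kept if either disjunct holds:
//  - its own seq_no is at or above the retention floor (needed for ops-based recovery), or
//  - it was superseded by an operation above the global checkpoint (needed for rollback).
static boolean mustRetain(long seqNo, Long updatedBySeqNo, long minSeqNoToRetain, long globalCheckpoint) {
    return seqNo >= minSeqNoToRetain
        || (updatedBySeqNo != null && updatedBySeqNo > globalCheckpoint);
}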
+ if (seqNoDV.longValue() < minSeqNoToRetain && liveDocs.values().contains(seqNoDV.longValue()) == false) { + String msg = "seq_no=" + seqNoDV.longValue() + ",global_checkpoint=" + globalCheckpoint; + assertTrue(msg, updatedByDV.advanceExact(doc)); + assertThat(msg, updatedByDV.longValue(), greaterThan(globalCheckpoint.get())); + } + } + } + for (long seqno = 0; seqno < localCheckpoint; seqno++) { + if (seqno >= minSeqNoToRetain) { + assertThat("seq_no=" + seqno + " required for changes", keptSeqNos, hasItem(seqno)); + } + if (updatedBy.getOrDefault(seqno, Long.MIN_VALUE) > globalCheckpoint.get()) { + assertThat("seq_no=" + seqno + " required for rollback", keptSeqNos, hasItem(seqno)); } - } else { - assertThat(msg, ops.get(seqno), notNullValue()); } } settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); @@ -4950,6 +4988,248 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { } } + public void testRecordUpdatedBySeqNo() throws Exception { + CheckedFunction<InternalEngine, List<Tuple<Long, Long>>, IOException> readUpdatedBySeqNos = (engine) -> { + engine.refresh("test"); + List<Tuple<Long, Long>> updates = new ArrayList<>(); + try (Searcher engineSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); + for (LeafReaderContext leaf : reader.leaves()) { + NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + NumericDocValues updatedByDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME); + DocIdSetIterator iterator = new IndexSearcher(reader).createWeight(new MatchAllDocsQuery(), false, 1.0f) + .scorer(leaf).iterator(); + int doc; + while ((doc = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertThat(seqNoDV.advanceExact(doc), equalTo(true)); + if (updatedByDV.advanceExact(doc)) { + assertThat(updatedByDV.longValue(), greaterThanOrEqualTo(0L)); + updates.add(Tuple.tuple(seqNoDV.longValue(), updatedByDV.longValue())); + } else { + updates.add(Tuple.tuple(seqNoDV.longValue(), null)); + } + } + } + } + return updates; + }; + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + // On replica + AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore(); + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), + newMergePolicy(), null, null, globalCheckpoint::get))) { + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 1, 0, false)); + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 2, 1, false)); + // IndexWriter still allows softUpdateDocuments to target already soft-deleted docs until it's refreshed.
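The scenario below exercises the Lucene primitive the engine changes rely on: IndexWriter#softUpdateDocument adds the new copy and, in the same operation, applies the supplied fields as doc-values updates to the previous copy, which is how the superseding seq_no gets stamped onto superseded documents. A self-contained sketch; the field names follow this PR, while the directory and setup are purely illustrative:

import java.nio.file.Files;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class SoftUpdateSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = FSDirectory.open(Files.createTempDirectory("soft-update"));
             IndexWriter writer = new IndexWriter(dir,
                 new IndexWriterConfig().setSoftDeletesField("__soft_deletes"))) {
            Document v0 = new Document();
            v0.add(new StringField("_id", "1", Field.Store.NO));
            v0.add(new NumericDocValuesField("_seq_no", 0));
            writer.addDocument(v0);
            Document v1 = new Document();
            v1.add(new StringField("_id", "1", Field.Store.NO));
            v1.add(new NumericDocValuesField("_seq_no", 1));
            // Atomically: add v1, soft-delete v0, and stamp v0 with the seq_no that superseded it.
            writer.softUpdateDocument(new Term("_id", "1"), v1,
                new NumericDocValuesField("__soft_deletes", 1),
                new NumericDocValuesField("_updated_by_seqno", 1));
            writer.commit();
        }
    }
}

A SoftDeletesRetentionMergePolicy configured with the retention query shown earlier can then keep the stamped copy alive until the global checkpoint advances past it.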
+ engine.refresh("test"); + engine.delete(replicaDeleteForDoc("1", 4, 3, threadPool.relativeTimeInMillis())); + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 3, 2, false)); + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 6, 5, false)); + engine.delete(replicaDeleteForDoc("1", 5, 4, threadPool.relativeTimeInMillis())); + List> seqNos = readUpdatedBySeqNos.apply(engine); + // index#0 -> index#1 -> delete#3 -> index#2 -> index#5 -> delete#4 + assertThat(seqNos, containsInAnyOrder( + Tuple.tuple(0L, 1L), // 0 -> 1 - seq#0 should be updated once because of refresh + Tuple.tuple(1L, 5L), // 1 -> 5 (deleted by #3, then updated by #5) + Tuple.tuple(2L, 3L), // index#2 is stale by delete#3, thus should never be updated again. + Tuple.tuple(3L, null), // the first delete tombstone without updated_by_seqno + Tuple.tuple(3L, 5L), // the second delete tombstone without updated_by_seqno + Tuple.tuple(4L, 5L), // delete#4 is stale, thus having updated_by_seqno=5 + Tuple.tuple(5L, null) // still alive + )); + } + // On primary + try (Store store = createStore(); + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), + newMergePolicy(), null, null, globalCheckpoint::get))) { + engine.index(indexForDoc(createParsedDoc("1", null))); + engine.index(indexForDoc(createParsedDoc("1", null))); + engine.refresh("test"); + engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get())); + engine.index(indexForDoc(createParsedDoc("1", null))); + engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get())); + engine.refresh("test"); + List> seqNos = readUpdatedBySeqNos.apply(engine); + // index#0 -> index#1 -> delete#2 -> index#3 -> delete#4 + assertThat(seqNos, containsInAnyOrder( + Tuple.tuple(0L, 1L), // index#0 -> index#1 -> refresh + Tuple.tuple(1L, 4L), // index#1 -> delete#2 -> index#3 -> delete#4 + Tuple.tuple(2L, null), // the first delete tombstone without updated_by_seqno + Tuple.tuple(2L, 3L), // the second delete tombstone with updated_by_seqno + Tuple.tuple(3L, 4L), // index#3 -> delete#4 + Tuple.tuple(4L, null) // still alive + )); + } + } + + @TestLogging("_ROOT:DEBUG") + public void testKeepDocsForRollback() throws Exception { + IOUtils.close(engine, store); + threadPool = spy(threadPool); + AtomicLong clockTime = new AtomicLong(); + when(threadPool.relativeTimeInMillis()).thenAnswer(i -> clockTime.incrementAndGet()); + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(between(0, 100)).getStringRep()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + long globalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; + Map processedOps = new HashMap<>(); + SetOnce indexWriter = new SetOnce<>(); + IndexWriterFactory indexWriterFactory = (iwc, dir) -> { + indexWriter.set(new IndexWriter(iwc, dir)); + return indexWriter.get(); + }; + Set lastTombstones = Collections.emptySet(); + try (Store store = createStore(); + InternalEngine engine = createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) { + List operations = generateMultipleDocumentsHistory(true, VersionType.EXTERNAL, 1L, + 10, 200, Arrays.asList("1", "2", "3", "4", "5"), false, () -> 
between(1, 100)); + realisticShuffleOperations(operations); + for (Engine.Operation op : operations) { + Set tombstones = engine.getDeletedTombstones().stream().map(del -> del.seqNo).collect(Collectors.toSet()); + if (op instanceof Engine.Index) { + logger.debug("index id={} seq={} gcp={} tombstones={}", op.id(), op.seqNo(), globalCheckpoint, tombstones); + engine.index((Engine.Index) op); + } else if (op instanceof Engine.Delete) { + logger.debug("delete id={} seq={} gcp={} tombstones={}", op.id(), op.seqNo(), globalCheckpoint, tombstones); + engine.delete((Engine.Delete) op); + } + processedOps.put(op.seqNo(), op); + if (randomInt(100) < 20) { + assertDocumentsForRollback(engine, globalCheckpoint, processedOps); + } + // Trigger merge to reclaim some deleted documents + if (randomBoolean()) { + BooleanQuery retentionQuery = new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, globalCheckpoint + 1, Long.MAX_VALUE), + BooleanClause.Occur.SHOULD) + .add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, globalCheckpoint + 1, Long.MAX_VALUE), + BooleanClause.Occur.SHOULD) + .build(); + List reclaimedOps = simulateMerge(engine, indexWriter.get(), retentionQuery) + .stream().map(processedOps::get).collect(Collectors.toList()); + for (Engine.Operation reclaimedOp : reclaimedOps) { + logger.debug("merge reclaim id={} seq={}", reclaimedOp.id(), reclaimedOp.seqNo()); + } + } + // Prune some delete tombstone + if (randomBoolean()) { + engine.maybePruneDeletes(); + } + Set prunedTombstone = Sets.difference(lastTombstones, tombstones); + for (long prunedSeqNo : prunedTombstone) { + logger.debug("prune tombstone id={} seq={}", processedOps.get(prunedSeqNo).id(), prunedSeqNo); + } + lastTombstones = tombstones; + globalCheckpoint = randomLongBetween(globalCheckpoint, engine.getLocalCheckpoint()); + } + assertDocumentsForRollback(engine, globalCheckpoint, processedOps); + } + } + + /** + * Here we simulate Lucene merges for these purposes: + * - The simulation can randomly reclaim a subset of reclaimable operations instead of all docs like the actual merges + * - The simulation is more deterministic than the actual merge and can return the operations have been reclaimed. 
+ * + * @param retentionQuery deleted documents matching this query won't be reclaimed (see {@link SoftDeletesPolicy#getRetentionQuery()} + * @return a list of operations have been reclaimed + */ + private List simulateMerge(InternalEngine engine, IndexWriter indexWriter, Query retentionQuery) throws IOException { + try (Searcher engineSearcher = engine.acquireSearcher("simulate-merge", Engine.SearcherScope.INTERNAL)) { + IndexSearcher searcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); + BooleanQuery reclaimQuery = new BooleanQuery.Builder() + .add(new DocValuesFieldExistsQuery(Lucene.SOFT_DELETE_FIELD), BooleanClause.Occur.MUST) + .add(retentionQuery, BooleanClause.Occur.MUST_NOT).build(); + TopDocs reclaimableDocs = searcher.search(reclaimQuery, Integer.MAX_VALUE); + if (reclaimableDocs.scoreDocs.length == 0) { + return Collections.emptyList(); + } + List docsToReclaim = randomSubsetOf(Arrays.asList(reclaimableDocs.scoreDocs)); + DirectoryReader inReader = engineSearcher.getDirectoryReader(); + while (inReader instanceof FilterDirectoryReader) { + inReader = ((FilterDirectoryReader) inReader).getDelegate(); + } + List reclaimedOps = new ArrayList<>(); + for (ScoreDoc docToReclaim : docsToReclaim) { + if (indexWriter.tryDeleteDocument(inReader, docToReclaim.doc) != -1) { + reclaimedOps.add(readSeqNo(inReader, docToReclaim.doc)); + } + } + return reclaimedOps; + } + } + + /** + * This assertion asserts that the previous copy of every operation after the global_checkpoint is retained for rollback: + * 1. If the previous copy is an index, that copy must be retained + * 2. If the previous copy is a delete, either that copy or another delete or nothing is retained, but must not an index + */ + private void assertDocumentsForRollback(InternalEngine engine, long globalCheckpoint, + Map processedOps) throws IOException { + List rollbackOps = new ArrayList<>(); + Map restoringVersions = new HashMap<>(); + for (Engine.Operation op : processedOps.values()) { + if (op.seqNo() > globalCheckpoint) { + rollbackOps.add(op); + } else { + restoringVersions.compute(op.id(), (id, v) -> v == null || v.seqNo() < op.seqNo() ? 
op : v); + } + } + engine.refresh("test", Engine.SearcherScope.INTERNAL); + try (Searcher engineSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); + IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + for (Engine.Operation rollbackOp : rollbackOps) { + Engine.Operation restoringVersion = restoringVersions.get(rollbackOp.id()); + if (restoringVersion == null) { + continue; + } + BooleanQuery previousQuery = new BooleanQuery.Builder() + .add(new TermQuery(rollbackOp.uid()), BooleanClause.Occur.FILTER) + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, 0, globalCheckpoint), BooleanClause.Occur.FILTER) + .build(); + TopDocs previousDocs = searcher.search(previousQuery, 1, + new Sort(new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG, true))); + // If the previous copy is an index, that copy must be retained + if (restoringVersion instanceof Engine.Index) { + assertThat("restoring operation id=" + restoringVersion.id() + " seq=" + restoringVersion.seqNo() + + " has been reclaimed", previousDocs.totalHits, greaterThanOrEqualTo(1L)); + long foundSeqNo = readSeqNo(reader, previousDocs.scoreDocs[0].doc); + assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo(), foundSeqNo, equalTo(restoringVersion.seqNo())); + // If the previous copy is a delete, either that copy or another delete or nothing is retained, but must not an index + } else { + if (previousDocs.totalHits > 0) { + long actualSeqNo = readSeqNo(reader, previousDocs.scoreDocs[0].doc); + Engine.Operation foundOp = processedOps.get(actualSeqNo); + assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo() + ", found seq=" + foundOp.seqNo() + + ", expected seq=" + restoringVersion.seqNo(), foundOp, instanceOf(Engine.Delete.class)); + } + } + } + } + } + + private long readSeqNo(DirectoryReader reader, int docId) throws IOException { + List leaves = reader.leaves(); + LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docId, leaves)); + int segmentDocId = docId - leaf.docBase; + NumericDocValues dv = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + assert dv != null : "SeqNoDV does not exist"; + if (dv.advanceExact(segmentDocId) == false) { + throw new AssertionError("doc " + docId + " does not have SeqNoDV"); + } + return dv.longValue(); + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 7de086a3be239..cb843043a1b9b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -42,6 +42,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class LuceneChangesSnapshotTests extends EngineTestCase { private MapperService mapperService; @@ -187,7 +188,7 @@ public void testDedupByPrimaryTerm() throws Exception { while ((op = snapshot.next()) != null) { assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo()))); } - 
assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size())); + assertThat(snapshot.skippedOperations(), greaterThanOrEqualTo(totalOps - latestOperations.size())); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index a6e7d7d23543f..c28117fa8fe13 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -210,6 +210,7 @@ public void testCheckpointsAdvance() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/31637") public void testConflictingOpsOnReplica() throws Exception { Map mappings = Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 7e72dcf2aedf6..54b7171c3d2f4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -49,6 +49,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; @@ -619,15 +620,24 @@ protected static void assertVisibleCount(InternalEngine engine, int numDocs, boo public static List generateSingleDocHistory(boolean forReplica, VersionType versionType, long primaryTerm, int minOpCount, int maxOpCount, String docId) { + return generateMultipleDocumentsHistory(forReplica, versionType, primaryTerm, minOpCount, maxOpCount, + Collections.singletonList(docId), true, System::currentTimeMillis); + } + + public static List<Engine.Operation> generateMultipleDocumentsHistory(boolean forReplica, VersionType versionType, long primaryTerm, + int minOpCount, int maxOpCount, List<String> docIds, + boolean allowGapInSeqNo, LongSupplier currentTimeInMsSupplier) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); final List ops = new ArrayList<>(); - final Term id = newUid(docId); final int startWithSeqNo = 0; - final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_"; final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); + final int seqNoGap = allowGapInSeqNo && randomBoolean() ? 2 : 1; for (int i = 0; i < numOfOps; i++) { final Engine.Operation op; final long version; + String docId = randomFrom(docIds); + final Term id = newUid(docId); + final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_"; switch (versionType) { case INTERNAL: version = forReplica ? i : Versions.MATCH_ANY; @@ -646,27 +656,40 @@ public static List generateSingleDocHistory(boolean forReplica } if (randomBoolean()) { op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null), - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ?
primaryTerm + 1 : primaryTerm, version, forReplica ? null : versionType, forReplica ? REPLICA : PRIMARY, - System.currentTimeMillis(), -1, false + currentTimeInMsSupplier.getAsLong(), -1, false ); } else { op = new Engine.Delete("test", docId, id, - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, forReplica ? null : versionType, forReplica ? REPLICA : PRIMARY, - System.currentTimeMillis()); + currentTimeInMsSupplier.getAsLong()); } ops.add(op); } return ops; } + /** + * Partitions a list of operations into multiple sub-lists, then shuffles each sub-list. + * This method shuffles operations in a more realistic way than {@link Randomness#shuffle(List)}. + */ + public void realisticShuffleOperations(List<Engine.Operation> operations) { + int index = 0; + while (index < operations.size()) { + int to = Math.min(operations.size(), index + between(10, 50)); + Randomness.shuffle(operations.subList(index, to)); // subList is a direct view of the backing list + index = to; + } + } + public static void assertOpsOnReplica( final List ops, final InternalEngine replicaEngine,
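To close, a usage sketch combining the two helpers added above, the way testKeepDocsForRollback does; this fragment assumes an EngineTestCase subclass with an engine in scope:

// Build a multi-document history (replica-style, external versioning, optional seq_no gaps),
// then shuffle it in small windows so that out-of-order delivery stays local,
// roughly as a replica would observe it.
List<Engine.Operation> ops = generateMultipleDocumentsHistory(true, VersionType.EXTERNAL, 1L,
    10, 200, Arrays.asList("1", "2", "3"), true, System::currentTimeMillis);
realisticShuffleOperations(ops);
for (Engine.Operation op : ops) {
    if (op instanceof Engine.Index) {
        engine.index((Engine.Index) op);
    } else if (op instanceof Engine.Delete) {
        engine.delete((Engine.Delete) op);
    }
}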