diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 5fe10d8fc684f..f3cf4d756ddc5 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -27,7 +27,6 @@ import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.document.LatLonDocValuesField; -import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; @@ -36,15 +35,18 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.IndexSearcher; @@ -66,19 +68,23 @@ import org.apache.lucene.store.Lock; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.Version; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.fielddata.IndexFieldData; import java.io.IOException; +import java.io.UncheckedIOException; import java.text.ParseException; import java.util.ArrayList; import java.util.Collection; @@ -99,6 +105,9 @@ public class Lucene { } public static final String SOFT_DELETE_FIELD = "__soft_delete"; + // We can't use hard-deletes together with soft-deletes because we can't exclude documents that are both hard-deleted and + // soft-deleted from liveDocs. We use this numeric docValues field to exclude rolled-back documents from liveDocs. + public static final String ROLLED_BACK_FIELD = "__rolled_back"; public static final NamedAnalyzer STANDARD_ANALYZER = new NamedAnalyzer("_standard", AnalyzerScope.GLOBAL, new StandardAnalyzer()); public static final NamedAnalyzer KEYWORD_ANALYZER = new NamedAnalyzer("_keyword", AnalyzerScope.GLOBAL, new KeywordAnalyzer()); @@ -837,28 +846,120 @@ public int length() { } /** - * Wraps a directory reader to include all live docs. - * The wrapped reader can be used to query all documents.
- * - * @param in the input directory reader - * @return the wrapped reader + * Creates a wrapper that wraps a provided directory reader so that all docs are treated as live except those marked as rolled back. + * The wrapped reader should be used to query history in the Lucene index. */ - public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOException { - return new DirectoryReaderWithAllLiveDocs(in); - } + public static CheckedFunction<DirectoryReader, DirectoryReader, IOException> noDeletesReaderWrapper() { + NoDeletesReaderWrapper wrapper = new NoDeletesReaderWrapper(); + return wrapper::wrapNoDeletes; + } + + // pkg level for testing + static final class NoDeletesReaderWrapper { + // Docs are marked as rolled back only by updating the rollback docValues field, and such an update increases the docValuesGen. + // We can use the core cache key of the reader as the cache key and recompute only if the cached liveDocs has a different docValuesGen. + final Map<IndexReader.CacheKey, NoDeletesLiveDocs> liveDocsCache = ConcurrentCollections.newConcurrentMap(); + + DirectoryReader wrapNoDeletes(DirectoryReader in) throws IOException { + return new NoDeletesDirectoryReader(in); + } + + private static final class NoDeletesLiveDocs { + final int numDocs; + final FixedBitSet bits; + final long docValueGen; + NoDeletesLiveDocs(FixedBitSet bits, int numDocs, long docValuesGen) { + assert numDocs == bits.cardinality(); + this.numDocs = numDocs; + this.bits = bits; + this.docValueGen = docValuesGen; + } + } + + private final class NoDeletesDirectoryReader extends FilterDirectoryReader { + NoDeletesDirectoryReader(DirectoryReader in) throws IOException { + super(in, new NoDeletesSubReaderWrapper()); + } + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new NoDeletesDirectoryReader(in); + } + @Override + public CacheHelper getReaderCacheHelper() { + return null; // Modifying liveDocs + } + } - private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader { - static final class SubReaderWithAllLiveDocs extends FilterLeafReader { - SubReaderWithAllLiveDocs(LeafReader in) { + private final class NoDeletesSubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper { + @Override + public LeafReader wrap(LeafReader leaf) { + try { + if (leaf.getLiveDocs() == null) { + return leaf; + } + DocIdSetIterator rollbackDocs = DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(ROLLED_BACK_FIELD, leaf); + if (rollbackDocs == null) { + return new SubReaderWithLiveDocs(leaf, null, leaf.maxDoc()); + } + final SegmentReader segmentReader = segmentReader(leaf); + final long docValuesGen = segmentReader.getSegmentInfo().getDocValuesGen(); + final IndexReader.CacheHelper cacheHelper = segmentReader.getCoreCacheHelper(); + final NoDeletesLiveDocs liveDocs; + if (cacheHelper != null) { + liveDocs = liveDocsCache.compute(cacheHelper.getKey(), (k, v) -> { + if (v == null) { + // only need to register for the first time + cacheHelper.addClosedListener(liveDocsCache::remove); + } + if (v == null || v.docValueGen != docValuesGen) { + return makeLiveDocs(segmentReader, rollbackDocs, docValuesGen); + } else { + assert v.bits.equals(makeLiveDocs(leaf, rollbackDocs, docValuesGen).bits); + return v; + } + }); + } else { + liveDocs = makeLiveDocs(segmentReader, rollbackDocs, docValuesGen); + } + return new SubReaderWithLiveDocs(leaf, liveDocs.bits, liveDocs.numDocs); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + NoDeletesLiveDocs makeLiveDocs(LeafReader leaf, DocIdSetIterator rollbackDocs, long docValuesGen) {
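+                // Build the liveDocs for this segment: start with every doc marked live, then clear each doc that carries the
+                // rolled-back docValues marker; the number of remaining set bits becomes numDocs for the wrapped leaf.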
+ try { + int rollbackCount = 0; + FixedBitSet liveDocs = new FixedBitSet(leaf.maxDoc()); + liveDocs.set(0, liveDocs.length()); + int docId; + while ((docId = rollbackDocs.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assert leaf.getLiveDocs().get(docId) == false : "doc [" + docId + "] is rolled back but not deleted"; + liveDocs.clear(docId); + rollbackCount++; + } + return new NoDeletesLiveDocs(liveDocs, liveDocs.length() - rollbackCount, docValuesGen); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + } + + private static final class SubReaderWithLiveDocs extends FilterLeafReader { + final Bits liveDocs; + final int numDocs; + SubReaderWithLiveDocs(LeafReader in, Bits liveDocs, int numDocs) { super(in); + this.liveDocs = liveDocs; + this.numDocs = numDocs; } @Override public Bits getLiveDocs() { - return null; + return liveDocs; } @Override public int numDocs() { - return maxDoc(); + return numDocs; } @Override public CacheHelper getCoreCacheHelper() { @@ -869,29 +970,17 @@ public CacheHelper getReaderCacheHelper() { return null; // Modifying liveDocs } } - DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException { - super(in, new FilterDirectoryReader.SubReaderWrapper() { - @Override - public LeafReader wrap(LeafReader leaf) { - return new SubReaderWithAllLiveDocs(leaf); - } - }); - } - @Override - protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return wrapAllDocsLive(in); - } - - @Override - public CacheHelper getReaderCacheHelper() { - return null; // Modifying liveDocs - } } - /** - * Returns a numeric docvalues which can be used to soft-delete documents. - */ - public static NumericDocValuesField newSoftDeleteField() { - return new NumericDocValuesField(SOFT_DELETE_FIELD, 1); + /** A shortcut allows reading numeric docValues once. 
The docId must have value in the accessing DV */ + public static long readNumericDV(LeafReader leafReader, String field, int docId) throws IOException { + NumericDocValues dv = leafReader.getNumericDocValues(field); + if (dv == null) { + throw new IllegalStateException("missing docValues [" + field + "]"); + } + if (dv.advanceExact(docId) == false) { + throw new IllegalStateException("doc [" + docId + "] does not exist in [" + field + "] docValues"); + } + return dv.longValue(); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 81fe7f60bc5d6..dd4ba3e8853d1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -22,25 +22,38 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; @@ -49,6 +62,7 @@ import org.elasticsearch.Assertions; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.lease.Releasable; @@ -65,12 +79,15 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import 
org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -88,7 +105,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -149,10 +165,13 @@ public class InternalEngine extends Engine { private final CounterMetric numDocDeletes = new CounterMetric(); private final CounterMetric numDocAppends = new CounterMetric(); private final CounterMetric numDocUpdates = new CounterMetric(); - private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField(); + private final NumericDocValuesField softDeleteField = new NumericDocValuesField(Lucene.SOFT_DELETE_FIELD, 1); + private final NumericDocValuesField unsoftDeleteField = new NumericDocValuesField(Lucene.SOFT_DELETE_FIELD, null); + private final NumericDocValuesField rolledbackField = new NumericDocValuesField(Lucene.ROLLED_BACK_FIELD, 1); private final boolean softDeleteEnabled; private final SoftDeletesPolicy softDeletesPolicy; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; + private final CheckedFunction noDeletesReaderWrapper = Lucene.noDeletesReaderWrapper(); /** * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this @@ -1792,8 +1811,8 @@ void clearDeletedTombstones() { } // for testing - final Collection getDeletedTombstones() { - return versionMap.getAllTombstones().values(); + final Map getDeletedTombstones() { + return new HashMap<>(versionMap.getAllTombstones()); } @Override @@ -2467,8 +2486,8 @@ public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService m } Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); try { - LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot( - searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, minSeqNo, maxSeqNo, requiredFullRange); + LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(noDeletesReaderWrapper.apply(searcher.getDirectoryReader()), + mapperService, searcher, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, minSeqNo, maxSeqNo, requiredFullRange); searcher = null; return snapshot; } finally { @@ -2581,4 +2600,166 @@ public void afterRefresh(boolean didRefresh) { } } } + + /** + * Try to rollback the colliding operation (whose seqno equals to the newOp's) in this engine. + * If the colliding operation matches the newOp, this method won't undo the existing operation and return false. 
+ * @return {@code true} if there is a difference between the colliding operation and the new operation + */ + boolean maybeRollback(MapperService mapperService, Operation newOp) throws IOException { + try (ReleasableLock ignored = readLock.acquire(); + Searcher engineSearcher = acquireSearcher("rollback", SearcherScope.INTERNAL)) { + IndexSearcher searcher = new IndexSearcher(noDeletesReaderWrapper.apply(engineSearcher.getDirectoryReader())); + searcher.setQueryCache(null); + CollidingDoc collidingDoc = findCollidingDoc(searcher, mapperService, newOp.seqNo(), newOp.primaryTerm(), false); + if (collidingDoc != null && collidingDoc.term == newOp.primaryTerm()) { + return false; + } + } + Searcher engineSearcher = null; + try (ReleasableLock ignored = writeLock.acquire()) { + refresh("rollback", SearcherScope.INTERNAL); + engineSearcher = acquireSearcher("rollback", SearcherScope.INTERNAL); + IndexSearcher searcher = new IndexSearcher(noDeletesReaderWrapper.apply(engineSearcher.getDirectoryReader())); + searcher.setQueryCache(null); + CollidingDoc collidingDoc = findCollidingDoc(searcher, mapperService, newOp.seqNo(), newOp.primaryTerm(), true); + if (collidingDoc != null && collidingDoc.term == newOp.primaryTerm()) { + return false; + } + // delete the current version by marking it as rolled back + final Query currentVersionQuery = LongPoint.newExactQuery(SeqNoFieldMapper.NAME, newOp.seqNo()); + while (tryUpdateDocValues(searcher, currentVersionQuery, softDeleteField, rolledbackField) == false) { + final Searcher oldSearcher = engineSearcher; + engineSearcher = null; + IOUtils.close(oldSearcher); + refresh("rollback", SearcherScope.INTERNAL); + engineSearcher = acquireSearcher("rollback", SearcherScope.INTERNAL); + searcher = new IndexSearcher(noDeletesReaderWrapper.apply(engineSearcher.getDirectoryReader())); + searcher.setQueryCache(null); + } + // restore the previous version + if (collidingDoc != null && collidingDoc.uid != null) { + final BytesRef uid = Uid.encodeId(collidingDoc.uid.id()); + try (Releasable uidLock = versionMap.acquireLock(uid)) { + restorePreviousVersion(searcher, uid, newOp.seqNo()); + } + } + return true; + } catch (IOException e) { + try { + maybeFailEngine("rollback", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; + } finally { + IOUtils.close(engineSearcher); + } + } + + private void restorePreviousVersion(IndexSearcher searcher, BytesRef uid, long seqNo) throws IOException { + assert writeLock.isHeldByCurrentThread() : Thread.currentThread().getName(); + final BooleanQuery newerVersionQuery = new BooleanQuery.Builder() + .add(new TermQuery(new Term(IdFieldMapper.NAME, uid)), BooleanClause.Occur.FILTER) + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, seqNo + 1, Long.MAX_VALUE), BooleanClause.Occur.FILTER).build(); + // we should only restore if the current is the latest version. 
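+        // a newer copy of this uid exists above the rolled-back seq#, so the rolled-back doc was already superseded and there is nothing to restore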
+ if (searcher.count(newerVersionQuery) > 0) { + return; + } + final BooleanQuery prevVersionQuery = new BooleanQuery.Builder() + .add(new TermQuery(new Term(IdFieldMapper.NAME, uid)), BooleanClause.Occur.FILTER) + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, 0, seqNo - 1), BooleanClause.Occur.FILTER).build(); + final TopDocs prevDocs = searcher.search(prevVersionQuery, 1, + new Sort(new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG, true))); + if (prevDocs.totalHits == 0) { + // this is the latest version and was rolled back - remove its versionMap/tombstone + versionMap.removeIndexAndDeleteUnderLock(uid); + return; + } + final List leaves = searcher.getIndexReader().leaves(); + final LeafReaderContext prevSegment = leaves.get(ReaderUtil.subIndex(prevDocs.scoreDocs[0].doc, leaves)); + final int prevSegmentDocId = prevDocs.scoreDocs[0].doc - prevSegment.docBase; + final long prevSeqNo = Lucene.readNumericDV(prevSegment.reader(), SeqNoFieldMapper.NAME, prevSegmentDocId); + // we should not restore if the previous version was a delete because a delete should always be soft-deleted. + // however, we should restore the tombstone of that delete on the version map. + final NumericDocValues tombstoneDV = prevSegment.reader().getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); + if (tombstoneDV == null || (tombstoneDV.advanceExact(prevSegmentDocId) == false)) { + // restore the previous version by un-deleting it. + final Query restoringQuery = LongPoint.newExactQuery(SeqNoFieldMapper.NAME, prevSeqNo); + Engine.Searcher engineSearcher = null; + try { + while (tryUpdateDocValues(searcher, restoringQuery, unsoftDeleteField) == false) { + final Searcher oldEngineSearcher = engineSearcher; + engineSearcher = null; + IOUtils.close(oldEngineSearcher); + refresh("rollback", SearcherScope.INTERNAL); + engineSearcher = acquireSearcher("rollback", SearcherScope.INTERNAL); + searcher = new IndexSearcher(noDeletesReaderWrapper.apply(engineSearcher.getDirectoryReader())); + searcher.setQueryCache(null); + } + // if the update was aborted/retried, we need to refresh here to avoid exposing a partial change. + if (engineSearcher != null) { + refresh("rollback", SearcherScope.INTERNAL); + } + } finally { + IOUtils.close(engineSearcher); + } + versionMap.removeIndexAndDeleteUnderLock(uid); + } else { + long prevVersion = Lucene.readNumericDV(prevSegment.reader(), VersionFieldMapper.NAME, prevSegmentDocId); + long prevTerm = Lucene.readNumericDV(prevSegment.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, prevSegmentDocId); + versionMap.putDeleteUnderLock(uid, new DeleteVersionValue( + prevVersion, prevSeqNo, prevTerm, config().getThreadPool().relativeTimeInMillis())); + } + } + + /** @return false if the update was aborted due to merges, the caller need to refresh an engine, then retry */ + private boolean tryUpdateDocValues(IndexSearcher searcher, Query query, NumericDocValuesField... 
fields) throws IOException { + ScoreDoc lastDoc = null; + do { + final TopDocs topDocs = searcher.searchAfter(lastDoc, query, 10); + lastDoc = null; + final DirectoryReader unwrapped = FilterDirectoryReader.unwrap((DirectoryReader) searcher.getIndexReader()); + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + if (indexWriter.tryUpdateDocValue(unwrapped, scoreDoc.doc, fields) == -1) { + return false; + } + lastDoc = scoreDoc; + } + } while (lastDoc != null); + return true; + } + + private static class CollidingDoc { + final long term; + final Uid uid; + CollidingDoc(long term, Uid uid) { + this.term = term; + this.uid = uid; + } + } + + private CollidingDoc findCollidingDoc(IndexSearcher searcher, MapperService mapperService, + long seqNo, long newTerm, boolean loadUid) throws IOException { + TopDocs topDocs = searcher.search(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), 1, + // we need to sort to find the primary term in case the target is a nested doc. + new Sort(new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true))); + if (topDocs.totalHits == 0) { + return null; + } + List leaves = searcher.getIndexReader().leaves(); + LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(topDocs.scoreDocs[0].doc, leaves)); + int docId = topDocs.scoreDocs[0].doc - leaf.docBase; + long existingTerm = Lucene.readNumericDV(leaf.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docId); + if (existingTerm == newTerm || loadUid == false) { + // if term matches, we don't need load _uid because we won't rollback + return new CollidingDoc(existingTerm, null); + } else { + final FieldsVisitor fields; + fields = new FieldsVisitor(false); + leaf.reader().document(docId, fields); + fields.postProcess(mapperService); + return new CollidingDoc(existingTerm, fields.uid()); + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 18d3cedb37e60..d589ab663c5dd 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -169,7 +169,9 @@ void adjustRam(long value) { void remove(BytesRef uid, DeleteVersionValue deleted) { VersionValue previousValue = current.remove(uid); - current.updateMinDeletedTimestamp(deleted); + if (deleted != null) { + current.updateMinDeletedTimestamp(deleted); + } if (previousValue != null) { long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; adjustRam(-(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed)); @@ -360,7 +362,7 @@ private void putTombstone(BytesRef uid, DeleteVersionValue version) { /** * Removes this uid from the pending deletes map. */ - void removeTombstoneUnderLock(BytesRef uid) { + boolean removeTombstoneUnderLock(BytesRef uid) { assert assertKeyedLockHeldByCurrentThread(uid); long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; final VersionValue prev = tombstones.remove(uid); @@ -369,6 +371,17 @@ void removeTombstoneUnderLock(BytesRef uid) { long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed)); assert v >= 0 : "bytes=" + v; } + return prev != null; + } + + /** + * Removes this uid from the current version map and the pending deletes map. 
+ */ + void removeIndexAndDeleteUnderLock(BytesRef uid) { + assert assertKeyedLockHeldByCurrentThread(uid); + if (removeTombstoneUnderLock(uid) == false) { + maps.remove(uid, null); + } } private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrune, DeleteVersionValue versionValue) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 6b0848e79b5a2..6bd5e953796d2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.engine; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; @@ -75,15 +76,16 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { /** * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range. * - * @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully + * @param directoryReader the directory reader which will be used to search history documents in Lucene index * @param mapperService the mapper service which will be mainly used to resolve the document's type and uid + * @param onClose a call back that will be called when this snapshot is closed * @param searchBatchSize the number of documents should be returned by each search * @param fromSeqNo the min requesting seq# - inclusive * @param toSeqNo the maximum requesting seq# - inclusive * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo */ - LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize, - long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { + LuceneChangesSnapshot(DirectoryReader directoryReader, MapperService mapperService, Closeable onClose, + int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); } @@ -93,7 +95,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { final AtomicBoolean closed = new AtomicBoolean(); this.onClose = () -> { if (closed.compareAndSet(false, true)) { - IOUtils.close(engineSearcher); + IOUtils.close(onClose); } }; this.mapperService = mapperService; @@ -102,7 +104,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { this.toSeqNo = toSeqNo; this.lastSeenSeqNo = fromSeqNo - 1; this.requiredFullRange = requiredFullRange; - this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); + this.indexSearcher = new IndexSearcher(directoryReader); this.indexSearcher.setQueryCache(null); this.parallelArray = new ParallelArray(searchBatchSize); final TopDocs topDocs = searchOperations(null); diff --git a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java index 753aedea01e02..54165e8c40843 100644 --- 
a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.DirectoryReader; @@ -33,26 +34,38 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.IsNot.not; + + public class LuceneTests extends ESTestCase { public void testWaitForIndex() throws Exception { final MockDirectoryWrapper dir = newMockDirectory(); @@ -406,4 +419,91 @@ public void testMMapHackSupported() throws Exception { // add assume's here if needed for certain platforms, but we should know if it does not work. 
assertTrue("MMapDirectory does not support unmapping: " + MMapDirectory.UNMAP_NOT_SUPPORTED_REASON, MMapDirectory.UNMAP_SUPPORTED); } + + public void testNoDeletesReaderWrapper() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETE_FIELD) + .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, MatchAllDocsQuery::new, newMergePolicy())); + IndexWriter writer = new IndexWriter(dir, config); + int numDocs = between(10, 100); + Set liveDocs = new HashSet<>(); + Set rollbacks = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + Document doc = new Document(); + doc.add(new StringField("id", id, Store.YES)); + writer.addDocument(doc); + liveDocs.add(id); + } + writer.flush(); + NumericDocValuesField softDeletesField = new NumericDocValuesField(Lucene.SOFT_DELETE_FIELD, 1); + NumericDocValuesField rollbackField = new NumericDocValuesField(Lucene.ROLLED_BACK_FIELD, 1); + for (int i = 0; i < numDocs; i++) { + if (randomBoolean()) { + String id = Integer.toString(i); + Document doc = new Document(); + doc.add(new StringField("id", "v2-" + id, Store.YES)); + if (randomBoolean()) { + writer.softUpdateDocument(new Term("id", id), doc, softDeletesField); + } else { + writer.softUpdateDocument(new Term("id", id), doc, softDeletesField, rollbackField); + liveDocs.remove(id); // exclude the rolled back doc + rollbacks.add(id); + } + liveDocs.add("v2-" + id); + } + } + CheckedFunction, IOException> getDocIds = reader -> { + IndexSearcher searcher = new IndexSearcher(reader); + TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); + Set actualDocs = new HashSet<>(); + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + actualDocs.add(reader.document(scoreDoc.doc).get("id")); + } + return actualDocs; + }; + + Lucene.NoDeletesReaderWrapper wrapper = new Lucene.NoDeletesReaderWrapper(); + try (DirectoryReader reader = DirectoryReader.open(writer)) { + DirectoryReader wrapped = wrapper.wrapNoDeletes(reader); + if (rollbacks.isEmpty()) { + assertThat(wrapper.liveDocsCache.size(), equalTo(0)); + } else { + assertThat(wrapper.liveDocsCache.size(), greaterThan(0)); + } + assertThat(wrapped.numDocs(), equalTo(liveDocs.size())); + assertThat(getDocIds.apply(wrapped), equalTo(liveDocs)); + Map currentCache = new HashMap<>(wrapper.liveDocsCache); + assertThat(getDocIds.apply(wrapper.wrapNoDeletes(reader)), equalTo(liveDocs)); + assertThat("liveDocs cache should not change", wrapper.liveDocsCache, equalTo(currentCache)); + } + int updates = between(1, 10); + for (int i = 0; i < updates && liveDocs.isEmpty() == false; i++) { + String rollbackId = randomFrom(liveDocs); + boolean shouldRetry; + Map prevCache; + do { + shouldRetry = false; + try (DirectoryReader reader = DirectoryReader.open(writer)) { + DirectoryReader wrapped = wrapper.wrapNoDeletes(reader); + prevCache = new HashMap<>(wrapper.liveDocsCache); + IndexSearcher searcher = new IndexSearcher(wrapped); + TopDocs topDocs = searcher.search(new TermQuery(new Term("id", rollbackId)), Integer.MAX_VALUE); + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + if (writer.tryUpdateDocValue(reader, scoreDoc.doc, softDeletesField, rollbackField) == -1) { + shouldRetry = true; + break; + } + } + } + } while (shouldRetry); + liveDocs.remove(rollbackId); // exclude the rolled back doc + try (DirectoryReader reader = DirectoryReader.open(writer)) { + assertThat(getDocIds.apply(wrapper.wrapNoDeletes(reader)), 
equalTo(liveDocs)); + assertThat("liveDocs cache should be invalidated when updated", prevCache, not(equalTo(wrapper.liveDocsCache))); + } + } + IOUtils.close(writer, dir); + assertThat(wrapper.liveDocsCache.size(), equalTo(0)); + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 21f1781cc65a0..bbe652f4e5d17 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -83,6 +83,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; @@ -113,6 +114,7 @@ import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -139,6 +141,7 @@ import java.util.Base64; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -171,6 +174,7 @@ import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; @@ -178,6 +182,7 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; @@ -4156,6 +4161,68 @@ public void assertNotSameReader(Searcher left, Searcher right) { } } + public void assertSameLuceneHistory(InternalEngine primary, InternalEngine replica) throws IOException { + MapperService mapper = createMapperService("test"); + try (Translog.Snapshot pSnapshot = primary.newLuceneChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot rSnapshot = replica.newLuceneChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) { + assertThat(pSnapshot.totalOperations(), equalTo(rSnapshot.totalOperations())); + Translog.Operation pOp; + while ((pOp = pSnapshot.next()) != null) { + Translog.Operation rOp = rSnapshot.next(); + assertThat(rOp, notNullValue()); + BytesStreamOutput pOut = new BytesStreamOutput(); + Translog.Operation.writeOperation(pOut, pOp); + BytesStreamOutput rOut = new BytesStreamOutput(); + Translog.Operation.writeOperation(rOut, rOp); + assertThat("pOp[" + pOp + "], rOp[" + rOp + "]", rOut.bytes(), equalTo(pOut.bytes())); + } + } + } + } + + /** + * This assertion asserts that the delete tombstones map satisfy these requirements: + * 1. 
Live docs must not exist in the tombstones map + * 2. All deletes after the local checkpoint must be retained in the tombstones map + * 3. Any retained deletes must reflect the latest version of that document + */ + public void assertDeleteTombstoneRequirements(InternalEngine engine) throws IOException { + Map tombstones = engine.getDeletedTombstones().entrySet().stream() + .collect(Collectors.toMap(e -> Uid.decodeId(e.getKey().bytes), e -> e.getValue().seqNo)); + Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Map liveDocs = new HashMap<>(); // must not have any delete tombstone for live docs + Map deletedDocs = new HashMap<>(); // deletes after global checkpoint must be retained + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( + Lucene.noDeletesReaderWrapper().apply(searcher.getDirectoryReader()), + createMapperService("test"), searcher, 100, 0, Long.MAX_VALUE, false)) { + searcher = null; + Translog.Operation op; + while ((op = snapshot.next()) != null) { + if (op instanceof Translog.Index) { + String id = ((Translog.Index) op).id(); + deletedDocs.remove(id); + liveDocs.put(id, op.seqNo()); + } else if (op instanceof Translog.Delete) { + String id = ((Translog.Delete) op).id(); + liveDocs.remove(id); + deletedDocs.put(id, op.seqNo()); + } + } + } finally { + IOUtils.close(searcher); + } + for (Map.Entry entry : liveDocs.entrySet()) { + assertThat("live entry [" + entry + "]", tombstones, not(hasKey(entry.getKey()))); + } + for (Map.Entry entry : deletedDocs.entrySet()) { + if (entry.getValue() <= engine.getLocalCheckpoint()) { + assertThat(tombstones, anyOf(not(hasKey(entry.getKey())), hasEntry(entry.getKey(), entry.getValue()))); + } else { + assertThat(tombstones, hasEntry(entry.getKey(), entry.getValue())); + } + } + } + public void testRefreshScopedSearcher() throws IOException { try (Store store = createStore(); InternalEngine engine = @@ -4658,18 +4725,18 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis())); } } - List tombstones = new ArrayList<>(engine.getDeletedTombstones()); + List tombstones = new ArrayList<>(engine.getDeletedTombstones().values()); engine.config().setEnableGcDeletes(true); // Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval. clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval)); engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval); - assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + assertThat(engine.getDeletedTombstones().values(), containsInAnyOrder(tombstones.toArray())); // Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno). clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4. engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo); - assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + assertThat(engine.getDeletedTombstones().values(), containsInAnyOrder(tombstones.toArray())); // Fill the seqno gap - should prune all tombstones. clock.set(between(0, 100)); if (randomBoolean()) { @@ -4679,7 +4746,7 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4. 
engine.refresh("test"); - assertThat(engine.getDeletedTombstones(), empty()); + assertThat(engine.getDeletedTombstones().keySet(), empty()); } } @@ -4947,6 +5014,119 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { } } + public void testMaybeRollback() throws Exception { + IOUtils.close(engine, store); + threadPool = spy(threadPool); + AtomicLong clockTime = new AtomicLong(); + when(threadPool.relativeTimeInMillis()).thenAnswer(i -> clockTime.incrementAndGet()); + List newPrimaryOps = new ArrayList<>(); + List replicaOps = new ArrayList<>(); + List resyncOps = new ArrayList<>(); + int numOps = scaledRandomIntBetween(1, 500); + Map versions = new HashMap<>(); + long oldTerm = between(1, 10); + long newTerm = oldTerm + between(1, 10); + long startRollbackSeqNo = between(0, numOps - 1); + int numOpsToRollback = 0; + for (int i = 0; i < numOps; i++) { + // create a hole in seqno to prevent the local checkpoint from advancing + if (i > startRollbackSeqNo && rarely()) { + continue; + } + long seqNo = i; + String id = Integer.toString(randomIntBetween(1, 10)); + long version = versions.compute(id, (k, v) -> (v == null ? 1 : v) + between(1, 10)); + BiFunction nextOp = (type, primaryTerm) -> { + if (type == Engine.Operation.TYPE.INDEX) { + ParsedDocument doc = createParsedDoc(id, null); + return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm, version, VersionType.EXTERNAL, REPLICA, + threadPool.relativeTimeInMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean()); + } else if (type == Engine.Operation.TYPE.DELETE) { + return new Engine.Delete("test", id, newUid(id), seqNo, primaryTerm, version, VersionType.EXTERNAL, + Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()); + } else { + return new Engine.NoOp(seqNo, primaryTerm, REPLICA, threadPool.relativeTimeInMillis(), "test-" + seqNo); + } + }; + Engine.Operation.TYPE primaryOpType = randomFrom(Engine.Operation.TYPE.values()); + if (seqNo <= startRollbackSeqNo) { + replicaOps.add(nextOp.apply(primaryOpType, oldTerm)); + newPrimaryOps.add(nextOp.apply(primaryOpType, oldTerm)); + } else { + boolean notArrivedYetOnReplica = randomBoolean(); + if (notArrivedYetOnReplica) { + long term = randomFrom(oldTerm, newTerm); + resyncOps.add(nextOp.apply(primaryOpType, term)); + newPrimaryOps.add(nextOp.apply(primaryOpType, term)); + numOpsToRollback++; + } else { + boolean rollbackWithSameOp = randomBoolean(); + if (rollbackWithSameOp) { + newPrimaryOps.add(nextOp.apply(primaryOpType, oldTerm)); + resyncOps.add(nextOp.apply(primaryOpType, oldTerm)); + replicaOps.add(nextOp.apply(primaryOpType, oldTerm)); + } else { + newPrimaryOps.add(nextOp.apply(primaryOpType, newTerm)); + resyncOps.add(nextOp.apply(primaryOpType, newTerm)); + replicaOps.add(nextOp.apply(randomFrom(Engine.Operation.TYPE.values()), oldTerm)); + numOpsToRollback++; + } + } + } + } + + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(between(0, 100)).getStringRep()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + primaryTerm.set(newTerm); + IndexWriterFactory iwf = (dir, conf) -> new IndexWriter(dir, conf) { + @Override + public long tryUpdateDocValue(IndexReader readerIn, int docID, 
Field... fields) throws IOException { + if (rarely()) { + // make tryUpdateDocValue aborted + forceMerge(between(1, 3), randomBoolean()); + } + return super.tryUpdateDocValue(readerIn, docID, fields); + } + }; + Randomness.shuffle(newPrimaryOps); + Randomness.shuffle(replicaOps); + Randomness.shuffle(resyncOps); + MapperService mapperService = createMapperService("test"); + + try (Store newPrimaryStore = createStore(); + InternalEngine newPrimaryEngine = createEngine(indexSettings, newPrimaryStore, createTempDir(), newMergePolicy()); + Store replicaStore = createStore(); + InternalEngine replicaEngine = createEngine(indexSettings, replicaStore, createTempDir(), newMergePolicy(), iwf)) { + applyOperations(newPrimaryOps, newPrimaryEngine); + applyOperations(replicaOps, replicaEngine); + replicaEngine.getLocalCheckpointTracker().resetCheckpoint(startRollbackSeqNo); + replicaEngine.rollTranslogGeneration(); + replicaEngine.refresh("test"); + // Rollback + int rolledBackCount = 0; + for (Engine.Operation resyncOp : resyncOps) { + if (replicaEngine.maybeRollback(mapperService, resyncOp)) { + rolledBackCount++; + applyOperations(Collections.singletonList(resyncOp), replicaEngine); + replicaEngine.refresh("test", Engine.SearcherScope.INTERNAL); + }else { + replicaEngine.getLocalCheckpointTracker().markSeqNoAsCompleted(resyncOp.seqNo()); + } + } + assertThat(rolledBackCount, equalTo(numOpsToRollback)); + assertThat(replicaEngine.getLocalCheckpoint(), equalTo(newPrimaryEngine.getLocalCheckpoint())); + newPrimaryEngine.refresh("test"); + replicaEngine.refresh("test"); + assertSameLuceneHistory(newPrimaryEngine, replicaEngine); + assertDeleteTombstoneRequirements(replicaEngine); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(newPrimaryEngine, mapperService); + } + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 7de086a3be239..57e219a43b9a5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.index.engine; +import org.apache.lucene.index.DirectoryReader; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -59,6 +62,7 @@ protected Settings indexSettings() { } public void testBasics() throws Exception { + CheckedFunction noDeletesReaderWrapper = Lucene.noDeletesReaderWrapper(); long fromSeqNo = randomNonNegativeLong(); long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); // Empty engine @@ -94,8 +98,8 @@ public void testBasics() throws Exception { toSeqNo = randomLongBetween(fromSeqNo, numOps * 2); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(noDeletesReaderWrapper.apply(searcher.getDirectoryReader()), + mapperService, 
searcher, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) { searcher = null; assertThat(snapshot, SnapshotMatchers.size(0)); } finally { @@ -103,40 +107,40 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f } searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(noDeletesReaderWrapper.apply(searcher.getDirectoryReader()), + mapperService, searcher, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { searcher = null; IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat(error.getMessage(), containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found")); - }finally { + } finally { IOUtils.close(searcher); } } else { fromSeqNo = randomLongBetween(0, refreshedSeqNo); toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(noDeletesReaderWrapper.apply(searcher.getDirectoryReader()), + mapperService, searcher, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) { searcher = null; assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo)); } finally { IOUtils.close(searcher); } searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(noDeletesReaderWrapper.apply(searcher.getDirectoryReader()), + mapperService, searcher, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { searcher = null; IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat(error.getMessage(), containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found")); - }finally { + } finally { IOUtils.close(searcher); } toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo); searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(noDeletesReaderWrapper.apply(searcher.getDirectoryReader()), + mapperService, searcher, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { searcher = null; assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); } finally { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d8efe734fe6da..fb7221236bde2 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ 
b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -62,7 +62,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -3175,30 +3174,4 @@ public void testSupplyTombstoneDoc() throws Exception { closeShards(shard); } - - public void testSearcherIncludesSoftDeletes() throws Exception { - Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .build(); - IndexMetaData metaData = IndexMetaData.builder("test") - .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") - .settings(settings) - .primaryTerm(0, 1).build(); - IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); - recoverShardFromStore(shard); - indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); - indexDoc(shard, "test", "1", "{\"foo\" : \"baz\"}"); - deleteDoc(shard, "test", "0"); - shard.refresh("test"); - try (Engine.Searcher searcher = shard.acquireSearcher("test")) { - IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); - assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(0L)); - assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(1L)); - assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); - assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); - } - closeShards(shard); - } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index b751652c2ab3f..51095e530db68 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -780,6 +780,29 @@ protected void concurrentlyApplyOps(List ops, InternalEngine e } } + protected void applyOperations(List ops, InternalEngine engine) { + for (int i = 0; i < ops.size(); i++) { + try { + final Engine.Operation op = ops.get(i); + if (op instanceof Engine.Index) { + engine.index((Engine.Index) op); + } else if (op instanceof Engine.Delete) { + engine.delete((Engine.Delete) op); + } else { + engine.noOp((Engine.NoOp) op); + } + if ((i + 1) % 4 == 0) { + engine.refresh("test"); + } + if (rarely()) { + engine.flush(); + } + } catch (IOException e) { + throw new AssertionError(e); + } + } + } + /** * Gets all docId from the given engine. */