From fb57e9af58ec0f0fcc0fb185d73046b45b0e7869 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 5 Jul 2018 14:30:07 -0400 Subject: [PATCH 01/13] Retain soft-deleted documents for rollback An operation whose seqno is greater than the global checkpoint is subject to undoing when the primary fails over. If that operation updates or deletes existing documents in Lucene, those documents are also subject to undoing. Thus, we need to retain them during merges until they are no longer subject to rollback. --- .../index/engine/InternalEngine.java | 89 +++++++----- .../index/engine/SoftDeletesPolicy.java | 9 +- .../index/mapper/SeqNoFieldMapper.java | 1 + .../index/engine/InternalEngineTests.java | 131 +++++++++++++++--- 4 files changed, 173 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 99dfb908711e3..85b726d8742a3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -661,17 +661,28 @@ enum OpVsLuceneDocStatus { LUCENE_DOC_NOT_FOUND } - private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { - assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; + static final class OpVsLuceneLookupResult { final OpVsLuceneDocStatus status; + final long seqNoOfNewerDoc; //seqno of a newer version found in Lucene; otherwise -1 + OpVsLuceneLookupResult(OpVsLuceneDocStatus status, long seqNoOfNewerDoc) { + this.status = status; + this.seqNoOfNewerDoc = seqNoOfNewerDoc; + assert (status == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL && seqNoOfNewerDoc >= 0) || + (status != OpVsLuceneDocStatus.OP_STALE_OR_EQUAL && seqNoOfNewerDoc == -1) : + "status=" + status + " ,seqno_newer_doc=" + seqNoOfNewerDoc; + } + } + + private 
OpVsLuceneLookupResult compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { + assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; + final OpVsLuceneLookupResult result; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo || - (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) - status = OpVsLuceneDocStatus.OP_NEWER; - else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + if (op.seqNo() > versionValue.seqNo || (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) { + result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_NEWER, -1); + } else { + result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_STALE_OR_EQUAL, versionValue.seqNo); } } else { // load from index @@ -679,23 +690,23 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); if (docAndSeqNo == null) { - status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; + result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, -1); } else if (op.seqNo() > docAndSeqNo.seqNo) { - status = OpVsLuceneDocStatus.OP_NEWER; + result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_NEWER, -1); } else if (op.seqNo() == docAndSeqNo.seqNo) { // load term to tie break final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field()); if (op.primaryTerm() > existingTerm) { - status = OpVsLuceneDocStatus.OP_NEWER; + result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_NEWER, -1); } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_STALE_OR_EQUAL, 
docAndSeqNo.seqNo); } } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_STALE_OR_EQUAL, docAndSeqNo.seqNo); } } } - return status; + return result; } /** resolves the current version of the document, returning null if not found */ @@ -927,11 +938,11 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); + final OpVsLuceneLookupResult opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + if (opVsLucene.status == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version(), opVsLucene.seqNoOfNewerDoc); } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, + plan = IndexingStrategy.processNormally(opVsLucene.status == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version()); } } @@ -1000,9 +1011,9 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { if (plan.addStaleOpToLucene) { - addStaleDocs(index.docs(), indexWriter); + addStaleDocs(index.docs(), plan.seqNoOfNewerVersion); } else if (plan.useLuceneUpdateDocument) { - updateDocs(index.uid(), index.docs(), indexWriter); + updateDocs(index.uid(), plan, index.docs()); } else { // document does not exists, we can optimize for create, but double check if assertions are running assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); @@ -1065,9 +1076,14 @@ private void 
addDocs(final List docs, final IndexWriter i numDocAppends.inc(docs.size()); } - private void addStaleDocs(final List docs, final IndexWriter indexWriter) throws IOException { + private void addStaleDocs(final List docs, final long seqNoOfNewerVersion) throws IOException { assert softDeleteEnabled : "Add history documents but soft-deletes is disabled"; - docs.forEach(d -> d.add(softDeleteField)); + assert seqNoOfNewerVersion >= 0 : "Invalid newer version; seqno=" + seqNoOfNewerVersion; + NumericDocValuesField updatedBySeqNoField = new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, seqNoOfNewerVersion); + for (ParseContext.Document doc : docs) { + doc.add(softDeleteField); + doc.add(updatedBySeqNoField); + } if (docs.size() > 1) { indexWriter.addDocuments(docs); } else { @@ -1082,11 +1098,12 @@ protected static final class IndexingStrategy { final long versionForIndexing; final boolean indexIntoLucene; final boolean addStaleOpToLucene; + final long seqNoOfNewerVersion; // the seqno of the newer copy of this _uid if exists; otherwise -1 final Optional earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, - long versionForIndexing, IndexResult earlyResultOnPreFlightError) { + long versionForIndexing, long seqNoOfNewerVersion, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false : @@ -1099,39 +1116,41 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; this.addStaleOpToLucene = addStaleOpToLucene; + this.seqNoOfNewerVersion = seqNoOfNewerVersion; + assert addStaleOpToLucene == false || seqNoOfNewerVersion 
>= 0 : "stale op [" + seqNoForIndexing + "] with invalid newer seqno"; this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null ? Optional.empty() : Optional.of(earlyResultOnPreFlightError); } static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) { - return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null); + return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, -1, null); } static IndexingStrategy skipDueToVersionConflict( VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) { final IndexResult result = new IndexResult(e, currentVersion); return new IndexingStrategy( - currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); + currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, -1, result); } static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, - true, false, seqNoForIndexing, versionForIndexing, null); + true, false, seqNoForIndexing, versionForIndexing, -1, null); } static IndexingStrategy overrideExistingAsIfNotThere( long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null); + return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, -1, null); } static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null); + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, -1, null); } - static IndexingStrategy processAsStaleOp(boolean 
addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, null); + static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing, long seqNoOfNewerVersion) { + return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, seqNoOfNewerVersion, null); } } @@ -1157,12 +1176,13 @@ private boolean assertDocDoesNotExist(final Index index, final boolean allowDele return true; } - private void updateDocs(final Term uid, final List docs, final IndexWriter indexWriter) throws IOException { + private void updateDocs(Term uid, IndexingStrategy plan, List docs) throws IOException { if (softDeleteEnabled) { + NumericDocValuesField updatedBySeqNoField = new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoForIndexing); if (docs.size() > 1) { - indexWriter.softUpdateDocuments(uid, docs, softDeleteField); + indexWriter.softUpdateDocuments(uid, docs, softDeleteField, updatedBySeqNoField); } else { - indexWriter.softUpdateDocument(uid, docs.get(0), softDeleteField); + indexWriter.softUpdateDocument(uid, docs.get(0), softDeleteField, updatedBySeqNoField); } } else { if (docs.size() > 1) { @@ -1256,7 +1276,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); } else { - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete).status; if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); } else { @@ -1314,7 +1334,8 @@ private DeleteResult 
deleteInLucene(Delete delete, DeletionStrategy plan) if (plan.addStaleOpToLucene || plan.currentlyDeleted) { indexWriter.addDocument(doc); } else { - indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField); + NumericDocValuesField updatedBySeqNoField = new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoOfDeletion); + indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField, updatedBySeqNoField); } } else if (plan.currentlyDeleted == false) { // any exception that comes from this is a either an ACE or a fatal exception there diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index af2ded8c46620..1d2b83dcf8b90 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -20,6 +20,9 @@ package org.elasticsearch.index.engine; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.mapper.SeqNoFieldMapper; @@ -115,6 +118,10 @@ synchronized long getMinRetainedSeqNo() { * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges. 
*/ Query getRetentionQuery() { - return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE); + return new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE), BooleanClause.Occur.SHOULD) + .add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, + globalCheckpointSupplier.getAsLong() + 1, Long.MAX_VALUE), BooleanClause.Occur.SHOULD) + .build(); } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java index 5a0db4163bf28..0ec3578221d73 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java @@ -92,6 +92,7 @@ public static SequenceIDFields emptySeqID() { public static final String CONTENT_TYPE = "_seq_no"; public static final String PRIMARY_TERM_NAME = "_primary_term"; public static final String TOMBSTONE_NAME = "_tombstone"; + public static final String UPDATED_BY_SEQNO_NAME = "_*updated_by_seqno"; // TODO: Remove the star after upgrade to Lucene-7.5 snapshot public static class SeqNoDefaults { public static final String NAME = SeqNoFieldMapper.NAME; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 21f1781cc65a0..7092e20ca9baf 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -53,6 +53,7 @@ import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import 
org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; @@ -77,6 +78,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -139,6 +141,7 @@ import java.util.Base64; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -178,6 +181,7 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; @@ -1344,24 +1348,31 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final MapperService mapperService = createMapperService("test"); - final Set liveDocs = new HashSet<>(); + final Map liveDocs = new HashMap<>(); + final Map updatedBy = new HashMap<>(); try (Store store = createStore(); - InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) { + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), + newMergePolicy(), null, null, globalCheckpoint::get))) { int numDocs = scaledRandomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null); - 
engine.index(indexForDoc(doc)); - liveDocs.add(doc.id()); + ParsedDocument doc = testParsedDocument(Integer.toString(between(i, i+5)), null, testDocument(), B_1, null); + liveDocs.put(doc.id(), engine.index(indexForDoc(doc)).getSeqNo()); } for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null); + ParsedDocument doc = testParsedDocument(Integer.toString(between(i, i+5)), null, testDocument(), B_1, null); if (randomBoolean()) { - engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + Engine.DeleteResult delete = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + if (liveDocs.containsKey(doc.id())) { + updatedBy.put(liveDocs.get(doc.id()), delete.getSeqNo()); + } liveDocs.remove(doc.id()); } if (randomBoolean()) { - engine.index(indexForDoc(doc)); - liveDocs.add(doc.id()); + Engine.IndexResult index = engine.index(indexForDoc(doc)); + if (liveDocs.containsKey(doc.id())) { + updatedBy.put(liveDocs.get(doc.id()), index.getSeqNo()); + } + liveDocs.put(doc.id(), index.getSeqNo()); } if (randomBoolean()) { engine.flush(randomBoolean(), true); @@ -1378,20 +1389,36 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { } engine.forceMerge(true, 1, false, false, false); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); - Map ops = readAllOperationsInLucene(engine, mapperService) - .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); - for (long seqno = 0; seqno <= localCheckpoint; seqno++) { - long minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitCheckpoint + 1); - String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]"; - if (seqno < minSeqNoToRetain) { - Translog.Operation op = ops.get(seqno); - if (op != null) { - assertThat(op, 
instanceOf(Translog.Index.class)); - assertThat(msg, ((Translog.Index) op).id(), isIn(liveDocs)); - assertEquals(msg, ((Translog.Index) op).source(), B_1); + engine.refresh("test"); + long minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitCheckpoint + 1); + try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader()); + Set keptSeqNos = new HashSet<>(); + for (LeafReaderContext leaf : reader.leaves()) { + DocIdSetIterator docIterator = new IndexSearcher(reader).createWeight(new MatchAllDocsQuery(), false, 1.0f) + .scorer(leaf).iterator(); + NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + NumericDocValues updatedByDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME); + int doc; + while ((doc = docIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertTrue(seqNoDV.advanceExact(doc)); + keptSeqNos.add(seqNoDV.longValue()); + // documents whose seqno < min_seqno_to_retain are not merged away because + // they're live or updated by an op whose seqno is greater than global checkpoint. 
+ if (seqNoDV.longValue() < minSeqNoToRetain && liveDocs.values().contains(seqNoDV.longValue()) == false) { + String msg = "seq_no=" + seqNoDV.longValue() + ",global_checkpoint=" + globalCheckpoint; + assertTrue(msg, updatedByDV.advanceExact(doc)); + assertThat(msg, updatedByDV.longValue(), greaterThan(globalCheckpoint.get())); + } + } + } + for (long seqno = 0; seqno < localCheckpoint; seqno++) { + if (seqno >= minSeqNoToRetain) { + assertThat("seq_no=" + seqno + " required for changes", keptSeqNos, hasItem(seqno)); + } + if (updatedBy.getOrDefault(seqno, Long.MIN_VALUE) > globalCheckpoint.get()) { + assertThat("seq_no=" + seqno + " required for rollback", keptSeqNos, hasItem(seqno)); } - } else { - assertThat(msg, ops.get(seqno), notNullValue()); } } settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); @@ -4947,6 +4974,66 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { } } + public void testRecordUpdatedBySeqNo() throws Exception { + CheckedFunction, IOException> readUpdatedBySeqNos = (engine) -> { + engine.refresh("test"); + Map updates = new HashMap<>(); + try (Searcher engineSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); + for (LeafReaderContext leaf : reader.leaves()) { + NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + NumericDocValues updatedByDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME); + DocIdSetIterator iterator = new IndexSearcher(reader).createWeight(new MatchAllDocsQuery(), false, 1.0f) + .scorer(leaf).iterator(); + int doc; + while((doc = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertThat(seqNoDV.advanceExact(doc), equalTo(true)); + if (updatedByDV.advanceExact(doc)) { + assertThat(updatedByDV.longValue(), greaterThanOrEqualTo(0L)); + updates.put(seqNoDV.longValue(), 
updatedByDV.longValue()); + } + } + } + } + return updates; + }; + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + // On replica + AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore(); + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), + newMergePolicy(), null, null, globalCheckpoint::get))) { + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 1, 0, false)); + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 2, 1, false)); + // IndexWriter allows softUpdateDocuments soft-deleted docs again until it's refreshed. + engine.refresh("test"); + engine.delete(replicaDeleteForDoc("1", 4, 3, threadPool.relativeTimeInMillis())); + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 3, 2, false)); + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 5, 4, false)); + Map seqNos = readUpdatedBySeqNos.apply(engine); + assertThat(seqNos, hasEntry(0L, 1L)); // 0 -> 1 + assertThat(seqNos, hasEntry(1L, 4L)); // 1 -> 3 -> 4 + assertThat(seqNos, hasEntry(2L, 3L)); // 2 -> 3 (stale) + } + // On primary + try (Store store = createStore(); + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), + newMergePolicy(), null, null, globalCheckpoint::get))) { + engine.index(indexForDoc(createParsedDoc("1", null))); + engine.index(indexForDoc(createParsedDoc("1", null))); + engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get())); + engine.index(indexForDoc(createParsedDoc("1", null))); + engine.refresh("test"); + Map seqNos = readUpdatedBySeqNos.apply(engine); + assertThat(seqNos, hasEntry(0L, 2L)); 
// 0 -> 1 -> 2 + assertThat(seqNos, hasEntry(1L, 2L)); // 1 -> 2 + } + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); From af2899ed00ed4239285459a69555193945c60f4c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 8 Jul 2018 16:29:07 -0400 Subject: [PATCH 02/13] remove fieldname trick --- .../java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java index 0ec3578221d73..9e3842183002c 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java @@ -92,7 +92,7 @@ public static SequenceIDFields emptySeqID() { public static final String CONTENT_TYPE = "_seq_no"; public static final String PRIMARY_TERM_NAME = "_primary_term"; public static final String TOMBSTONE_NAME = "_tombstone"; - public static final String UPDATED_BY_SEQNO_NAME = "_*updated_by_seqno"; // TODO: Remove the star after upgrade to Lucene-7.5 snapshot + public static final String UPDATED_BY_SEQNO_NAME = "_updated_by_seqno"; public static class SeqNoDefaults { public static final String NAME = SeqNoFieldMapper.NAME; From b71fa199091ece7353fc825c9070a0701d1a53fe Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 13 Jul 2018 15:48:08 -0400 Subject: [PATCH 03/13] Only use seqno to resolve out of order on replica --- .../index/engine/InternalEngine.java | 75 +++++-------------- .../IndexLevelReplicationTests.java | 1 + 2 files changed, 18 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 
54ccba8a49124..0767f8ffe48d1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -94,6 +94,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -648,65 +649,25 @@ public GetResult get(Get get, BiFunction search } } - /** - * the status of the current doc version in lucene, compared to the version in an incoming - * operation - */ - enum OpVsLuceneDocStatus { - /** the op is more recent than the one that last modified the doc found in lucene*/ - OP_NEWER, - /** the op is older or the same as the one that last modified the doc found in lucene*/ - OP_STALE_OR_EQUAL, - /** no doc was found in lucene */ - LUCENE_DOC_NOT_FOUND - } - - static final class OpVsLuceneLookupResult { - final OpVsLuceneDocStatus status; - final long seqNoOfNewerDoc; //seqno of a newer version found in Lucene; otherwise -1 - OpVsLuceneLookupResult(OpVsLuceneDocStatus status, long seqNoOfNewerDoc) { - this.status = status; - this.seqNoOfNewerDoc = seqNoOfNewerDoc; - assert (status == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL && seqNoOfNewerDoc >= 0) || - (status != OpVsLuceneDocStatus.OP_STALE_OR_EQUAL && seqNoOfNewerDoc == -1) : - "status=" + status + " ,seqno_newer_doc=" + seqNoOfNewerDoc; - } - } - - private OpVsLuceneLookupResult compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { + /** resolves the last seqno of the document, returning empty if not found */ + private OptionalLong resolveDocSeqNo(final Operation op) throws IOException { + //TODO: Assert that if prev_seqno equals to op's seqno, they should have the same term after rollback is implemented. 
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; - final OpVsLuceneLookupResult result; - VersionValue versionValue = getVersionFromMap(op.uid().bytes()); + final VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo || (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) { - result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_NEWER, -1); - } else { - result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_STALE_OR_EQUAL, versionValue.seqNo); - } + return OptionalLong.of(versionValue.seqNo); } else { - // load from index assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { - DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); + final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); if (docAndSeqNo == null) { - result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, -1); - } else if (op.seqNo() > docAndSeqNo.seqNo) { - result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_NEWER, -1); - } else if (op.seqNo() == docAndSeqNo.seqNo) { - // load term to tie break - final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field()); - if (op.primaryTerm() > existingTerm) { - result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_NEWER, -1); - } else { - result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_STALE_OR_EQUAL, docAndSeqNo.seqNo); - } + return OptionalLong.empty(); } else { - result = new OpVsLuceneLookupResult(OpVsLuceneDocStatus.OP_STALE_OR_EQUAL, docAndSeqNo.seqNo); + return OptionalLong.of(docAndSeqNo.seqNo); } } } - return result; } /** resolves the current version of the document, returning null if not found */ @@ -938,12 
+899,11 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { - final OpVsLuceneLookupResult opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene.status == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version(), opVsLucene.seqNoOfNewerDoc); + final OptionalLong prevSeqNo = resolveDocSeqNo(index); + if (prevSeqNo.isPresent() == false || prevSeqNo.getAsLong() < index.seqNo()) { + plan = IndexingStrategy.processNormally(prevSeqNo.isPresent() == false, index.seqNo(), index.version()); } else { - plan = IndexingStrategy.processNormally(opVsLucene.status == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - index.seqNo(), index.version()); + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version(), prevSeqNo.getAsLong()); } } } @@ -1276,12 +1236,11 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); } else { - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete).status; - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); + final OptionalLong prevSeqNo = resolveDocSeqNo(delete); + if (prevSeqNo.isPresent() == false || prevSeqNo.getAsLong() < delete.seqNo()) { + plan = DeletionStrategy.processNormally(prevSeqNo.isPresent() == false, delete.seqNo(), delete.version()); } else { - plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - delete.seqNo(), delete.version()); + plan = 
DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); } } return plan; diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index b4cc033ab40c4..43a89efae9ae2 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -210,6 +210,7 @@ public void testCheckpointsAdvance() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/31637") public void testConflictingOpsOnReplica() throws Exception { Map mappings = Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); From b2dcacdda299f07f28dbfab6f7f641e6df5c9578 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Jul 2018 12:04:56 -0400 Subject: [PATCH 04/13] feedback --- .../elasticsearch/index/engine/InternalEngine.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0767f8ffe48d1..1bf99c469db9c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -971,7 +971,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { if (plan.addStaleOpToLucene) { - addStaleDocs(index.docs(), plan.seqNoOfNewerVersion); + addStaleDocs(index.docs(), plan.seqNoOfNewerDocIfStale); } else if (plan.useLuceneUpdateDocument) { updateDocs(index.uid(), plan, index.docs()); } else { @@ -1058,26 +1058,27 @@ protected static final class IndexingStrategy { final 
long versionForIndexing; final boolean indexIntoLucene; final boolean addStaleOpToLucene; - final long seqNoOfNewerVersion; // the seqno of the newer copy of this _uid if exists; otherwise -1 + final long seqNoOfNewerDocIfStale; // the seqno of the newer copy of this _uid if exists; otherwise -1 final Optional earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, - long versionForIndexing, long seqNoOfNewerVersion, IndexResult earlyResultOnPreFlightError) { + long versionForIndexing, long seqNoOfNewerDocIfStale, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false : "can only index into lucene or have a preflight result but not both." + "indexIntoLucene: " + indexIntoLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; + assert (addStaleOpToLucene && seqNoOfNewerDocIfStale > seqNoForIndexing) || (addStaleOpToLucene == false && seqNoOfNewerDocIfStale == -1) : + "stale=" + addStaleOpToLucene + " ,seqno_for_indexing=" + seqNoForIndexing + " ,seqno_of_newer_doc=" + seqNoOfNewerDocIfStale; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.useLuceneUpdateDocument = useLuceneUpdateDocument; this.seqNoForIndexing = seqNoForIndexing; this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; this.addStaleOpToLucene = addStaleOpToLucene; - this.seqNoOfNewerVersion = seqNoOfNewerVersion; - assert addStaleOpToLucene == false || seqNoOfNewerVersion >= 0 : "stale op [" + seqNoForIndexing + "] with invalid newer seqno"; + this.seqNoOfNewerDocIfStale = seqNoOfNewerDocIfStale; this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null ? 
Optional.empty() : Optional.of(earlyResultOnPreFlightError); From 59906d8dfbeefa869e3d7170630ad485ba15cbc3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Jul 2018 12:16:32 -0400 Subject: [PATCH 05/13] add comment to explain the slow range query --- .../java/org/elasticsearch/index/engine/SoftDeletesPolicy.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index 1d2b83dcf8b90..7d9bd67d2abfa 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -120,6 +120,7 @@ synchronized long getMinRetainedSeqNo() { Query getRetentionQuery() { return new BooleanQuery.Builder() .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE), BooleanClause.Occur.SHOULD) + // Since updated_by_seqno is an updatable DV, we have to do a linear scan to find matches of its range query. 
.add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, globalCheckpointSupplier.getAsLong() + 1, Long.MAX_VALUE), BooleanClause.Occur.SHOULD) .build(); From 789147077caa7a5bd3e150c0228122f4f3ccbe08 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Jul 2018 14:27:30 -0400 Subject: [PATCH 06/13] relax the stale_seqno assertion --- .../org/elasticsearch/index/engine/InternalEngine.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1bf99c469db9c..bff95aa03554a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -903,7 +903,9 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO if (prevSeqNo.isPresent() == false || prevSeqNo.getAsLong() < index.seqNo()) { plan = IndexingStrategy.processNormally(prevSeqNo.isPresent() == false, index.seqNo(), index.version()); } else { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version(), prevSeqNo.getAsLong()); + final boolean addStaleOpToLucene = this.softDeleteEnabled; + plan = IndexingStrategy.processAsStaleOp(addStaleOpToLucene, index.seqNo(), index.version(), + addStaleOpToLucene ? prevSeqNo.getAsLong() : -1); } } } @@ -1070,8 +1072,9 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda "can only index into lucene or have a preflight result but not both." 
+ "indexIntoLucene: " + indexIntoLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; - assert (addStaleOpToLucene && seqNoOfNewerDocIfStale > seqNoForIndexing) || (addStaleOpToLucene == false && seqNoOfNewerDocIfStale == -1) : - "stale=" + addStaleOpToLucene + " ,seqno_for_indexing=" + seqNoForIndexing + " ,seqno_of_newer_doc=" + seqNoOfNewerDocIfStale; + // TODO: assert greater (i.e. remove or equals) after rollback is implemented + assert (addStaleOpToLucene && seqNoOfNewerDocIfStale >= seqNoForIndexing) || (addStaleOpToLucene == false && seqNoOfNewerDocIfStale == -1) : + "stale=" + addStaleOpToLucene + ", seqno_for_indexing=" + seqNoForIndexing + ", seqno_of_newer_doc=" + seqNoOfNewerDocIfStale; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.useLuceneUpdateDocument = useLuceneUpdateDocument; this.seqNoForIndexing = seqNoForIndexing; From d6ff966e5ff4d2c44d4d130f75fe5a92d7b492c3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 18 Jul 2018 16:11:34 -0400 Subject: [PATCH 07/13] add comment to the test --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7092e20ca9baf..c9b465555b167 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5018,6 +5018,8 @@ public void testRecordUpdatedBySeqNo() throws Exception { assertThat(seqNos, hasEntry(0L, 1L)); // 0 -> 1 assertThat(seqNos, hasEntry(1L, 4L)); // 1 -> 3 -> 4 assertThat(seqNos, hasEntry(2L, 3L)); // 2 -> 3 (stale) + assertThat(seqNos, not(hasKey(3L))); // delete does not have updated_by_seqno + assertThat(seqNos.keySet(), hasSize(3)); } // On primary try (Store store = createStore(); @@ -5031,6 +5033,8 @@ public void 
testRecordUpdatedBySeqNo() throws Exception { Map seqNos = readUpdatedBySeqNos.apply(engine); assertThat(seqNos, hasEntry(0L, 2L)); // 0 -> 1 -> 2 assertThat(seqNos, hasEntry(1L, 2L)); // 1 -> 2 + assertThat(seqNos, not(hasKey(2L))); // delete does not have updated_by_seqno + assertThat(seqNos.keySet(), hasSize(2)); } } From 4c36ee2bb0220486b89439cee20a60014b60cfab Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 23 Jul 2018 16:38:40 -0400 Subject: [PATCH 08/13] add updated_by_seqno for stale deletes --- .../index/engine/InternalEngine.java | 33 ++++++++++++------- .../index/engine/InternalEngineTests.java | 12 ++++--- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0deb5a5446a00..6b0aecd199c54 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1055,8 +1055,8 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda "indexIntoLucene: " + indexIntoLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; // TODO: assert greater (i.e. 
remove or equals) after rollback is implemented - assert (addStaleOpToLucene && seqNoOfNewerDocIfStale >= seqNoForIndexing) || (addStaleOpToLucene == false && seqNoOfNewerDocIfStale == -1) : - "stale=" + addStaleOpToLucene + ", seqno_for_indexing=" + seqNoForIndexing + ", seqno_of_newer_doc=" + seqNoOfNewerDocIfStale; + assert addStaleOpToLucene == false || seqNoOfNewerDocIfStale >= seqNoForIndexing : + "stale=" + addStaleOpToLucene + " seqno_for_indexing=" + seqNoForIndexing + " seqno_of_newer_doc=" + seqNoOfNewerDocIfStale; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.useLuceneUpdateDocument = useLuceneUpdateDocument; this.seqNoForIndexing = seqNoForIndexing; @@ -1221,7 +1221,9 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws if (prevSeqNo.isPresent() == false || prevSeqNo.getAsLong() < delete.seqNo()) { plan = DeletionStrategy.processNormally(prevSeqNo.isPresent() == false, delete.seqNo(), delete.version()); } else { - plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); + boolean addStaleOpToLucene = this.softDeleteEnabled; + plan = DeletionStrategy.processAsStaleOp(addStaleOpToLucene, false, delete.seqNo(), delete.version(), + addStaleOpToLucene ? 
prevSeqNo.getAsLong() : -1); } } return plan; @@ -1271,6 +1273,9 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set [" + doc + " ]"; doc.add(softDeleteField); + if (plan.addStaleOpToLucene) { + doc.add(new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoOfNewerDocIfStale)); + } if (plan.addStaleOpToLucene || plan.currentlyDeleted) { indexWriter.addDocument(doc); } else { @@ -1308,20 +1313,24 @@ protected static final class DeletionStrategy { final boolean currentlyDeleted; final long seqNoOfDeletion; final long versionOfDeletion; + final long seqNoOfNewerDocIfStale; // the seqno of the newer copy of this _uid if exists; otherwise -1 final Optional earlyResultOnPreflightError; - private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, - long seqNoOfDeletion, long versionOfDeletion, - DeleteResult earlyResultOnPreflightError) { + private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, long seqNoOfDeletion, + long versionOfDeletion, long seqNoOfNewerDocIfStale, DeleteResult earlyResultOnPreflightError) { assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : "can only delete from lucene or have a preflight result but not both." + "deleteFromLucene: " + deleteFromLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreflightError; + // TODO: assert greater (i.e. 
remove or equals) after rollback is implemented + assert addStaleOpToLucene == false || seqNoOfNewerDocIfStale >= seqNoOfDeletion : + "stale=" + addStaleOpToLucene + " seqno_for_deletion=" + seqNoOfDeletion + " seqno_of_newer_doc=" + seqNoOfNewerDocIfStale; this.deleteFromLucene = deleteFromLucene; this.addStaleOpToLucene = addStaleOpToLucene; this.currentlyDeleted = currentlyDeleted; this.seqNoOfDeletion = seqNoOfDeletion; this.versionOfDeletion = versionOfDeletion; + this.seqNoOfNewerDocIfStale = seqNoOfNewerDocIfStale; this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ? Optional.empty() : Optional.of(earlyResultOnPreflightError); } @@ -1330,22 +1339,22 @@ static DeletionStrategy skipDueToVersionConflict( VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) { final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false); - return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult); + return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, -1, deleteResult); } static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, -1, null); } public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, -1, null); } - static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean 
currentlyDeleted, - long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, long seqNoOfDeletion, + long versionOfDeletion, long seqNoOfNewerDocIfStale) { + return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, seqNoOfNewerDocIfStale, null); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 84eed11a89014..7d7677158def2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -127,6 +127,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -5013,13 +5014,16 @@ public void testRecordUpdatedBySeqNo() throws Exception { engine.refresh("test"); engine.delete(replicaDeleteForDoc("1", 4, 3, threadPool.relativeTimeInMillis())); engine.index(replicaIndexForDoc(createParsedDoc("1", null), 3, 2, false)); - engine.index(replicaIndexForDoc(createParsedDoc("1", null), 5, 4, false)); + engine.index(replicaIndexForDoc(createParsedDoc("1", null), 6, 5, false)); + engine.delete(replicaDeleteForDoc("1", 5, 4, threadPool.relativeTimeInMillis())); Map seqNos = readUpdatedBySeqNos.apply(engine); assertThat(seqNos, hasEntry(0L, 1L)); // 0 -> 1 - assertThat(seqNos, hasEntry(1L, 4L)); // 1 -> 3 -> 4 + assertThat(seqNos, hasEntry(1L, 5L)); // 1 -> 3 -> 5 assertThat(seqNos, hasEntry(2L, 3L)); // 2 -> 3 (stale) - assertThat(seqNos, not(hasKey(3L))); // 
delete does not have updated_by_seqno - assertThat(seqNos.keySet(), hasSize(3)); + assertThat(seqNos, not(hasKey(3L))); // regular delete tombstone does not have updated_by_seqno + assertThat(seqNos, hasEntry(4L, 5L)); // stale delete tombstone has updated_by_seqno + assertThat(seqNos, not(hasKey(5L))); // a live index + assertThat(seqNos.keySet(), hasSize(4)); } // On primary try (Store store = createStore(); From 29d5f71fdf2edca57c47a67a95c8ede4538758d0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 23 Jul 2018 18:03:36 -0400 Subject: [PATCH 09/13] remove unused imports --- .../java/org/elasticsearch/index/engine/InternalEngineTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7d7677158def2..38916322263c4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -127,7 +127,6 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; From 326c51fab81d6e6fb42dfc71c05b99020adf4d48 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 25 Jul 2018 18:17:49 -0400 Subject: [PATCH 10/13] reindex tombstone --- .../index/engine/InternalEngine.java | 125 +++++++++++------- .../index/engine/InternalEngineTests.java | 45 ++++--- 2 files changed, 108 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6b0aecd199c54..bebbe35e9ea6d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java 
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -87,6 +87,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -94,7 +95,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -649,22 +649,35 @@ public GetResult get(Get get, BiFunction search } } - /** resolves the last seqno of the document, returning empty if not found */ - private OptionalLong resolveDocSeqNo(final Operation op) throws IOException { + private static final class SeqNoAndVersionValue { + final long seqNo; + final VersionValue versionValue; + SeqNoAndVersionValue(long seqNo, VersionValue versionValue) { + assert versionValue == null || versionValue.seqNo == seqNo : "version_value=" + versionValue + ", seqno=" + seqNo; + this.seqNo = seqNo; + this.versionValue = versionValue; + } + boolean isDelete() { + return versionValue != null && versionValue.isDelete(); + } + } + + /** resolves the last seqno and version on the version map (if exists) of the document, returning empty if not found */ + private SeqNoAndVersionValue resolveDocSeqNo(final Operation op) throws IOException { //TODO: Assert that if prev_seqno equals to op's seqno, they should have the same term after rollback is implemented. 
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - return OptionalLong.of(versionValue.seqNo); + return new SeqNoAndVersionValue(versionValue.seqNo, versionValue); } else { assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); if (docAndSeqNo == null) { - return OptionalLong.empty(); + return null; } else { - return OptionalLong.of(docAndSeqNo.seqNo); + return new SeqNoAndVersionValue(docAndSeqNo.seqNo, null); } } } @@ -881,13 +894,13 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { - final OptionalLong prevSeqNo = resolveDocSeqNo(index); - if (prevSeqNo.isPresent() == false || prevSeqNo.getAsLong() < index.seqNo()) { - plan = IndexingStrategy.processNormally(prevSeqNo.isPresent() == false, index.seqNo(), index.version()); + final SeqNoAndVersionValue currentVersion = resolveDocSeqNo(index); + if (currentVersion == null || currentVersion.seqNo < index.seqNo()) { + plan = IndexingStrategy.processNormally(currentVersion == null, index.seqNo(), index.version(), currentVersion); } else { final boolean addStaleOpToLucene = this.softDeleteEnabled; plan = IndexingStrategy.processAsStaleOp(addStaleOpToLucene, index.seqNo(), index.version(), - addStaleOpToLucene ? prevSeqNo.getAsLong() : -1); + addStaleOpToLucene ? 
currentVersion : null); } } } @@ -935,8 +948,8 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc } else { plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, generateSeqNoForOperation(index), - index.versionType().updateVersion(currentVersion, index.version()) - ); + index.versionType().updateVersion(currentVersion, index.version()), + versionValue == null ? null : new SeqNoAndVersionValue(versionValue.seqNo, versionValue)); } } return plan; @@ -955,13 +968,28 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { if (plan.addStaleOpToLucene) { - addStaleDocs(index.docs(), plan.seqNoOfNewerDocIfStale); + addStaleDocs(index.docs(), plan.currentVersionValue.seqNo); } else if (plan.useLuceneUpdateDocument) { updateDocs(index.uid(), plan, index.docs()); } else { // document does not exists, we can optimize for create, but double check if assertions are running assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); - addDocs(index.docs(), indexWriter); + final List docs; + if (softDeleteEnabled && plan.currentVersionValue != null && plan.currentVersionValue.isDelete()) { + docs = new ArrayList<>(index.docs().size() + 1); + docs.addAll(index.docs()); + // Since the updated_by_seqno of a delete tombstone is never updated, we need to reindex another tombstone for the + // previous delete operation with updated_by_seqno equalling to seqno of this index op. This new tombstone ensures + // that we will always have a tombstone of the previous delete operation if we have to roll back this index operation. 
+ final VersionValue currentVersion = plan.currentVersionValue.versionValue; + ParseContext.Document tombstone = createDeleteTombstone( + index.id(), index.type(), currentVersion.seqNo, currentVersion.term, currentVersion.version).docs().get(0); + tombstone.add(new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoForIndexing)); + docs.add(tombstone); + } else { + docs = index.docs(); + } + addDocs(docs, indexWriter); } return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } catch (Exception ex) { @@ -1042,12 +1070,12 @@ protected static final class IndexingStrategy { final long versionForIndexing; final boolean indexIntoLucene; final boolean addStaleOpToLucene; - final long seqNoOfNewerDocIfStale; // the seqno of the newer copy of this _uid if exists; otherwise -1 + final SeqNoAndVersionValue currentVersionValue; final Optional earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, - boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, - long versionForIndexing, long seqNoOfNewerDocIfStale, IndexResult earlyResultOnPreFlightError) { + boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing, + SeqNoAndVersionValue currentVersionValue, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false : @@ -1055,48 +1083,49 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda "indexIntoLucene: " + indexIntoLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; // TODO: assert greater (i.e. 
remove or equals) after rollback is implemented - assert addStaleOpToLucene == false || seqNoOfNewerDocIfStale >= seqNoForIndexing : - "stale=" + addStaleOpToLucene + " seqno_for_indexing=" + seqNoForIndexing + " seqno_of_newer_doc=" + seqNoOfNewerDocIfStale; + assert addStaleOpToLucene == false || currentVersionValue.seqNo >= seqNoForIndexing : + "stale=" + addStaleOpToLucene + " seqno_for_indexing=" + seqNoForIndexing + " current_seqno=" + currentVersionValue.seqNo; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.useLuceneUpdateDocument = useLuceneUpdateDocument; this.seqNoForIndexing = seqNoForIndexing; this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; this.addStaleOpToLucene = addStaleOpToLucene; - this.seqNoOfNewerDocIfStale = seqNoOfNewerDocIfStale; + this.currentVersionValue = currentVersionValue; this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null ? Optional.empty() : Optional.of(earlyResultOnPreFlightError); } static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) { - return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, -1, null); + return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null, null); } static IndexingStrategy skipDueToVersionConflict( VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) { final IndexResult result = new IndexResult(e, currentVersion); return new IndexingStrategy( - currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, -1, result); + currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, null, result); } - static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, - long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, - true, false, seqNoForIndexing, versionForIndexing, -1, 
null); + static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long seqNoForIndexing, + long versionForIndexing, SeqNoAndVersionValue currentVersionValue) { + return new IndexingStrategy(currentNotFoundOrDeleted, currentVersionValue != null && currentVersionValue.isDelete() == false, + true, false, seqNoForIndexing, versionForIndexing, currentVersionValue, null); } static IndexingStrategy overrideExistingAsIfNotThere( long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, -1, null); + return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null, null); } static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, -1, null); + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null, null); } - static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing, long seqNoOfNewerVersion) { - return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, seqNoOfNewerVersion, null); + static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, + long versionForIndexing, SeqNoAndVersionValue currentVersionValue) { + return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, currentVersionValue, null); } } @@ -1217,13 +1246,13 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); } else { - final OptionalLong prevSeqNo = 
resolveDocSeqNo(delete); - if (prevSeqNo.isPresent() == false || prevSeqNo.getAsLong() < delete.seqNo()) { - plan = DeletionStrategy.processNormally(prevSeqNo.isPresent() == false, delete.seqNo(), delete.version()); + final SeqNoAndVersionValue currentVersion = resolveDocSeqNo(delete); + if (currentVersion == null || currentVersion.seqNo < delete.seqNo()) { + plan = DeletionStrategy.processNormally(currentVersion == null, delete.seqNo(), delete.version()); } else { boolean addStaleOpToLucene = this.softDeleteEnabled; plan = DeletionStrategy.processAsStaleOp(addStaleOpToLucene, false, delete.seqNo(), delete.version(), - addStaleOpToLucene ? prevSeqNo.getAsLong() : -1); + addStaleOpToLucene ? currentVersion.seqNo : -1); } } return plan; @@ -1261,26 +1290,32 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE return plan; } + private ParsedDocument createDeleteTombstone(String id, String type, long seqNo, long primaryTerm, long version) { + final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(type, id); + assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; + tombstone.updateSeqID(seqNo, primaryTerm); + tombstone.version().setLongValue(version); + final ParseContext.Document doc = tombstone.docs().get(0); + assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : + "Delete tombstone document but _tombstone field is not set [" + doc + " ]"; + doc.add(softDeleteField); + return tombstone; + } + private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { try { if (softDeleteEnabled) { - final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id()); - assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; - tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm()); - 
tombstone.version().setLongValue(plan.versionOfDeletion); - final ParseContext.Document doc = tombstone.docs().get(0); - assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : - "Delete tombstone document but _tombstone field is not set [" + doc + " ]"; - doc.add(softDeleteField); + final ParseContext.Document tombstone = createDeleteTombstone( + delete.id(), delete.type(), plan.seqNoOfDeletion, delete.primaryTerm(), plan.versionOfDeletion).docs().get(0); if (plan.addStaleOpToLucene) { - doc.add(new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoOfNewerDocIfStale)); + tombstone.add(new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoOfNewerDocIfStale)); } if (plan.addStaleOpToLucene || plan.currentlyDeleted) { - indexWriter.addDocument(doc); + indexWriter.addDocument(tombstone); } else { NumericDocValuesField updatedBySeqNoField = new NumericDocValuesField(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, plan.seqNoOfDeletion); - indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField, updatedBySeqNoField); + indexWriter.softUpdateDocument(delete.uid(), tombstone, softDeleteField, updatedBySeqNoField); } } else if (plan.currentlyDeleted == false) { // any exception that comes from this is a either an ACE or a fatal exception there diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 38916322263c4..ad4302da425f5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -181,7 +181,6 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; 
import static org.hamcrest.Matchers.hasSize; @@ -4975,9 +4974,9 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { } public void testRecordUpdatedBySeqNo() throws Exception { - CheckedFunction, IOException> readUpdatedBySeqNos = (engine) -> { + CheckedFunction>, IOException> readUpdatedBySeqNos = (engine) -> { engine.refresh("test"); - Map updates = new HashMap<>(); + List> updates = new ArrayList<>(); try (Searcher engineSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); for (LeafReaderContext leaf : reader.leaves()) { @@ -4990,7 +4989,9 @@ public void testRecordUpdatedBySeqNo() throws Exception { assertThat(seqNoDV.advanceExact(doc), equalTo(true)); if (updatedByDV.advanceExact(doc)) { assertThat(updatedByDV.longValue(), greaterThanOrEqualTo(0L)); - updates.put(seqNoDV.longValue(), updatedByDV.longValue()); + updates.add(Tuple.tuple(seqNoDV.longValue(), updatedByDV.longValue())); + }else { + updates.add(Tuple.tuple(seqNoDV.longValue(), null)); } } } @@ -5015,14 +5016,17 @@ public void testRecordUpdatedBySeqNo() throws Exception { engine.index(replicaIndexForDoc(createParsedDoc("1", null), 3, 2, false)); engine.index(replicaIndexForDoc(createParsedDoc("1", null), 6, 5, false)); engine.delete(replicaDeleteForDoc("1", 5, 4, threadPool.relativeTimeInMillis())); - Map seqNos = readUpdatedBySeqNos.apply(engine); - assertThat(seqNos, hasEntry(0L, 1L)); // 0 -> 1 - assertThat(seqNos, hasEntry(1L, 5L)); // 1 -> 3 -> 5 - assertThat(seqNos, hasEntry(2L, 3L)); // 2 -> 3 (stale) - assertThat(seqNos, not(hasKey(3L))); // regular delete tombstone does not have updated_by_seqno - assertThat(seqNos, hasEntry(4L, 5L)); // stale delete tombstone has updated_by_seqno - assertThat(seqNos, not(hasKey(5L))); // a live index - assertThat(seqNos.keySet(), hasSize(4)); + List> seqNos = readUpdatedBySeqNos.apply(engine); + // index#0 -> index#1 -> delete#3 
-> index#2 -> index#5 -> delete#4 + assertThat(seqNos, containsInAnyOrder( + Tuple.tuple(0L, 1L), // 0 -> 1 - seq#0 should be updated once because of refresh + Tuple.tuple(1L, 3L), // 1 -> 5 (deleted by #3, then updated by #5) + Tuple.tuple(2L, 3L), // index#2 is stale by delete#3, thus should never be updated again. + Tuple.tuple(3L, null), // the first delete tombstone without updated_by_seqno + Tuple.tuple(3L, 5L), // the second delete tombstone without updated_by_seqno + Tuple.tuple(4L, 5L), // delete#4 is stale, thus having updated_by_seqno=5 + Tuple.tuple(5L, null) // still alive + )); } // On primary try (Store store = createStore(); @@ -5030,14 +5034,21 @@ public void testRecordUpdatedBySeqNo() throws Exception { newMergePolicy(), null, null, globalCheckpoint::get))) { engine.index(indexForDoc(createParsedDoc("1", null))); engine.index(indexForDoc(createParsedDoc("1", null))); + engine.refresh("test"); engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get())); engine.index(indexForDoc(createParsedDoc("1", null))); + engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get())); engine.refresh("test"); - Map seqNos = readUpdatedBySeqNos.apply(engine); - assertThat(seqNos, hasEntry(0L, 2L)); // 0 -> 1 -> 2 - assertThat(seqNos, hasEntry(1L, 2L)); // 1 -> 2 - assertThat(seqNos, not(hasKey(2L))); // delete does not have updated_by_seqno - assertThat(seqNos.keySet(), hasSize(2)); + List> seqNos = readUpdatedBySeqNos.apply(engine); + // index#0 -> index#1 -> delete#2 -> index#3 -> delete#4 + assertThat(seqNos, containsInAnyOrder( + Tuple.tuple(0L, 1L), // index#0 -> index#1 -> refresh + Tuple.tuple(1L, 4L), // index#1 -> delete#2 -> index#3 -> delete#4 + Tuple.tuple(2L, null), // the first delete tombstone without updated_by_seqno + Tuple.tuple(2L, 3L), // the second delete tombstone with updated_by_seqno + Tuple.tuple(3L, 4L), // index#3 -> delete#4 + Tuple.tuple(4L, null) // still alive + )); } } From 
bed2f523b3fd862be259286aabc82cef7d81d5c5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 25 Jul 2018 23:13:03 -0400 Subject: [PATCH 11/13] relax tests --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- .../elasticsearch/index/engine/LuceneChangesSnapshotTests.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bebbe35e9ea6d..0aad58d69e1ea 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1110,7 +1110,7 @@ static IndexingStrategy skipDueToVersionConflict( static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing, SeqNoAndVersionValue currentVersionValue) { - return new IndexingStrategy(currentNotFoundOrDeleted, currentVersionValue != null && currentVersionValue.isDelete() == false, + return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, true, false, seqNoForIndexing, versionForIndexing, currentVersionValue, null); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 7de086a3be239..cb843043a1b9b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -42,6 +42,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class LuceneChangesSnapshotTests extends EngineTestCase { private MapperService mapperService; @@ -187,7 +188,7 @@ public void testDedupByPrimaryTerm() throws Exception { 
while ((op = snapshot.next()) != null) { assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo()))); } - assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size())); + assertThat(snapshot.skippedOperations(), greaterThanOrEqualTo(totalOps - latestOperations.size())); } } From f6487e11f66c141f618a1d4cf79f6522429e0878 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 31 Jul 2018 11:44:34 -0400 Subject: [PATCH 12/13] simulate merge --- .../index/engine/InternalEngineTests.java | 188 ++++++++++++++++++ .../index/engine/EngineTestCase.java | 14 ++ 2 files changed, 202 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a64f970c81063..091c7531e0168 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -30,6 +30,7 @@ import java.util.Base64; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -68,6 +69,7 @@ import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; @@ -82,15 +84,23 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; +import 
org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.SortedSetSortField; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; @@ -102,6 +112,7 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; @@ -129,6 +140,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -160,6 +172,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -5054,6 +5067,181 @@ public void testRecordUpdatedBySeqNo() throws Exception { } } + @TestLogging("_ROOT:DEBUG") + public void testKeepDocsForRollback() throws Exception { + IOUtils.close(engine, store); + threadPool = spy(threadPool); + AtomicLong 
clockTime = new AtomicLong(); + when(threadPool.relativeTimeInMillis()).thenAnswer(i -> clockTime.incrementAndGet()); + List operations = new ArrayList<>(); + int numOps = scaledRandomIntBetween(10, 200); + for (int seqNo = 0; seqNo < numOps; seqNo++) { + String id = Integer.toString(between(1, 5)); + if (randomBoolean()) { + ParsedDocument parseDoc = createParsedDoc(id, null); + operations.add(new Engine.Index(newUid(parseDoc), parseDoc, seqNo, primaryTerm.get(), 1, null, + REPLICA, between(1, 10), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean())); + } else { + operations.add(new Engine.Delete("test", id, newUid(id), seqNo, primaryTerm.get(), 1, null, + REPLICA, between(1, 10))); + } + } + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(between(0, 100)).getStringRep()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + realisticShuffleOperations(operations); + long globalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; + Map processedOps = new HashMap<>(); + SetOnce indexWriter = new SetOnce<>(); + IndexWriterFactory indexWriterFactory = (iwc, dir) -> { + indexWriter.set(new IndexWriter(iwc, dir)); + return indexWriter.get(); + }; + Set lastTombstones = Collections.emptySet(); + try (Store store = createStore(); + InternalEngine engine = createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) { + for (Engine.Operation op : operations) { + Set tombstones = engine.getDeletedTombstones().stream().map(del -> del.seqNo).collect(Collectors.toSet()); + if (op instanceof Engine.Index) { + logger.debug("index id={} seq={} gcp={} tombstones={}", op.id(), op.seqNo(), 
globalCheckpoint, tombstones); + engine.index((Engine.Index) op); + } else if (op instanceof Engine.Delete) { + logger.debug("delete id={} seq={} gcp={} tombstones={}", op.id(), op.seqNo(), globalCheckpoint, tombstones); + engine.delete((Engine.Delete) op); + } + processedOps.put(op.seqNo(), op); + if (between(1, 20) == 1) { + assertDocumentsForRollback(engine, globalCheckpoint, processedOps); + } + if (between(1, 5) == 1) { + engine.maybePruneDeletes(); + } + if (between(1, 20) == 1) { + BooleanQuery retentionQuery = new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, globalCheckpoint + 1, Long.MAX_VALUE), + BooleanClause.Occur.SHOULD) + .add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, globalCheckpoint + 1, Long.MAX_VALUE), + BooleanClause.Occur.SHOULD) + .build(); + List reclaimedOps = simulateMerge(engine, indexWriter.get(), retentionQuery) + .stream().map(processedOps::get).collect(Collectors.toList()); + for (Engine.Operation reclaimedOp : reclaimedOps) { + logger.debug("merge reclaim id={} seq={}", reclaimedOp.id(), reclaimedOp.seqNo()); + } + } + globalCheckpoint = randomLongBetween(globalCheckpoint, engine.getLocalCheckpoint()); + Set prunedTombstone = Sets.difference(lastTombstones, tombstones); + for (long prunedSeqNo : prunedTombstone) { + logger.debug("prune tombstone id={} seq={}", processedOps.get(prunedSeqNo).id(), prunedSeqNo); + } + lastTombstones = tombstones; + } + assertDocumentsForRollback(engine, globalCheckpoint, processedOps); + } + } + + /** + * Here we simulate Lucene merges for these purposes: + * - The simulation can randomly reclaim a subset of reclaimable operations instead of all docs like the actual merges + * - The simulation is more deterministic than the actual merge and can return the operations have been reclaimed. 
+ * + * @param retentionQuery deleted documents matching this query won't be reclaimed (see {@link SoftDeletesPolicy#getRetentionQuery()} + * @return a list of operations have been reclaimed + */ + private List simulateMerge(InternalEngine engine, IndexWriter indexWriter, Query retentionQuery) throws IOException { + try (Searcher engineSearcher = engine.acquireSearcher("simulate-merge", Engine.SearcherScope.INTERNAL)) { + IndexSearcher searcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); + BooleanQuery reclaimQuery = new BooleanQuery.Builder() + .add(new DocValuesFieldExistsQuery(Lucene.SOFT_DELETE_FIELD), BooleanClause.Occur.MUST) + .add(retentionQuery, BooleanClause.Occur.MUST_NOT).build(); + TopDocs reclaimableDocs = searcher.search(reclaimQuery, Integer.MAX_VALUE); + if (reclaimableDocs.scoreDocs.length == 0) { + return Collections.emptyList(); + } + List docsToReclaim = randomSubsetOf(Arrays.asList(reclaimableDocs.scoreDocs)); + DirectoryReader inReader = engineSearcher.getDirectoryReader(); + while (inReader instanceof FilterDirectoryReader) { + inReader = ((FilterDirectoryReader) inReader).getDelegate(); + } + List reclaimedOps = new ArrayList<>(); + for (ScoreDoc docToReclaim : docsToReclaim) { + if (indexWriter.tryDeleteDocument(inReader, docToReclaim.doc) != -1) { + reclaimedOps.add(readSeqNo(inReader, docToReclaim.doc)); + } + } + return reclaimedOps; + } + } + + /** + * This assertion asserts that the previous copy of every operation after the global_checkpoint is retained for rollback: + * 1. If the previous copy is an index, that copy must be retained + * 2. 
If the previous copy is a delete, either that copy or another delete or nothing is retained, but must not an index + */ + private void assertDocumentsForRollback(InternalEngine engine, long globalCheckpoint, + Map processedOps) throws IOException { + List rollbackOps = processedOps.values().stream() + .filter(op -> op.seqNo() > globalCheckpoint).collect(Collectors.toList()); + Map previousCopies = new HashMap<>(); + for (Engine.Operation op : rollbackOps) { + processedOps.values().stream().filter(target -> target.seqNo() < op.seqNo() && target.id().equals(op.id())) + .forEach(target -> { + previousCopies.compute(op.seqNo(), (k, v) -> { + if (v == null || v.seqNo() < target.seqNo()) return target; + else return v; + }); + }); + } + engine.refresh("test", Engine.SearcherScope.INTERNAL); + try (Searcher engineSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); + IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + for (Engine.Operation rollbackOp : rollbackOps) { + Engine.Operation previousCopy = previousCopies.get(rollbackOp.seqNo()); + if (previousCopy == null) { + continue; + } + BooleanQuery previousQuery = new BooleanQuery.Builder() + .add(new TermQuery(rollbackOp.uid()), BooleanClause.Occur.FILTER) + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, 0, rollbackOp.seqNo() - 1), BooleanClause.Occur.FILTER) + .build(); + TopDocs previousDocs = searcher.search(previousQuery, 1, + new Sort(new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG, true))); + // If the previous copy is an index, that copy must be retained + if (previousCopy instanceof Engine.Index) { + assertThat("operation id=" + previousCopy.id() + " seq=" + previousCopy.seqNo() + " has been reclaimed", + previousDocs.totalHits, greaterThanOrEqualTo(1L)); + long foundSeqNo = readSeqNo(reader, previousDocs.scoreDocs[0].doc); + 
assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo(), foundSeqNo, equalTo(previousCopy.seqNo())); + // If the previous copy is a delete, either that copy or another delete or nothing is retained, but must not an index + } else { + if (previousDocs.totalHits > 0) { + long actualSeqNo = readSeqNo(reader, previousDocs.scoreDocs[0].doc); + Engine.Operation foundOp = processedOps.get(actualSeqNo); + assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo() + ", found seq=" + foundOp.seqNo() + + ", expected seq=" + previousCopy.seqNo(), foundOp, instanceOf(Engine.Delete.class)); + } + } + } + } + } + + private long readSeqNo(DirectoryReader reader, int docId) throws IOException { + List leaves = reader.leaves(); + LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docId, leaves)); + int segmentDocId = docId - leaf.docBase; + NumericDocValues dv = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + assert dv != null : "SeqNoDV does not exist"; + if (dv.advanceExact(segmentDocId) == false) { + throw new AssertionError("doc " + docId + " does not have SeqNoDV"); + } + return dv.longValue(); + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 7e72dcf2aedf6..b3f3b8d13f3df 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -49,6 +49,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesArray; 
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; @@ -667,6 +668,19 @@ public static List generateSingleDocHistory(boolean forReplica return ops; } + /** + * Partitions a list of operations into a multiple sub-lists, then shuffles each sub-list. + * This method shuffles operations in a more realistic way than {@link Randomness#shuffle(List)}. + */ + public void realisticShuffleOperations(List operations) { + int index = 0; + while (index < operations.size()) { + int to = Math.min(operations.size(), index + between(10, 20)); + Randomness.shuffle(operations.subList(index, to)); // subList is a direct view + index = to; + } + } + public static void assertOpsOnReplica( final List ops, final InternalEngine replicaEngine, From 615230d758cdb8da2f9444981b10a6f72f388998 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 1 Aug 2018 12:00:23 -0400 Subject: [PATCH 13/13] address feedbacks on the random test --- .../index/engine/InternalEngineTests.java | 68 ++++++++----------- .../index/engine/EngineTestCase.java | 23 +++++-- 2 files changed, 44 insertions(+), 47 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 091c7531e0168..ec3e781bbcff4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5035,7 +5035,7 @@ public void testRecordUpdatedBySeqNo() throws Exception { // index#0 -> index#1 -> delete#3 -> index#2 -> index#5 -> delete#4 assertThat(seqNos, containsInAnyOrder( Tuple.tuple(0L, 1L), // 0 -> 1 - seq#0 should be updated once because of refresh - Tuple.tuple(1L, 3L), // 1 -> 5 (deleted by #3, then updated by #5) + Tuple.tuple(1L, 5L), // 1 -> 5 (deleted by #3, then updated by #5) Tuple.tuple(2L, 3L), // index#2 is stale by delete#3, thus should never be 
updated again. Tuple.tuple(3L, null), // the first delete tombstone without updated_by_seqno Tuple.tuple(3L, 5L), // the second delete tombstone without updated_by_seqno @@ -5073,26 +5073,12 @@ public void testKeepDocsForRollback() throws Exception { threadPool = spy(threadPool); AtomicLong clockTime = new AtomicLong(); when(threadPool.relativeTimeInMillis()).thenAnswer(i -> clockTime.incrementAndGet()); - List operations = new ArrayList<>(); - int numOps = scaledRandomIntBetween(10, 200); - for (int seqNo = 0; seqNo < numOps; seqNo++) { - String id = Integer.toString(between(1, 5)); - if (randomBoolean()) { - ParsedDocument parseDoc = createParsedDoc(id, null); - operations.add(new Engine.Index(newUid(parseDoc), parseDoc, seqNo, primaryTerm.get(), 1, null, - REPLICA, between(1, 10), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean())); - } else { - operations.add(new Engine.Delete("test", id, newUid(id), seqNo, primaryTerm.get(), 1, null, - REPLICA, between(1, 10))); - } - } Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(between(0, 100)).getStringRep()) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); - realisticShuffleOperations(operations); long globalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; Map processedOps = new HashMap<>(); SetOnce indexWriter = new SetOnce<>(); @@ -5103,6 +5089,9 @@ public void testKeepDocsForRollback() throws Exception { Set lastTombstones = Collections.emptySet(); try (Store store = createStore(); InternalEngine engine = createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) { + List operations = generateMultipleDocumentsHistory(true, 
VersionType.EXTERNAL, 1L, + 10, 200, Arrays.asList("1", "2", "3", "4", "5"), false, () -> between(1, 100)); + realisticShuffleOperations(operations); for (Engine.Operation op : operations) { Set tombstones = engine.getDeletedTombstones().stream().map(del -> del.seqNo).collect(Collectors.toSet()); if (op instanceof Engine.Index) { @@ -5113,13 +5102,11 @@ public void testKeepDocsForRollback() throws Exception { engine.delete((Engine.Delete) op); } processedOps.put(op.seqNo(), op); - if (between(1, 20) == 1) { + if (randomInt(100) < 20) { assertDocumentsForRollback(engine, globalCheckpoint, processedOps); } - if (between(1, 5) == 1) { - engine.maybePruneDeletes(); - } - if (between(1, 20) == 1) { + // Trigger merge to reclaim some deleted documents + if (randomBoolean()) { BooleanQuery retentionQuery = new BooleanQuery.Builder() .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, globalCheckpoint + 1, Long.MAX_VALUE), BooleanClause.Occur.SHOULD) @@ -5132,12 +5119,16 @@ public void testKeepDocsForRollback() throws Exception { logger.debug("merge reclaim id={} seq={}", reclaimedOp.id(), reclaimedOp.seqNo()); } } - globalCheckpoint = randomLongBetween(globalCheckpoint, engine.getLocalCheckpoint()); + // Prune some delete tombstone + if (randomBoolean()) { + engine.maybePruneDeletes(); + } Set prunedTombstone = Sets.difference(lastTombstones, tombstones); for (long prunedSeqNo : prunedTombstone) { logger.debug("prune tombstone id={} seq={}", processedOps.get(prunedSeqNo).id(), prunedSeqNo); } lastTombstones = tombstones; + globalCheckpoint = randomLongBetween(globalCheckpoint, engine.getLocalCheckpoint()); } assertDocumentsForRollback(engine, globalCheckpoint, processedOps); } @@ -5183,17 +5174,14 @@ private List simulateMerge(InternalEngine engine, IndexWriter indexWriter, */ private void assertDocumentsForRollback(InternalEngine engine, long globalCheckpoint, Map processedOps) throws IOException { - List rollbackOps = processedOps.values().stream() - .filter(op -> 
op.seqNo() > globalCheckpoint).collect(Collectors.toList()); - Map previousCopies = new HashMap<>(); - for (Engine.Operation op : rollbackOps) { - processedOps.values().stream().filter(target -> target.seqNo() < op.seqNo() && target.id().equals(op.id())) - .forEach(target -> { - previousCopies.compute(op.seqNo(), (k, v) -> { - if (v == null || v.seqNo() < target.seqNo()) return target; - else return v; - }); - }); + List rollbackOps = new ArrayList<>(); + Map restoringVersions = new HashMap<>(); + for (Engine.Operation op : processedOps.values()) { + if (op.seqNo() > globalCheckpoint) { + rollbackOps.add(op); + } else { + restoringVersions.compute(op.id(), (id, v) -> v == null || v.seqNo() < op.seqNo() ? op : v); + } } engine.refresh("test", Engine.SearcherScope.INTERNAL); try (Searcher engineSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { @@ -5201,29 +5189,29 @@ private void assertDocumentsForRollback(InternalEngine engine, long globalCheckp IndexSearcher searcher = new IndexSearcher(reader); searcher.setQueryCache(null); for (Engine.Operation rollbackOp : rollbackOps) { - Engine.Operation previousCopy = previousCopies.get(rollbackOp.seqNo()); - if (previousCopy == null) { + Engine.Operation restoringVersion = restoringVersions.get(rollbackOp.id()); + if (restoringVersion == null) { continue; } BooleanQuery previousQuery = new BooleanQuery.Builder() .add(new TermQuery(rollbackOp.uid()), BooleanClause.Occur.FILTER) - .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, 0, rollbackOp.seqNo() - 1), BooleanClause.Occur.FILTER) + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, 0, globalCheckpoint), BooleanClause.Occur.FILTER) .build(); TopDocs previousDocs = searcher.search(previousQuery, 1, new Sort(new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG, true))); // If the previous copy is an index, that copy must be retained - if (previousCopy instanceof Engine.Index) { - assertThat("operation id=" + previousCopy.id() + " 
seq=" + previousCopy.seqNo() + " has been reclaimed", - previousDocs.totalHits, greaterThanOrEqualTo(1L)); + if (restoringVersion instanceof Engine.Index) { + assertThat("restoring operation id=" + restoringVersion.id() + " seq=" + restoringVersion.seqNo() + + " has been reclaimed", previousDocs.totalHits, greaterThanOrEqualTo(1L)); long foundSeqNo = readSeqNo(reader, previousDocs.scoreDocs[0].doc); - assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo(), foundSeqNo, equalTo(previousCopy.seqNo())); + assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo(), foundSeqNo, equalTo(restoringVersion.seqNo())); // If the previous copy is a delete, either that copy or another delete or nothing is retained, but must not an index } else { if (previousDocs.totalHits > 0) { long actualSeqNo = readSeqNo(reader, previousDocs.scoreDocs[0].doc); Engine.Operation foundOp = processedOps.get(actualSeqNo); assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo() + ", found seq=" + foundOp.seqNo() - + ", expected seq=" + previousCopy.seqNo(), foundOp, instanceOf(Engine.Delete.class)); + + ", expected seq=" + restoringVersion.seqNo(), foundOp, instanceOf(Engine.Delete.class)); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index b3f3b8d13f3df..54b7171c3d2f4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -620,15 +620,24 @@ protected static void assertVisibleCount(InternalEngine engine, int numDocs, boo public static List generateSingleDocHistory(boolean forReplica, VersionType versionType, long primaryTerm, int minOpCount, int maxOpCount, String docId) { + return generateMultipleDocumentsHistory(forReplica, versionType, primaryTerm, minOpCount, maxOpCount, + 
Collections.singletonList(docId), true, System::currentTimeMillis); + } + + public static List generateMultipleDocumentsHistory(boolean forReplica, VersionType versionType, long primaryTerm, + int minOpCount, int maxOpCount, List docIds, + boolean allowGapInSeqNo, LongSupplier currentTimeInMsSupplier) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); final List ops = new ArrayList<>(); - final Term id = newUid(docId); final int startWithSeqNo = 0; - final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_"; final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); + final int seqNoGap = allowGapInSeqNo && randomBoolean() ? 2 : 1; for (int i = 0; i < numOfOps; i++) { final Engine.Operation op; final long version; + String docId = randomFrom(docIds); + final Term id = newUid(docId); + final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_"; switch (versionType) { case INTERNAL: version = forReplica ? i : Versions.MATCH_ANY; @@ -647,21 +656,21 @@ public static List generateSingleDocHistory(boolean forReplica } if (randomBoolean()) { op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null), - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, forReplica ? null : versionType, forReplica ? REPLICA : PRIMARY, - System.currentTimeMillis(), -1, false + currentTimeInMsSupplier.getAsLong(), -1, false ); } else { op = new Engine.Delete("test", docId, id, - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, forReplica ? 
null : versionType, forReplica ? REPLICA : PRIMARY, - System.currentTimeMillis()); + currentTimeInMsSupplier.getAsLong()); } ops.add(op); } @@ -675,7 +684,7 @@ public static List generateSingleDocHistory(boolean forReplica public void realisticShuffleOperations(List operations) { int index = 0; while (index < operations.size()) { - int to = Math.min(operations.size(), index + between(10, 20)); + int to = Math.min(operations.size(), index + between(10, 50)); Randomness.shuffle(operations.subList(index, to)); // subList is a direct view index = to; }