From b0f301433c241b332dc20b3b944c6c3ba345cb06 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 4 Jul 2018 10:04:10 -0400 Subject: [PATCH] Do not add multiple copies of stale docs to Lucene Since #29679, we have been adding stale operations to Lucene so that Lucene holds a complete history. Because stale docs are rare, we accepted having duplicate copies of them in order to keep the engine simple. However, we now need to ensure that there is exactly one copy per stale operation in Lucene, because the Lucene rollback requires a single document for each sequence number. --- .../index/engine/InternalEngine.java | 12 ++++-- .../index/seqno/LocalCheckpointTracker.java | 15 +++++++ .../index/engine/InternalEngineTests.java | 39 +++++++++++++++++++ .../seqno/LocalCheckpointTrackerTests.java | 13 +++++++ 4 files changed, 75 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 99dfb908711e3..58522a36b3a3d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -929,7 +929,8 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO } else { final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); + boolean addStaleOpToLucene = softDeleteEnabled && localCheckpointTracker.isProcessed(index.seqNo()) == false; + plan = IndexingStrategy.processAsStaleOp(addStaleOpToLucene, index.seqNo(), index.version()); } else { plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version()); @@ -1258,7 +1259,8 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws } else 
{ final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); + boolean addStaleOpToLucene = softDeleteEnabled && localCheckpointTracker.isProcessed(delete.seqNo()) == false; + plan = DeletionStrategy.processAsStaleOp(addStaleOpToLucene, false, delete.seqNo(), delete.version()); } else { plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.seqNo(), delete.version()); @@ -1400,7 +1402,9 @@ public void maybePruneDeletes() { @Override public NoOpResult noOp(final NoOp noOp) { NoOpResult noOpResult; - try (ReleasableLock ignored = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire(); + // prevent two noOps with the same seqno from getting in at the same time + Releasable uidLock = versionMap.acquireLock(new BytesRef(Long.toString(noOp.seqNo())))) { noOpResult = innerNoOp(noOp); } catch (final Exception e) { noOpResult = new NoOpResult(noOp.seqNo(), e); @@ -1414,7 +1418,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { final long seqNo = noOp.seqNo(); try { Exception failure = null; - if (softDeleteEnabled) { + if (softDeleteEnabled && localCheckpointTracker.isProcessed(noOp.seqNo()) == false) { try { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index cd33c1bf046ed..b5a1fffdb028c 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -103,6 +103,21 @@ public synchronized void markSeqNoAsCompleted(final 
long seqNo) { } } + /** + * Checks if the given sequence number has been processed (and tracked) in this tracker. + */ + public synchronized boolean isProcessed(long seqNo) { + if (seqNo <= checkpoint) { + return true; + } + if (seqNo >= nextSeqNo) { + return false; + } + final long bitSetKey = getBitSetKey(seqNo); + final CountedBitSet bitSet = processedSeqNo.get(bitSetKey); + return bitSet != null && bitSet.get(seqNoToBitSetOffset(seqNo)); + } + /** * Resets the checkpoint to the specified value. * diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 21f1781cc65a0..5cfa153444593 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -139,6 +139,7 @@ import java.util.Base64; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -4947,6 +4948,44 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { } } + public void testDoNotIndexDuplicateStaleDocsToLucene() throws Exception { + int numOps = scaledRandomIntBetween(10, 200); + List ops = new ArrayList<>(); + Map versions = new HashMap<>(); + for (int seqNo = 0; seqNo < numOps; seqNo++) { + String id = Integer.toString(randomIntBetween(1, 5)); + long version = versions.compute(id, (k, v) -> (v == null ? 
1 : v) + between(1, 10)); + int copies = between(1, 3); + if (randomBoolean()) { + for (int i = 0; i < copies; i++) { + ops.add(replicaIndexForDoc(createParsedDoc(id, null), version, seqNo, randomBoolean())); + } + } else if (frequently()) { + for (int i = 0; i < copies; i++) { + ops.add(replicaDeleteForDoc(id, version, seqNo, randomNonNegativeLong())); + } + } else { + for (int i = 0; i < copies; i++) { + ops.add(new Engine.NoOp(seqNo, primaryTerm.get(), REPLICA, randomNonNegativeLong(), "test-" + seqNo)); + } + } + } + Settings settings = Settings.builder().put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + try (Store store = createStore(); + InternalEngine engine = createEngine( + config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), newMergePolicy(), null))) { + Randomness.shuffle(ops); + concurrentlyApplyOps(ops, engine); + engine.refresh("test", Engine.SearcherScope.INTERNAL); + try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertThat(searcher.reader().maxDoc(), equalTo(numOps)); + } + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); + } + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 932fb71790800..b6d46248796b3 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -77,12 +77,20 @@ public void 
testSimplePrimary() { public void testSimpleReplica() { assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + assertThat(tracker.isProcessed(randomNonNegativeLong()), equalTo(false)); tracker.markSeqNoAsCompleted(0L); assertThat(tracker.getCheckpoint(), equalTo(0L)); + assertThat(tracker.isProcessed(0L), equalTo(true)); + assertThat(tracker.isProcessed(between(1, Integer.MAX_VALUE)), equalTo(false)); tracker.markSeqNoAsCompleted(2L); assertThat(tracker.getCheckpoint(), equalTo(0L)); + assertThat(tracker.isProcessed(1L), equalTo(false)); + assertThat(tracker.isProcessed(2L), equalTo(true)); + assertThat(tracker.isProcessed(between(3, Integer.MAX_VALUE)), equalTo(false)); tracker.markSeqNoAsCompleted(1L); assertThat(tracker.getCheckpoint(), equalTo(2L)); + assertThat(tracker.isProcessed(between(0, 2)), equalTo(true)); + assertThat(tracker.isProcessed(between(3, Integer.MAX_VALUE)), equalTo(false)); } public void testLazyInitialization() { @@ -199,9 +207,14 @@ protected void doRun() throws Exception { } assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L)); assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); + assertThat(tracker.isProcessed(randomValueOtherThan((int) unFinishedSeq, () -> randomFrom(seqNos))), equalTo(true)); + assertThat(tracker.isProcessed(unFinishedSeq), equalTo(false)); + assertThat(tracker.isProcessed(between(maxOps, Integer.MAX_VALUE)), equalTo(false)); tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); + assertThat(tracker.isProcessed(randomFrom(seqNos)), equalTo(true)); + assertThat(tracker.isProcessed(between(maxOps, Integer.MAX_VALUE)), equalTo(false)); if (tracker.processedSeqNo.size() == 1) { assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE)); }