From aeec04f85e7e42ee971c69690d53962c1368f478 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 25 Jun 2019 15:02:08 -0400 Subject: [PATCH 01/10] Enable MSU optimization in InternalEngine --- .../index/engine/InternalEngine.java | 16 ++++++--- .../index/engine/InternalEngineTests.java | 33 +++++++++++++++---- .../IndexLevelReplicationTests.java | 29 ++++++++++++++++ .../index/engine/EngineTestCase.java | 7 ++++ .../ccr/index/engine/FollowingEngine.java | 17 ---------- .../ShardFollowTaskReplicationTests.java | 4 +-- .../index/engine/FollowingEngineTests.java | 31 +++++++++-------- 7 files changed, 94 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 97d8d154ab099..382695a84fb0d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -963,11 +963,12 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" + "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]"; } - versionMap.enforceSafeAccess(); // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity - if (index.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()){ + final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); + final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); + if (index.seqNo() <= localCheckpoint) { // the operation seq# is lower then the current local checkpoint and thus was already put into 
lucene // this can happen during recovery where older operations are sent from the translog that are already // part of the lucene commit (either from a peer recovery or a local translog) @@ -976,13 +977,18 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = IndexingStrategy.processButSkipLucene(false, index.version()); + + // See Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers + } else if (softDeleteEnabled && maxSeqNoOfUpdatesOrDeletes <= localCheckpoint && hasBeenProcessedBefore(index) == false) { + assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; + plan = IndexingStrategy.optimizedAppendOnly(index.version()); } else { + versionMap.enforceSafeAccess(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - index.version()); + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()); } } } @@ -1166,7 +1172,7 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda Optional.of(earlyResultOnPreFlightError); } - public static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { + static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { return new IndexingStrategy(true, false, true, false, versionForIndexing, null); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 
59bbee9f1bbf5..c4ebb165452fe 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3596,32 +3596,42 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { public void testDoubleDeliveryReplica() throws IOException { final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + final boolean msuOptimization; + if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false) { + msuOptimization = false; + } else if (randomBoolean()) { + engine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(20, Long.MAX_VALUE)); // disable MSU + msuOptimization = false; + } else { + msuOptimization = true; + } + Engine.Index operation = replicaIndexForDoc(doc, 1, 20, false); Engine.Index duplicate = replicaIndexForDoc(doc, 1, 20, true); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(msuOptimization ? 0 : 1, engine.getNumVersionLookups()); assertNotNull(indexResult.getTranslogLocation()); if (randomBoolean()) { engine.refresh("test"); } Engine.IndexResult retryResult = engine.index(duplicate); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(msuOptimization ? 1 : 2, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(duplicate); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(msuOptimization ? 
0 : 1, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); if (randomBoolean()) { engine.refresh("test"); } Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(msuOptimization ? 1 : 2, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -5275,6 +5285,7 @@ public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception { public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { long lookupTimes = 0L; final int initDocs = between(0, 10); + boolean msuOptimization = engine.config().getIndexSettings().isSoftDeleteEnabled(); for (int i = 0; i < initDocs; i++) { index(engine, i); lookupTimes++; @@ -5282,20 +5293,30 @@ public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { // doc1 is delayed and arrived after a non-append-only op. final long seqNoAppendOnly1 = generateNewSeqNo(engine); final long seqnoNormalOp = generateNewSeqNo(engine); + if (msuOptimization && randomBoolean()) { + msuOptimization = false; + engine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(seqnoNormalOp, Long.MAX_VALUE)); + } if (randomBoolean()) { engine.index(replicaIndexForDoc( testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false)); + if (msuOptimization == false) { + lookupTimes++; + } } else { engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong())); + msuOptimization = false; + lookupTimes++; } - lookupTimes++; assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp)); // should not optimize for doc1 and process as a regular doc (eg. 
look up in version map) engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null), false, randomNonNegativeLong(), seqNoAppendOnly1)); - lookupTimes++; + if (msuOptimization == false) { + lookupTimes++; + } assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); // optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map. diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index e25557eaabcf6..5f6abea9a312a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.engine.SegmentsStats; @@ -59,8 +60,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; @@ -645,4 +648,30 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception { shards.assertAllEqual(0); } } + + public void testIndexingOptimizationUsingSequenceNumbers() throws Exception { + final Set liveDocs = new HashSet<>(); + try (ReplicationGroup group = createGroup(2)) { + boolean softDeleteEnabled = group.getPrimary().indexSettings().isSoftDeleteEnabled(); + group.startAll(); + int numDocs = 
randomIntBetween(1, 100); + long versionLookups = 0; + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(randomIntBetween(1, 100)); + if (randomBoolean()) { + group.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON)); + if (softDeleteEnabled == false || liveDocs.add(id) == false) { + versionLookups++; + } + } else { + group.delete(new DeleteRequest(index.getName(), "type", id)); + liveDocs.remove(id); + versionLookups++; + } + } + for (IndexShard replica : group.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(versionLookups)); + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index e68ae70921263..3aa8700fe0097 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1187,4 +1187,11 @@ static long maxSeqNosInReader(DirectoryReader reader) throws IOException { } return maxSeqNo; } + + /** + * Returns the number of times a version was looked up either from version map or from the index. 
+ */ + public static long getNumVersionLookups(Engine engine) { + return ((InternalEngine) engine).getNumVersionLookups(); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 8d4f0b219bd2c..799309751ab27 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -18,7 +18,6 @@ import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; @@ -36,7 +35,6 @@ */ public final class FollowingEngine extends InternalEngine { - private final CounterMetric numOfOptimizedIndexing = new CounterMetric(); /** * Construct a new following engine with the specified engine configuration. @@ -68,9 +66,6 @@ private void preFlight(final Operation operation) { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); - // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers. 
- final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); - assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; if (hasBeenProcessedBefore(index)) { if (logger.isTraceEnabled()) { logger.trace("index operation [id={} seq_no={} origin={}] was processed before", index.id(), index.seqNo(), index.origin()); @@ -90,10 +85,6 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind } else { return IndexingStrategy.processButSkipLucene(false, index.version()); } - } else if (maxSeqNoOfUpdatesOrDeletes <= getProcessedLocalCheckpoint()) { - assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]"; - numOfOptimizedIndexing.inc(); - return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.version()); } else { return planIndexingAsNonPrimary(index); } @@ -202,14 +193,6 @@ private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { } } - /** - * Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine. - * This metric is not persisted, and started from 0 when the engine is opened. 
- */ - public long getNumberOfOptimizedIndexing() { - return numOfOptimizedIndexing.count(); - } - @Override public void verifyEngineBeforeIndexClosing() throws IllegalStateException { // the value of the global checkpoint is not verified when the following engine is closed, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index f88b6542392c8..1d3e5ad7e4f6a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; @@ -55,7 +56,6 @@ import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import java.io.IOException; @@ -110,7 +110,7 @@ public void testSimpleCcrReplication() throws Exception { followerGroup.assertAllEqual(indexedDocIds.size()); }); for (IndexShard shard : followerGroup) { - assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount)); + assertThat(EngineTestCase.getNumVersionLookups(getEngine(shard)), equalTo(0L)); } // Deletes should be replicated to the follower List 
deleteDocIds = randomSubsetOf(indexedDocIds); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 98bfa1b2068bb..dccfe405bf5b9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -59,6 +59,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; +import static org.elasticsearch.index.engine.EngineTestCase.getNumVersionLookups; import static org.elasticsearch.index.engine.EngineTestCase.getTranslog; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -339,23 +340,27 @@ public void testBasicOptimization() throws Exception { } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); // Do not apply optimization for deletes or updates + long versionLookUps = 0; for (int i = 0; i < numDocs; i++) { if (randomBoolean()) { + versionLookUps++; leader.index(indexForPrimary(Integer.toString(i))); } else if (randomBoolean()) { + versionLookUps++; leader.delete(deleteForPrimary(Integer.toString(i))); } } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); + assertThat(getNumVersionLookups(follower), greaterThanOrEqualTo(versionLookUps)); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); 
assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); // Apply optimization for documents that do not exist long moreDocs = between(1, 100); + versionLookUps = getNumVersionLookups(follower); Set docIds = getDocIds(follower, true).stream().map(doc -> doc.getId()).collect(Collectors.toSet()); for (int i = 0; i < moreDocs; i++) { String docId = randomValueOtherThanMany(docIds::contains, () -> Integer.toString(between(1, 1000))); @@ -364,7 +369,7 @@ public void testBasicOptimization() throws Exception { } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs + moreDocs)); + assertThat(getNumVersionLookups(follower), equalTo(versionLookUps)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); }); } @@ -379,7 +384,7 @@ public void testOptimizeAppendOnly() throws Exception { EngineTestCase.concurrentlyApplyOps(ops, leader); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo((long) numOps)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); }); } @@ -397,13 +402,14 @@ public void testOptimizeMultipleVersions() throws Exception { runFollowTest((leader, follower) -> { EngineTestCase.concurrentlyApplyOps(ops, leader); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); + long numVersionLookups = getNumVersionLookups(follower); final List appendOps = new ArrayList<>(); for (int numAppends = scaledRandomIntBetween(0, 100), i = 0; i < numAppends; i++) { appendOps.add(indexForPrimary("append-" + i)); } EngineTestCase.concurrentlyApplyOps(appendOps, leader); EngineTestCase.waitForOpsToComplete(follower, 
leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), greaterThanOrEqualTo((long) appendOps.size())); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups)); }); } @@ -411,19 +417,19 @@ public void testOptimizeSingleDocSequentially() throws Exception { runFollowTest((leader, follower) -> { leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); leader.delete(deleteForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); + assertThat(getNumVersionLookups(follower), equalTo(1L)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); + assertThat(getNumVersionLookups(follower), equalTo(1L)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); + assertThat(getNumVersionLookups(follower), equalTo(2L)); }); } @@ -434,19 +440,18 @@ public void testOptimizeSingleDocConcurrently() throws Exception { EngineTestCase.concurrentlyApplyOps(ops, leader); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); - long numOptimized = follower.getNumberOfOptimizedIndexing(); leader.delete(deleteForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized)); + long numVersionLookups = getNumVersionLookups(follower); 
leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups + 1L)); }); } From 1e2cecba40e0cf685cc1026c633302f450171e11 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 26 Jun 2019 11:53:20 -0400 Subject: [PATCH 02/10] =?UTF-8?q?don=E2=80=99t=20require=20soft-deletes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 382695a84fb0d..e95351e6dc86a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -978,8 +978,8 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = IndexingStrategy.processButSkipLucene(false, index.version()); - // See Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers - } else if (softDeleteEnabled && maxSeqNoOfUpdatesOrDeletes <= localCheckpoint && hasBeenProcessedBefore(index) == false) { + } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpoint && hasBeenProcessedBefore(index) == false) { + // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the 
optimization using sequence numbers assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; plan = IndexingStrategy.optimizedAppendOnly(index.version()); } else { From b09a3dd4f55ad28091ae619a35ce66c2816c995e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 26 Jun 2019 13:16:14 -0400 Subject: [PATCH 03/10] deactivate append-only optimization on replicas --- .../index/engine/InternalEngine.java | 89 ++++------- .../index/engine/InternalEngineTests.java | 139 ++---------------- .../IndexLevelReplicationTests.java | 9 +- .../ccr/index/engine/FollowingEngine.java | 31 ++-- 4 files changed, 57 insertions(+), 211 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e95351e6dc86a..32c030bab059d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -158,7 +158,6 @@ public class InternalEngine extends Engine { private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); - private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. // The value of this marker never goes backwards, and is tracked/updated differently on primary and replica. 
@@ -408,17 +407,11 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { for (Map.Entry entry : writer.getLiveCommitData()) { - final String key = entry.getKey(); - if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { + if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) { assert maxUnsafeAutoIdTimestamp.get() == -1 : "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true); } - if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { - assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 : - "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]"; - maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue())); - } } } @@ -944,52 +937,35 @@ public IndexResult index(Index index) throws IOException { protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { assert assertNonPrimaryOrigin(index); + // needs to maintain the auto_id timestamp in case this replica becomes primary + if (canOptimizeAddDocument(index)) { + mayHaveBeenIndexedBefore(index); + } final IndexingStrategy plan; - final boolean appendOnlyRequest = canOptimizeAddDocument(index); - if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) { - /* - * As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue - * a follow-up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before - * the original append-only. In this case we can't simply proceed with the append only without consulting the version map. 
- * If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen - * the document of that append-only request. However if the seqno of an append-only is higher than seqno of any non-append-only - * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. - */ - assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; - plan = IndexingStrategy.optimizedAppendOnly(1L); + // unlike the primary, replicas don't really care to about creation status of documents + // this allows to ignore the case where a document was found in the live version maps in + // a delete state and return false for the created flag in favor of code simplicity + final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); + if (hasBeenProcessedBefore(index)) { + // the operation seq# is lower then the current local checkpoint and thus was already put into lucene + // this can happen during recovery where older operations are sent from the translog that are already + // part of the lucene commit (either from a peer recovery or a local translog) + // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in + // question may have been deleted in an out of order op that is not replayed. 
+ // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery + // See testRecoveryWithOutOfOrderDelete for an example of peer recovery + plan = IndexingStrategy.processButSkipLucene(false, index.version()); + } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) { + // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers + assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; + plan = IndexingStrategy.optimizedAppendOnly(index.version()); } else { - if (appendOnlyRequest == false) { - maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); - assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" + - "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]"; - } - // unlike the primary, replicas don't really care to about creation status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return false for the created flag in favor of code simplicity - final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); - if (index.seqNo() <= localCheckpoint) { - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene - // this can happen during recovery where older operations are sent from the translog that are already - // part of the lucene commit (either from a peer recovery or a local translog) - // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in - // question may have been deleted in an out of order op that is not replayed. 
- // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery - // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - plan = IndexingStrategy.processButSkipLucene(false, index.version()); - - } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpoint && hasBeenProcessedBefore(index) == false) { - // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers - assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; - plan = IndexingStrategy.optimizedAppendOnly(index.version()); + versionMap.enforceSafeAccess(); + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); } else { - versionMap.enforceSafeAccess(); - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); - } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()); - } + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()); } } return plan; @@ -1119,11 +1095,6 @@ private boolean mayHaveBeenIndexedBefore(Index index) { return mayHaveBeenIndexBefore; } - // for testing - long getMaxSeqNoOfNonAppendOnlyOperations() { - return maxSeqNoOfNonAppendOnlyOperations.get(); - } - private void addDocs(final List docs, final IndexWriter indexWriter) throws IOException { if (docs.size() > 1) { indexWriter.addDocuments(docs); @@ -1317,12 +1288,6 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { assert 
assertNonPrimaryOrigin(delete); - maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr)); - assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" + - "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]"; - // unlike the primary, replicas don't really care to about found status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return true for the found flag in favor of code simplicity final DeletionStrategy plan; if (delete.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()) { // the operation seq# is lower then the current local checkpoint and thus was already put into lucene diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c4ebb165452fe..b758481af1201 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3493,7 +3493,7 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException operation.versionType(), REPLICA, operation.startTime()+1, UNASSIGNED_SEQ_NO, 0); // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene // and the version lookup is skipped - final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0; + final boolean sameSeqNo = operation.seqNo() == retry.seqNo(); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); @@ -3503,19 +3503,19 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException assertEquals(1, engine.getNumVersionLookups()); assertLuceneOperations(engine, 1, 0, 1); Engine.IndexResult 
retryResult = engine.index(retry); - assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 1 : 2, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(retry); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); engine.delete(delete); assertLuceneOperations(engine, 1, 0, 1); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(1, engine.getNumVersionLookups()); Engine.IndexResult indexResult = engine.index(operation); - assertEquals(belowLckp ? 2 : 3, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 1 : 2, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -3552,7 +3552,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { } else { Engine.IndexResult retryResult = engine.index(retry); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); if (operation.seqNo() > retry.seqNo()) { @@ -3560,7 +3560,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { } else { assertLuceneOperations(engine, 1, 0, 0); } - assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups()); + assertEquals(belowLckp ? 
0 : 1, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -3596,42 +3596,32 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { public void testDoubleDeliveryReplica() throws IOException { final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - final boolean msuOptimization; - if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false) { - msuOptimization = false; - } else if (randomBoolean()) { - engine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(20, Long.MAX_VALUE)); // disable MSU - msuOptimization = false; - } else { - msuOptimization = true; - } - Engine.Index operation = replicaIndexForDoc(doc, 1, 20, false); Engine.Index duplicate = replicaIndexForDoc(doc, 1, 20, true); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(msuOptimization ? 0 : 1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(indexResult.getTranslogLocation()); if (randomBoolean()) { engine.refresh("test"); } Engine.IndexResult retryResult = engine.index(duplicate); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(msuOptimization ? 1 : 2, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(duplicate); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(msuOptimization ? 
0 : 1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); if (randomBoolean()) { engine.refresh("test"); } Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(msuOptimization ? 1 : 2, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -5219,113 +5209,6 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } } - public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception { - IOUtils.close(engine, store); - store = createStore(); - final Path translogPath = createTempDir(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { - final CountDownLatch latch = new CountDownLatch(1); - final Thread appendOnlyIndexer = new Thread(() -> { - try { - latch.countDown(); - final int numDocs = scaledRandomIntBetween(100, 1000); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = - testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null); - if (randomBoolean()) { - engine.index(appendOnlyReplica(doc, randomBoolean(), 1, generateNewSeqNo(engine))); - } else { - engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong())); - } - } - } catch (Exception ex) { - throw new RuntimeException("Failed to index", ex); - } - }); - appendOnlyIndexer.setName("append-only indexer"); - appendOnlyIndexer.start(); - latch.await(); - long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED; - final int numOps = scaledRandomIntBetween(100, 1000); - for (int i = 0; i < numOps; i++) { - ParsedDocument parsedDocument = - testParsedDocument(Integer.toString(i), 
null, testDocumentWithTextField(), SOURCE, null); - if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations - final long seqno = generateNewSeqNo(engine); - final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean()); - if (randomBoolean()) { - engine.index(doc); - } else { - engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(), - doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis(), UNASSIGNED_SEQ_NO, 0)); - } - maxSeqNoOfNonAppendOnly = seqno; - } else { // On primary - do not update max_seqno for non-append-only operations - if (randomBoolean()) { - engine.index(indexForDoc(parsedDocument)); - } else { - engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), - newUid(parsedDocument.id()), primaryTerm.get())); - } - } - } - appendOnlyIndexer.join(120_000); - assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly)); - engine.syncTranslog(); - globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - engine.flush(); - } - try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { - assertThat("max_seqno from non-append-only was not bootstrap from the safe commit", - engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get())); - } - } - - public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { - long lookupTimes = 0L; - final int initDocs = between(0, 10); - boolean msuOptimization = engine.config().getIndexSettings().isSoftDeleteEnabled(); - for (int i = 0; i < initDocs; i++) { - index(engine, i); - lookupTimes++; - } - // doc1 is delayed and arrived after a non-append-only op. 
- final long seqNoAppendOnly1 = generateNewSeqNo(engine); - final long seqnoNormalOp = generateNewSeqNo(engine); - if (msuOptimization && randomBoolean()) { - msuOptimization = false; - engine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(seqnoNormalOp, Long.MAX_VALUE)); - } - if (randomBoolean()) { - engine.index(replicaIndexForDoc( - testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false)); - if (msuOptimization == false) { - lookupTimes++; - } - } else { - engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong())); - msuOptimization = false; - lookupTimes++; - } - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp)); - - // should not optimize for doc1 and process as a regular doc (eg. look up in version map) - engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null), - false, randomNonNegativeLong(), seqNoAppendOnly1)); - if (msuOptimization == false) { - lookupTimes++; - } - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - - // optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map. 
- engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, - testDocumentWithTextField(), SOURCE, null), - false, randomNonNegativeLong(), generateNewSeqNo(engine))); - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - } - public void testTrimUnsafeCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final int maxSeqNo = 40; diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 5f6abea9a312a..55ce82e0f4885 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -89,6 +89,9 @@ public void testSimpleReplication() throws Exception { final int docCount = randomInt(50); shards.indexDocs(docCount); shards.assertAllEqual(docCount); + for (IndexShard replica : shards.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(0L)); + } } } @@ -98,6 +101,9 @@ public void testSimpleAppendOnlyReplication() throws Exception { final int docCount = randomInt(50); shards.appendDocs(docCount); shards.assertAllEqual(docCount); + for (IndexShard replica : shards.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(0L)); + } } } @@ -652,7 +658,6 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception { public void testIndexingOptimizationUsingSequenceNumbers() throws Exception { final Set liveDocs = new HashSet<>(); try (ReplicationGroup group = createGroup(2)) { - boolean softDeleteEnabled = group.getPrimary().indexSettings().isSoftDeleteEnabled(); group.startAll(); int numDocs = randomIntBetween(1, 100); long versionLookups = 0; @@ -660,7 +665,7 @@ public void testIndexingOptimizationUsingSequenceNumbers() 
throws Exception { String id = Integer.toString(randomIntBetween(1, 100)); if (randomBoolean()) { group.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON)); - if (softDeleteEnabled == false || liveDocs.add(id) == false) { + if (liveDocs.add(id) == false) { versionLookups++; } } else { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 799309751ab27..fd9c449de7e0c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -66,25 +66,18 @@ private void preFlight(final Operation operation) { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); - if (hasBeenProcessedBefore(index)) { - if (logger.isTraceEnabled()) { - logger.trace("index operation [id={} seq_no={} origin={}] was processed before", index.id(), index.seqNo(), index.origin()); - } - if (index.origin() == Operation.Origin.PRIMARY) { - /* - * The existing operation in this engine was probably assigned the term of the previous primary shard which is different - * from the term of the current operation. If the current operation arrives on replicas before the previous operation, - * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely - * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint - * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency - * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). 
- */ - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( - shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); - return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); - } else { - return IndexingStrategy.processButSkipLucene(false, index.version()); - } + if (index.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(index)) { + /* + * The existing operation in this engine was probably assigned the term of the previous primary shard which is different + * from the term of the current operation. If the current operation arrives on replicas before the previous operation, + * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely + * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint + * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency + * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). 
+ */ + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); + return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); } else { return planIndexingAsNonPrimary(index); } From 500f4194aa90c0af75ca0806e76e8cf990de6c0c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 26 Jun 2019 23:32:18 -0400 Subject: [PATCH 04/10] fix test --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b758481af1201..6cfa6cbb3e985 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3534,7 +3534,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5)); // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene // and the version lookup is skipped - final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0; + final boolean sameSeqNo = operation.seqNo() == retry.seqNo(); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); @@ -3546,7 +3546,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { } else { assertLuceneOperations(engine, 1, 0, 0); } - assertEquals(belowLckp ? 0 : 1, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 
0 : 1, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { @@ -3560,7 +3560,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { } else { assertLuceneOperations(engine, 1, 0, 0); } - assertEquals(belowLckp ? 0 : 1, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } From 86beef4b6cf8bac4d8cf649a70114e7bfb9d8cfe Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 26 Jun 2019 23:35:52 -0400 Subject: [PATCH 05/10] wording --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- .../org/elasticsearch/index/engine/InternalEngineTests.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 32c030bab059d..076d0f324b37f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -947,7 +947,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // a delete state and return false for the created flag in favor of code simplicity final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); if (hasBeenProcessedBefore(index)) { - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene + // the operation seq# was processed and thus the same operation was already put into lucene // this can happen during recovery where older operations are sent from the translog that are already // part of the lucene commit (either from a peer recovery or a local translog) 
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 6cfa6cbb3e985..21e32ffe07736 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -262,7 +262,9 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { try (Engine.Searcher searcher = engine.acquireSearcher("test")) { assertEquals(2, searcher.reader().numDocs()); } - assertFalse("safe access should NOT be required last indexing round was only append only", engine.isSafeAccessRequired()); + if (operation.origin() == PRIMARY) { + assertFalse("safe access should NOT be required last indexing round was only append only", engine.isSafeAccessRequired()); + } engine.delete(new Engine.Delete(operation.type(), operation.id(), operation.uid(), primaryTerm.get())); assertTrue("safe access should be required", engine.isSafeAccessRequired()); engine.refresh("test"); From b855a3a82260785ac83523d84c30a78244c4611b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 27 Jun 2019 07:29:33 -0400 Subject: [PATCH 06/10] at most one document per sequence number --- .../index/engine/InternalEngine.java | 6 +-- .../engine/LuceneChangesSnapshotTests.java | 12 +++--- .../index/engine/EngineTestCase.java | 38 +++++++++++++++++++ .../index/shard/IndexShardTestCase.java | 8 ++++ .../test/InternalTestCluster.java | 1 + .../index/engine/FollowingEngineTests.java | 3 ++ 6 files changed, 58 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 076d0f324b37f..2df96bd7773ca 100644 --- 
a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1289,8 +1289,8 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { assert assertNonPrimaryOrigin(delete); final DeletionStrategy plan; - if (delete.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()) { - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene + if (hasBeenProcessedBefore(delete)) { + // the operation seq# was processed thus this operation was already put into lucene // this can happen during recovery where older operations are sent from the translog that are already // part of the lucene commit (either from a peer recovery or a local translog) // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in @@ -1467,7 +1467,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { noOpResult = new NoOpResult(getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get()); } else { markSeqNoAsSeen(noOp.seqNo()); - if (softDeleteEnabled) { + if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) { try { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index f6327e8132cea..59b45fae38071 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -150,18 +150,16 @@ searcher, mapperService, between(1, 
LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f } /** - * If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation - * into Lucene (only the first copy is non-stale; others are stale and soft-deleted). Moreover, a nested document is indexed into - * Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies - * that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skip non-root nested documents or stale copies. + * A nested document is indexed into Lucene as multiple documents. While the root document has both sequence number and primary term, + * non-root documents don't have primary term but only sequence numbers. This test verifies that {@link LuceneChangesSnapshot} + * correctly skip non-root documents and returns at most one operation per sequence number. */ - public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception { + public void testSkipNonRootOfNestedDocuments() throws Exception { Map seqNoToTerm = new HashMap<>(); List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); int totalOps = 0; for (Engine.Operation op : operations) { - // Engine skips deletes or indexes below the local checkpoint - if (engine.getProcessedLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) { + if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) { seqNoToTerm.put(op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { totalOps += ((Engine.Index) op).docs().size(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 3aa8700fe0097..8e074cdba616c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -44,6 +44,7 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -75,6 +76,7 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; @@ -112,6 +114,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -265,11 +268,13 @@ public void tearDown() throws Exception { engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); assertMaxSeqNoInCommitUserData(engine); + assertAtMostOneLuceneDocumentPerSequenceNumber(engine); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); assertMaxSeqNoInCommitUserData(replicaEngine); + assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine); } assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); assertThat(replicaEngine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); @@ -1118,6 +1123,39 @@ public static void assertMaxSeqNoInCommitUserData(Engine 
engine) throws Exceptio } } + public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException { + if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false) { + return; + } + try { + engine.refresh("test"); + try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader()); + Set seqNos = new HashSet<>(); + for (LeafReaderContext leaf : reader.leaves()) { + NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + int docId; + while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertTrue(seqNoDocValues.advanceExact(docId)); + if (primaryTermDocValues.advanceExact(docId)) { + assertThat(primaryTermDocValues.longValue(), greaterThanOrEqualTo(1L)); + long seqNo = seqNoDocValues.longValue(); + assertThat(seqNo, greaterThanOrEqualTo(0L)); + if (seqNos.add(seqNo) == false) { + final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); + leaf.reader().document(docId, idFieldVisitor); + throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId()); + } + } + } + } + } + } catch (AlreadyClosedException ignored) { + + } + } + public static MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 4b5be29205778..ee8627fab7ded 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -518,6 +518,7 @@ protected void 
closeShard(IndexShard shard, boolean assertConsistencyBetweenTran if (assertConsistencyBetweenTranslogAndLucene) { assertConsistentHistoryBetweenTranslogAndLucene(shard); } + assertAtMostOneLuceneDocumentPerSequenceNumber(shard); } finally { IOUtils.close(() -> shard.close("test", false), shard.store()); } @@ -730,6 +731,13 @@ public static void assertConsistentHistoryBetweenTranslogAndLucene(IndexShard sh } } + public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexShard shard) throws IOException { + final Engine engine = shard.getEngineOrNull(); + if (engine != null) { + EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(engine); + } + } + protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) throws IOException { return indexDoc(shard, type, id, "{}"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index ef894c3cfd3ed..46bf3c3f797d1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1236,6 +1236,7 @@ public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOExce for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { try { + IndexShardTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(indexShard); IndexShardTestCase.assertConsistentHistoryBetweenTranslogAndLucene(indexShard); } catch (AlreadyClosedException ignored) { // shard is closed diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index dccfe405bf5b9..e6fba3f5741ae 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ 
b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.elasticsearch.index.engine.EngineTestCase.createMapperService; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; import static org.elasticsearch.index.engine.EngineTestCase.getNumVersionLookups; import static org.elasticsearch.index.engine.EngineTestCase.getTranslog; @@ -485,6 +486,8 @@ private void runFollowTest(CheckedBiConsumer Date: Thu, 27 Jun 2019 12:43:21 -0400 Subject: [PATCH 07/10] init primary term in tests --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 1 + .../java/org/elasticsearch/index/engine/EngineTestCase.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f07b8c977c7fb..5b04d6866bf6c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4018,6 +4018,7 @@ public void testTypelessDelete() throws IOException { IndexMetaData metaData = IndexMetaData.builder("index") .putMapping("some_type", "{ \"properties\": {}}") .settings(settings) + .primaryTerm(0, 1) .build(); IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); recoverShardFromStore(shard); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8e074cdba616c..e0c6863a3106c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1138,10 +1138,10 @@ public static void 
assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) int docId; while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { assertTrue(seqNoDocValues.advanceExact(docId)); + long seqNo = seqNoDocValues.longValue(); + assertThat(seqNo, greaterThanOrEqualTo(0L)); if (primaryTermDocValues.advanceExact(docId)) { assertThat(primaryTermDocValues.longValue(), greaterThanOrEqualTo(1L)); - long seqNo = seqNoDocValues.longValue(); - assertThat(seqNo, greaterThanOrEqualTo(0L)); if (seqNos.add(seqNo) == false) { final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); leaf.reader().document(docId, idFieldVisitor); From 782383c16d2cf36ffea8314535649b6f3ab839e6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 27 Jun 2019 14:24:00 -0400 Subject: [PATCH 08/10] remove the primary term assertion - many tests failing will address in a follow up --- .../main/java/org/elasticsearch/index/engine/EngineTestCase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index e0c6863a3106c..ec73fe1a89dea 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1141,7 +1141,6 @@ public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) long seqNo = seqNoDocValues.longValue(); assertThat(seqNo, greaterThanOrEqualTo(0L)); if (primaryTermDocValues.advanceExact(docId)) { - assertThat(primaryTermDocValues.longValue(), greaterThanOrEqualTo(1L)); if (seqNos.add(seqNo) == false) { final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); leaf.reader().document(docId, idFieldVisitor); From 90b69c61faaf1d7ac4d387dff6c507ddb61ccbc0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 27 Jun 2019 15:33:38 -0400 Subject: [PATCH 09/10] do not one doc 
per seq_no in shrink tests --- .../index/shard/IndexShardTestCase.java | 12 ++++-------- .../org/elasticsearch/test/InternalTestCluster.java | 1 - 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index ee8627fab7ded..10545fbc95f9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -518,7 +518,10 @@ protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTran if (assertConsistencyBetweenTranslogAndLucene) { assertConsistentHistoryBetweenTranslogAndLucene(shard); } - assertAtMostOneLuceneDocumentPerSequenceNumber(shard); + final Engine engine = shard.getEngineOrNull(); + if (engine != null) { + EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(engine); + } } finally { IOUtils.close(() -> shard.close("test", false), shard.store()); } @@ -731,13 +734,6 @@ public static void assertConsistentHistoryBetweenTranslogAndLucene(IndexShard sh } } - public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexShard shard) throws IOException { - final Engine engine = shard.getEngineOrNull(); - if (engine != null) { - EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(engine); - } - } - protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) throws IOException { return indexDoc(shard, type, id, "{}"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 46bf3c3f797d1..ef894c3cfd3ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1236,7 +1236,6 
@@ public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOExce for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { try { - IndexShardTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(indexShard); IndexShardTestCase.assertConsistentHistoryBetweenTranslogAndLucene(indexShard); } catch (AlreadyClosedException ignored) { // shard is closed From 127b2fe83dc2e90ef25192d14ddaf70129eb76c5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 27 Jun 2019 16:14:22 -0400 Subject: [PATCH 10/10] assert only internal engine --- .../java/org/elasticsearch/index/engine/EngineTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index ec73fe1a89dea..fe9660d92a3df 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1124,7 +1124,7 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio } public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException { - if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false) { + if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false || engine instanceof InternalEngine == false) { return; } try {