From 1e27505f83f9e668397147fb1b1246d117277aed Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Fri, 19 Oct 2018 23:28:45 -0400
Subject: [PATCH 1/5] Remove LocalCheckpointTracker#resetCheckpoint

This commit removes resetCheckpoint from LocalCheckpointTracker and rewrites
testDedupByPrimaryTerm without resetting the local checkpoint.
---
 .../index/seqno/LocalCheckpointTracker.java  | 13 ---
 .../engine/LuceneChangesSnapshotTests.java   | 91 +++++++++++++------
 .../seqno/LocalCheckpointTrackerTests.java   | 32 -------
 .../index/engine/EngineTestCase.java         | 10 +-
 .../index/engine/FollowingEngineTests.java   |  6 +-
 5 files changed, 71 insertions(+), 81 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
index 87895ca7fe4ad..8249e2600ad55 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
@@ -103,19 +103,6 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
         }
     }
 
-    /**
-     * Resets the checkpoint to the specified value.
-     *
-     * @param checkpoint the local checkpoint to reset this tracker to
-     */
-    public synchronized void resetCheckpoint(final long checkpoint) {
-        // TODO: remove this method as after we restore the local history on promotion.
-        assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
-        assert checkpoint <= this.checkpoint;
-        processedSeqNo.clear();
-        this.checkpoint = checkpoint;
-    }
-
     /**
      * The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
      *
diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
index 412b91aaef200..900bdbabe5807 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
@@ -19,10 +19,11 @@
 
 package org.elasticsearch.index.engine;
 
+import org.elasticsearch.common.CheckedBiFunction;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.translog.SnapshotMatchers;
@@ -32,7 +33,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -151,33 +151,68 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
     }
 
     public void testDedupByPrimaryTerm() throws Exception {
-        Map<Long, Long> latestOperations = new HashMap<>();
-        List<Integer> terms = Arrays.asList(between(1, 1000), between(1000, 2000));
-        int totalOps = 0;
-        for (long term : terms) {
-            final List<Engine.Operation> ops = generateSingleDocHistory(true,
-                randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE), term, 2, 20, "1");
-            primaryTerm.set(Math.max(primaryTerm.get(), term));
-            engine.rollTranslogGeneration();
-            for (Engine.Operation op : ops) {
-                // We need to simulate a rollback here as only ops after local checkpoint get into the engine
-                if (op.seqNo() <= engine.getLocalCheckpointTracker().getCheckpoint()) {
-                    engine.getLocalCheckpointTracker().resetCheckpoint(randomLongBetween(-1, op.seqNo() - 1));
-                    engine.rollTranslogGeneration();
+        Map<Long, Long> seqNoToTerm = new HashMap<>();
+        final CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedDocFactory = nestedParsedDocFactory();
+        int numOps = between(1, 100);
+        List<Engine.Operation> operations = new ArrayList<>();
+        for (int seqNo = 0; seqNo < numOps; seqNo++) {
+            if (rarely()) {
+                continue; // make gap in sequence number
+            }
+            final String docId = Integer.toString(between(1, 100));
+            final long startTime = randomNonNegativeLong();
+            final long term = randomLongBetween(1, primaryTerm.get());
+            final Engine.Operation.TYPE opType = randomFrom(Engine.Operation.TYPE.values());
+            final boolean nestedDocs = randomBoolean();
+            final int nestedValues = between(0, 3);
+            int copies = frequently() ? 1 : between(2, 5);
+            for (int i = 0; i < copies; i++) {
+                final ParsedDocument doc = nestedDocs ? nestedDocFactory.apply(docId, nestedValues) : createParsedDoc(docId, null);
+                final Engine.Operation op;
+                switch (opType) {
+                    case INDEX:
+                        op = new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, term, 1L,
+                            null, Engine.Operation.Origin.REPLICA, startTime, -1, true);
+                        break;
+                    case DELETE:
+                        new Engine.Delete(doc.type(), docId, EngineTestCase.newUid(docId), seqNo, term, 1L,
+                            null, Engine.Operation.Origin.REPLICA, startTime);
+                    case NO_OP:
+                        op = new Engine.NoOp(seqNo, term, Engine.Operation.Origin.REPLICA, startTime, "test-" + seqNo);
+                        break;
+                    default:
+                        throw new IllegalStateException("invalid operation type [" + opType + "]");
                 }
+                operations.add(op);
+            }
+        }
+        Randomness.shuffle(operations);
+        int totalOps = 0;
+        for (Engine.Operation op : operations) {
+            // Engine skips deletes or indexes below the local checkpoint
+            if (engine.getLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) {
+                seqNoToTerm.put(op.seqNo(), op.primaryTerm());
                 if (op instanceof Engine.Index) {
-                    engine.index((Engine.Index) op);
-                } else if (op instanceof Engine.Delete) {
-                    engine.delete((Engine.Delete) op);
-                }
-                latestOperations.put(op.seqNo(), op.primaryTerm());
-                if (rarely()) {
-                    engine.refresh("test");
-                }
-                if (rarely()) {
-                    engine.flush();
+                    totalOps += ((Engine.Index) op).docs().size();
+                } else {
+                    totalOps++;
                 }
-                totalOps++;
+            }
+            if (op instanceof Engine.Index) {
+                engine.index((Engine.Index) op);
+            } else if (op instanceof Engine.Delete) {
+                engine.delete((Engine.Delete) op);
+            } else if (op instanceof Engine.NoOp) {
+                engine.noOp((Engine.NoOp) op);
+            }
+            if (rarely()) {
+                engine.refresh("test");
+            }
+            if (rarely()) {
+                engine.rollTranslogGeneration();
+            }
+            if (rarely()) {
+                engine.flush();
             }
         }
         long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
@@ -187,9 +222,9 @@ public void testDedupByPrimaryTerm() throws Exception {
             searcher = null;
             Translog.Operation op;
             while ((op = snapshot.next()) != null) {
-                assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo())));
+                assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo())));
             }
-            assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size()));
+            assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size()));
         } finally {
             IOUtils.close(searcher);
         }
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
index 789a60ec55dba..44b3794ea6d42 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
@@ -19,13 +19,10 @@
 
 package org.elasticsearch.index.seqno;
 
-import com.carrotsearch.hppc.LongObjectHashMap;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.test.ESTestCase;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
 import org.junit.Before;
 
 import java.util.ArrayList;
@@ -266,35 +263,6 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte
         thread.join();
     }
 
-    public void testResetCheckpoint() {
-        final int operations = 1024 - scaledRandomIntBetween(0, 1024);
-        int maxSeqNo = Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED);
-        for (int i = 0; i < operations; i++) {
-            if (!rarely()) {
-                tracker.markSeqNoAsCompleted(i);
-                maxSeqNo = i;
-            }
-        }
-
-        final int localCheckpoint =
-            randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
-        tracker.resetCheckpoint(localCheckpoint);
-        assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
-        assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
-        assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<CountedBitSet>>() {
-            @Override
-            public boolean matches(Object item) {
-                return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());
-            }
-
-            @Override
-            public void describeTo(Description description) {
-                description.appendText("empty");
-            }
-        });
-        assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
-    }
-
     public void testContains() {
         final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
         final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 3e563e6d5382e..b84bef35d1525 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -49,6 +49,7 @@
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
+import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
@@ -314,18 +315,17 @@ protected static ParsedDocument testParsedDocument(
             mappingUpdate);
     }
 
-    public static CheckedFunction<String, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
+    public static CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
         final MapperService mapperService = createMapperService("type");
         final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
             .startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject()
             .endObject().endObject());
         final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping));
-        return docId -> {
+        return (docId, nestedFieldValues) -> {
             final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value");
-            final int nestedValues = between(0, 3);
-            if (nestedValues > 0) {
+            if (nestedFieldValues > 0) {
                 XContentBuilder nestedField = source.startObject("nested_field");
-                for (int i = 0; i < nestedValues; i++) {
+                for (int i = 0; i < nestedFieldValues; i++) {
                     nestedField.field("field-" + i, "value-" + i);
                 }
                 source.endObject();
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index 9e62eb6cfa102..36619d14b8b4f 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -14,7 +14,7 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.CheckedBiConsumer;
-import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -563,12 +563,12 @@ public void testProcessOnceOnPrimary() throws Exception {
             .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
         final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
         final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
-        final CheckedFunction<String, ParsedDocument, IOException> nestedDocFactory = EngineTestCase.nestedParsedDocFactory();
+        final CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedDocFunc = EngineTestCase.nestedParsedDocFactory();
         int numOps = between(10, 100);
         List<Engine.Operation> operations = new ArrayList<>(numOps);
         for (int i = 0; i < numOps; i++) {
             String docId = Integer.toString(between(1, 100));
-            ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId);
+            ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFunc.apply(docId, randomInt(3));
             if (randomBoolean()) {
                 operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
                     VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));

From 90c7fd6196bc71d1779040475da23c4079c06cd8 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sat, 20 Oct 2018 12:22:27 -0400
Subject: [PATCH 2/5] stylecheck

---
 .../elasticsearch/index/engine/LuceneChangesSnapshotTests.java | 3 ++-
 .../java/org/elasticsearch/index/engine/EngineTestCase.java    | 1 -
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
index 900bdbabe5807..2c0f006d323d2 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
@@ -175,8 +175,9 @@ public void testDedupByPrimaryTerm() throws Exception {
                             null, Engine.Operation.Origin.REPLICA, startTime, -1, true);
                         break;
                     case DELETE:
-                        new Engine.Delete(doc.type(), docId, EngineTestCase.newUid(docId), seqNo, term, 1L,
+                        op = new Engine.Delete(doc.type(), docId, EngineTestCase.newUid(docId), seqNo, term, 1L,
                             null, Engine.Operation.Origin.REPLICA, startTime);
+                        break;
                     case NO_OP:
                         op = new Engine.NoOp(seqNo, term, Engine.Operation.Origin.REPLICA, startTime, "test-" + seqNo);
                         break;
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index b84bef35d1525..347a335362a37 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -50,7 +50,6 @@
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.common.CheckedBiFunction;
-import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;

From fb62c5bc81be851b8b154c42aab7a2ce57f73c92 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sat, 20 Oct 2018 18:23:44 -0400
Subject: [PATCH 3/5] Revert "TEST: Mute testDedupByPrimaryTerm"

This reverts commit 7ab464807d8b31ce48a5fe37c43d922e325e0006.
---
 .../elasticsearch/index/engine/LuceneChangesSnapshotTests.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
index da5f6501a2f70..2c0f006d323d2 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
@@ -150,7 +150,6 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34667")
     public void testDedupByPrimaryTerm() throws Exception {
         Map<Long, Long> seqNoToTerm = new HashMap<>();
         final CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedDocFactory = nestedParsedDocFactory();
         int numOps = between(1, 100);

From 2c15089db88de120797df85d830288e09556a6a8 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Tue, 23 Oct 2018 11:47:57 -0400
Subject: [PATCH 4/5] rename

---
 .../index/engine/LuceneChangesSnapshotTests.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
index 2c0f006d323d2..e3ca35fba60c5 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
@@ -150,7 +150,13 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
         }
     }
 
-    public void testDedupByPrimaryTerm() throws Exception {
+    /**
+     * If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation into
+     * Lucene (only the first copy is non-stale; the remaining are stale and soft-deleted). Moreover, a nested document is indexed into
+     * Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies
+     * that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skips non-root nested documents or stale copies.
+     */
+    public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception {
         Map<Long, Long> seqNoToTerm = new HashMap<>();
         final CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedDocFactory = nestedParsedDocFactory();
         int numOps = between(1, 100);

From 7e22a4f83fed76ef2362d14ab3b5cae904242cba Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Thu, 6 Dec 2018 17:22:44 -0500
Subject: [PATCH 5/5] =?UTF-8?q?use=20=E2=80=9CapplyOperation=E2=80=9D=20ut?=
 =?UTF-8?q?il?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../index/engine/InternalEngineTests.java    |  2 +-
 .../engine/LuceneChangesSnapshotTests.java   | 51 ++-----------------
 .../index/engine/EngineTestCase.java         | 42 ++++++++++-----
 3 files changed, 33 insertions(+), 62 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index a34bc77ac1f6e..e87d4ddc01af3 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -5464,7 +5464,7 @@ public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
         final List docs;
         try (InternalEngine engine = createEngine(
                 config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
-            List<Engine.Operation> ops = generateReplicaHistory(between(1, 100), randomBoolean());
+            List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
             applyOperations(engine, ops);
             globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
             engine.syncTranslog();
diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
index 8f33fe466f29d..f179cd840c60e 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
@@ -19,8 +19,6 @@
 
 package org.elasticsearch.index.engine;
 
-import org.elasticsearch.common.CheckedBiFunction;
-import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
@@ -151,49 +149,14 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
     }
 
     /**
-     * If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation into
-     * Lucene (only the first copy is non-stale; the remaining are stale and soft-deleted). Moreover, a nested document is indexed into
+     * If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation
+     * into Lucene (only the first copy is non-stale; others are stale and soft-deleted). Moreover, a nested document is indexed into
      * Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies
     * that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skips non-root nested documents or stale copies.
      */
     public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception {
         Map<Long, Long> seqNoToTerm = new HashMap<>();
-        final CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedDocFactory = nestedParsedDocFactory();
-        int numOps = between(1, 100);
-        List<Engine.Operation> operations = new ArrayList<>();
-        for (int seqNo = 0; seqNo < numOps; seqNo++) {
-            if (rarely()) {
-                continue; // make gap in sequence number
-            }
-            final String docId = Integer.toString(between(1, 100));
-            final long startTime = randomNonNegativeLong();
-            final long term = randomLongBetween(1, primaryTerm.get());
-            final Engine.Operation.TYPE opType = randomFrom(Engine.Operation.TYPE.values());
-            final boolean nestedDocs = randomBoolean();
-            final int nestedValues = between(0, 3);
-            int copies = frequently() ? 1 : between(2, 5);
-            for (int i = 0; i < copies; i++) {
-                final ParsedDocument doc = nestedDocs ? nestedDocFactory.apply(docId, nestedValues) : createParsedDoc(docId, null);
-                final Engine.Operation op;
-                switch (opType) {
-                    case INDEX:
-                        op = new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, term, 1L,
-                            null, Engine.Operation.Origin.REPLICA, startTime, -1, true);
-                        break;
-                    case DELETE:
-                        op = new Engine.Delete(doc.type(), docId, EngineTestCase.newUid(docId), seqNo, term, 1L,
-                            null, Engine.Operation.Origin.REPLICA, startTime);
-                        break;
-                    case NO_OP:
-                        op = new Engine.NoOp(seqNo, term, Engine.Operation.Origin.REPLICA, startTime, "test-" + seqNo);
-                        break;
-                    default:
-                        throw new IllegalStateException("invalid operation type [" + opType + "]");
                 }
-                operations.add(op);
-            }
-        }
-        Randomness.shuffle(operations);
+        List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
         int totalOps = 0;
         for (Engine.Operation op : operations) {
             // Engine skips deletes or indexes below the local checkpoint
             if (engine.getLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) {
@@ -205,13 +168,7 @@ public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception {
                     totalOps += ((Engine.Index) op).docs().size();
                 } else {
                     totalOps++;
                 }
             }
-            if (op instanceof Engine.Index) {
-                engine.index((Engine.Index) op);
-            } else if (op instanceof Engine.Delete) {
-                engine.delete((Engine.Delete) op);
-            } else if (op instanceof Engine.NoOp) {
-                engine.noOp((Engine.NoOp) op);
-            }
+            applyOperation(engine, op);
             if (rarely()) {
                 engine.refresh("test");
             }
             if (rarely()) {
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 0ec723d2dfd6d..2753df6fdcaeb 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -704,22 +704,36 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
         return ops;
     }
 
-    public List<Engine.Operation> generateReplicaHistory(int numOps, boolean allowGapInSeqNo) {
+    public List<Engine.Operation> generateHistoryOnReplica(int numOps, boolean allowGapInSeqNo, boolean allowDuplicate,
+                                                           boolean includeNestedDocs) throws Exception {
         long seqNo = 0;
-        List<Engine.Operation> operations = new ArrayList<>(numOps);
+        final int maxIdValue = randomInt(numOps * 2);
+        final List<Engine.Operation> operations = new ArrayList<>(numOps);
+        CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory = nestedParsedDocFactory();
         for (int i = 0; i < numOps; i++) {
-            String id = Integer.toString(between(1, 100));
-            final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
-            if (randomBoolean()) {
-                operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
-                    i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(),
-                    -1, true));
-            } else if (randomBoolean()) {
-                operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
-                    i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
-            } else {
-                operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
-                    threadPool.relativeTimeInMillis(), "test-" + i));
+            final String id = Integer.toString(randomInt(maxIdValue));
+            final Engine.Operation.TYPE opType = randomFrom(Engine.Operation.TYPE.values());
+            final boolean isNestedDoc = includeNestedDocs && opType == Engine.Operation.TYPE.INDEX && randomBoolean();
+            final int nestedValues = between(0, 3);
+            final long startTime = threadPool.relativeTimeInMillis();
+            final int copies = allowDuplicate && rarely() ? between(2, 4) : 1;
+            for (int copy = 0; copy < copies; copy++) {
+                final ParsedDocument doc = isNestedDoc ? nestedParsedDocFactory.apply(id, nestedValues) : createParsedDoc(id, null);
+                switch (opType) {
+                    case INDEX:
+                        operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
+                            i, null, Engine.Operation.Origin.REPLICA, startTime, -1, true));
+                        break;
+                    case DELETE:
+                        operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
+                            i, null, Engine.Operation.Origin.REPLICA, startTime));
+                        break;
+                    case NO_OP:
+                        operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i));
+                        break;
+                    default:
+                        throw new IllegalStateException("Unknown operation type [" + opType + "]");
                 }
             }
             seqNo++;
             if (allowGapInSeqNo && rarely()) {
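
Taken together, PATCH 5/5 leaves tests that no longer build replica histories by hand: they ask generateHistoryOnReplica for a randomized history and replay it through applyOperation (or applyOperations, as in InternalEngineTests above). The following is a minimal usage sketch, not part of the patch, assuming only the signatures visible in this series (generateHistoryOnReplica(numOps, allowGapInSeqNo, allowDuplicate, includeNestedDocs) and applyOperation(engine, op) from EngineTestCase) and a test method declared with throws Exception:

    // Sketch only: mirrors the helpers introduced in PATCH 5/5; not a verbatim excerpt of the patch.
    List<Engine.Operation> ops = generateHistoryOnReplica(
        between(1, 100),   // number of operations to generate
        randomBoolean(),   // allowGapInSeqNo: leave holes in the seq_no sequence
        randomBoolean(),   // allowDuplicate: deliver some seq_no more than once
        randomBoolean());  // includeNestedDocs: mix nested documents into the history
    for (Engine.Operation op : ops) {
        applyOperation(engine, op); // dispatches to engine.index / engine.delete / engine.noOp
    }
    engine.refresh("test");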