From 4e543537e1aad3a3f2d7a5921c93679479b4dc2f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 24 Apr 2018 22:47:59 +0200 Subject: [PATCH 1/5] double docs history --- .../index/engine/InternalEngineTests.java | 111 +++++++++++++----- 1 file changed, 80 insertions(+), 31 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ea4b53f16c003..6b6d36b418833 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -133,6 +133,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -1385,18 +1386,13 @@ public void testVersioningCreateExistsException() throws IOException { } protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, - boolean partialOldPrimary, long primaryTerm, - int minOpCount, int maxOpCount) { + long primaryTerm, + int minOpCount, int maxOpCount, String docId) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); final List ops = new ArrayList<>(); - final Term id = newUid("1"); - final int startWithSeqNo; - if (partialOldPrimary) { - startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1); - } else { - startWithSeqNo = 0; - } - final String valuePrefix = forReplica ? "r_" : "p_"; + final Term id = newUid(docId); + final int startWithSeqNo = 0; + final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_"; final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); for (int i = 0; i < numOfOps; i++) { final Engine.Operation op; @@ -1418,7 +1414,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve throw new UnsupportedOperationException("unknown version type: " + versionType); } if (randomBoolean()) { - op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null), + op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null), forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, @@ -1427,7 +1423,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve System.currentTimeMillis(), -1, false ); } else { - op = new Engine.Delete("test", "1", id, + op = new Engine.Delete("test", docId, id, forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, @@ -1442,7 +1438,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve public void testOutOfOrderDocsOnReplica() throws IOException { final List ops = generateSingleDocHistory(true, - randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), 2, 2, 20, "1"); assertOpsOnReplica(ops, replicaEngine, true); } @@ -1512,27 +1508,80 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); - final Engine.Operation lastOp = ops.get(ops.size() - 1); - final String lastFieldValue; - if (lastOp instanceof Engine.Index) { - Engine.Index index = (Engine.Index) lastOp; - lastFieldValue = index.docs().get(0).get("value"); + final List opsDoc1 = + generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "1"); + final Engine.Operation lastOpDoc1 = opsDoc1.get(opsDoc1.size() - 1); + final String lastFieldValueDoc1; + if (lastOpDoc1 instanceof Engine.Index) { + Engine.Index index = (Engine.Index) lastOpDoc1; + lastFieldValueDoc1 = index.docs().get(0).get("value"); } else { // delete - lastFieldValue = null; + lastFieldValueDoc1 = null; + } + final List opsDoc2 = + generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "2"); + final Engine.Operation lastOpDoc2 = opsDoc2.get(opsDoc2.size() - 1); + final String lastFieldValueDoc2; + if (lastOpDoc2 instanceof Engine.Index) { + Engine.Index index = (Engine.Index) lastOpDoc2; + lastFieldValueDoc2 = index.docs().get(0).get("value"); + } else { + // delete + lastFieldValueDoc2 = null; } - shuffle(ops, random()); - concurrentlyApplyOps(ops, engine); + // randomly interleave + AtomicLong seqNoGenerator = new AtomicLong(); + Function seqNoUpdater = operation -> { + final long newSeqNo = seqNoGenerator.getAndIncrement(); + if (operation instanceof Engine.Index) { + Engine.Index index = (Engine.Index) operation; + return new Engine.Index(index.uid(), index.parsedDoc(), newSeqNo, index.primaryTerm(), index.version(), + index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry()); + } else { + Engine.Delete delete = (Engine.Delete) operation; + return new Engine.Delete(delete.type(), delete.id(), delete.uid(), newSeqNo, delete.primaryTerm(), + delete.version(), delete.versionType(), delete.origin(), delete.startTime()); + } + }; + final List allOps = new ArrayList<>(); + Iterator iter1 = opsDoc1.iterator(); + Iterator iter2 = opsDoc2.iterator(); + while (iter1.hasNext() && iter2.hasNext()) { + final Engine.Operation next = randomBoolean() ? iter1.next() : iter2.next(); + allOps.add(seqNoUpdater.apply(next)); + } + iter1.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); + iter2.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); + shuffle(allOps, random()); + concurrentlyApplyOps(allOps, engine); - assertVisibleCount(engine, lastFieldValue == null ? 0 : 1); - if (lastFieldValue != null) { + + engine.refresh("test"); + + if (lastFieldValueDoc1 != null) { try (Searcher searcher = engine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); - searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValueDoc1)), collector); assertThat(collector.getTotalHits(), equalTo(1)); } } + if (lastFieldValueDoc2 != null) { + try (Searcher searcher = engine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValueDoc2)), collector); + assertThat(collector.getTotalHits(), equalTo(1)); + } + } + + int totalExpectedOps = 0; + if (lastFieldValueDoc1 != null) { + totalExpectedOps++; + } + if (lastFieldValueDoc2 != null) { + totalExpectedOps++; + } + assertVisibleCount(engine, totalExpectedOps); } private void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { @@ -1572,12 +1621,12 @@ private void concurrentlyApplyOps(List ops, InternalEngine eng } public void testInternalVersioningOnPrimary() throws IOException { - final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1"); assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); } public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception { - List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 10, 100); + List ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 10, 100, "1"); CountDownLatch latch = new CountDownLatch(1); AtomicBoolean running = new AtomicBoolean(true); Thread refreshThread = new Thread(() -> { @@ -1697,7 +1746,7 @@ public void testNonInternalVersioningOnPrimary() throws IOException { final Set nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values())); nonInternalVersioning.remove(VersionType.INTERNAL); final VersionType versionType = randomFrom(nonInternalVersioning); - final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); + final List ops = generateSingleDocHistory(false, versionType, 2, 2, 20, "1"); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { @@ -1775,8 +1824,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException { } public void testVersioningPromotedReplica() throws IOException { - final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); - List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, 1, 2, 20, "1"); + List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1"); Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1); final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; final long finalReplicaVersion = lastReplicaOp.version(); @@ -1796,7 +1845,7 @@ public void testVersioningPromotedReplica() throws IOException { } public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); + final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, 2, 100, 300, "1"); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { From 89592f70d5ac8ffa17e466440309ee4fd7bd7c72 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 25 Apr 2018 11:30:33 +0200 Subject: [PATCH 2/5] Add duplicates --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 6b6d36b418833..375c4ce9068e9 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1553,6 +1553,9 @@ public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedEx } iter1.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); iter2.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); + // insert some duplicates + allOps.addAll(randomSubsetOf(allOps)); + shuffle(allOps, random()); concurrentlyApplyOps(allOps, engine); From 15d61e3f0f8d1d99fe2b8d096b962ccfa084192c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 25 Apr 2018 11:36:41 +0200 Subject: [PATCH 3/5] test name typo --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 375c4ce9068e9..4fdbb6aa90ae2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1507,7 +1507,7 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } } - public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { + public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, InterruptedException { final List opsDoc1 = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "1"); final Engine.Operation lastOpDoc1 = opsDoc1.get(opsDoc1.size() - 1); From 66cf68da96fb293e12e0172776ba0e3936452588 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 25 Apr 2018 11:40:56 +0200 Subject: [PATCH 4/5] format --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 4fdbb6aa90ae2..bb1bd99594ac8 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1549,7 +1549,7 @@ public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, Interrup Iterator iter2 = opsDoc2.iterator(); while (iter1.hasNext() && iter2.hasNext()) { final Engine.Operation next = randomBoolean() ? iter1.next() : iter2.next(); - allOps.add(seqNoUpdater.apply(next)); + allOps.add(seqNoUpdater.apply(next)); } iter1.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); iter2.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); From 03f75f6ae3b4ef98e20c6c16410388a44cd5abad Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 3 May 2018 10:32:42 +0200 Subject: [PATCH 5/5] iter --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 26547b53e249e..979c44dd5fc8d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1531,7 +1531,7 @@ public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, Interrup lastFieldValueDoc2 = null; } // randomly interleave - AtomicLong seqNoGenerator = new AtomicLong(); + final AtomicLong seqNoGenerator = new AtomicLong(); Function seqNoUpdater = operation -> { final long newSeqNo = seqNoGenerator.getAndIncrement(); if (operation instanceof Engine.Index) { @@ -1559,7 +1559,6 @@ public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, Interrup shuffle(allOps, random()); concurrentlyApplyOps(allOps, engine); - engine.refresh("test"); if (lastFieldValueDoc1 != null) {