From 6030953d72fd2f14220d719e5b47b811eb4c0c34 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 28 Mar 2018 08:30:38 +0100 Subject: [PATCH 1/8] Double-check local checkpoint for staleness Today, when determining if an operation is stale, we compare the seqno against the local checkpoint before looking in the version map. However, in between these two checks the local checkpoint could advance, causing some tombstones to become stale, and then the stale tombstones could be collected. In this situation we might incorrectly decide that the operation is fresh and apply it. To avoid this situation, check the local checkpoint again after calling getVersionFromMap(). Since it only ever increases, this gives the right result despite other concurrent activity. --- .../elasticsearch/index/engine/InternalEngine.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0fda2f04ac5a4..e0e342f0df090 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -589,8 +589,17 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); - if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo || + if (op.seqNo() <= localCheckpointTracker.getCheckpoint()) { + /* Subtle point: we already compared the seqNo vs the local checkpoint, but the local checkpoint could have + * advanced after that comparison was done. If it did, it may have made some tombstones stale, and if those + * tombstones included one for the present op and were cleared before the call to getVersionFromMap() above, + * then the present op would appear to be fresh and would be applied a second time. To avoid this, we must + * check the local checkpoint again: since the local checkpoint only ever advances, checking it after the + * call to getVersionFromMap() gives the correct result. + */ + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } else if (versionValue != null) { + if (op.seqNo() > versionValue.seqNo || (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) status = OpVsLuceneDocStatus.OP_NEWER; else { From 02d3b988c6b1104e96049aff6e6ae9fab7a635c6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 4 Apr 2018 08:42:24 +0100 Subject: [PATCH 2/8] Add first LCP check to compareOpToLuceneDocBasedOnSeqNo() --- .../elasticsearch/index/engine/InternalEngine.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e5f433faa722d..318cf11c97c10 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -580,6 +580,17 @@ enum OpVsLuceneDocStatus { private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; + if (op.seqNo() <= localCheckpointTracker.getCheckpoint()) { + // The operation seq# is lower then the current local checkpoint and thus was already put into Lucene. This + // can happen during recovery where older operations are sent from the translog that are already part of the + // Lucene commit (either from a peer recovery or a local translog) or due to concurrent indexing & recovery. + // For the former it is important to skip Lucene as the operation in question may have been deleted in an + // out of order op that is not replayed. + // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery + // See testRecoveryWithOutOfOrderDelete for an example of peer recovery + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (op.seqNo() <= localCheckpointTracker.getCheckpoint()) { From a438e4b0b4ad32260557160fa6a92d9e1c707a6a Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 4 Apr 2018 08:42:36 +0100 Subject: [PATCH 3/8] Consistentify comment delimiters --- .../elasticsearch/index/engine/InternalEngine.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 318cf11c97c10..6f5846b39e10c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -594,13 +594,12 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (op.seqNo() <= localCheckpointTracker.getCheckpoint()) { - /* Subtle point: we already compared the seqNo vs the local checkpoint, but the local checkpoint could have - * advanced after that comparison was done. If it did, it may have made some tombstones stale, and if those - * tombstones included one for the present op and were cleared before the call to getVersionFromMap() above, - * then the present op would appear to be fresh and would be applied a second time. To avoid this, we must - * check the local checkpoint again: since the local checkpoint only ever advances, checking it after the - * call to getVersionFromMap() gives the correct result. - */ + // Subtle point: we already compared the seqNo vs the local checkpoint, but the local checkpoint could have + // advanced after that comparison was done. If it did, it may have made some tombstones stale, and if those + // tombstones included one for the present op and were cleared before the call to getVersionFromMap() above, + // then the present op would appear to be fresh and would be applied a second time. To avoid this, we must + // check the local checkpoint again: since the local checkpoint only ever advances, checking it after the + // call to getVersionFromMap() gives the correct result. status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } else if (versionValue != null) { if (op.seqNo() > versionValue.seqNo || From bd9544bea0bf5fb6032b700e9915ad7b1751f3fc Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 4 Apr 2018 08:43:33 +0100 Subject: [PATCH 4/8] Remove duplicated check from indexing flow --- .../elasticsearch/index/engine/InternalEngine.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6f5846b39e10c..264f00bbe2778 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -850,19 +850,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene; - if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){ - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene - // this can happen during recovery where older operations are sent from the translog that are already - // part of the lucene commit (either from a peer recovery or a local translog) - // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in - // question may have been deleted in an out of order op that is not replayed. - // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery - // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } else { - opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - } + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { From dd4521c27d72511f773920bfb30166adb5ad8648 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 4 Apr 2018 08:44:03 +0100 Subject: [PATCH 5/8] Remove duplicated check from deletion flow --- .../elasticsearch/index/engine/InternalEngine.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 264f00bbe2778..19d74b6381336 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1136,19 +1136,7 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene; - if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) { - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene - // this can happen during recovery where older operations are sent from the translog that are already - // part of the lucene commit (either from a peer recovery or a local translog) - // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in - // question may have been deleted in an out of order op that is not replayed. - // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery - // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } else { - opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); - } + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); final DeletionStrategy plan; if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { From 9b2a6d800514f73b2f826618cc85b0ce30da2372 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 4 Apr 2018 08:56:17 +0100 Subject: [PATCH 6/8] Inline 'plan' and 'status' --- .../index/engine/InternalEngine.java | 55 ++++++++----------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 19d74b6381336..5f7cdb2eed92d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -579,7 +579,6 @@ enum OpVsLuceneDocStatus { private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; - final OpVsLuceneDocStatus status; if (op.seqNo() <= localCheckpointTracker.getCheckpoint()) { // The operation seq# is lower then the current local checkpoint and thus was already put into Lucene. This // can happen during recovery where older operations are sent from the translog that are already part of the @@ -600,13 +599,13 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) // then the present op would appear to be fresh and would be applied a second time. To avoid this, we must // check the local checkpoint again: since the local checkpoint only ever advances, checking it after the // call to getVersionFromMap() gives the correct result. - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } else if (versionValue != null) { if (op.seqNo() > versionValue.seqNo || - (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) - status = OpVsLuceneDocStatus.OP_NEWER; - else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) { + return OpVsLuceneDocStatus.OP_NEWER; + } else { + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } else { // load from index @@ -614,23 +613,22 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); if (docAndSeqNo == null) { - status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; + return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; } else if (op.seqNo() > docAndSeqNo.seqNo) { - status = OpVsLuceneDocStatus.OP_NEWER; + return OpVsLuceneDocStatus.OP_NEWER; } else if (op.seqNo() == docAndSeqNo.seqNo) { // load term to tie break final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field()); if (op.primaryTerm() > existingTerm) { - status = OpVsLuceneDocStatus.OP_NEWER; + return OpVsLuceneDocStatus.OP_NEWER; } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } } - return status; } /** resolves the current version of the document, returning null if not found */ @@ -824,7 +822,6 @@ public IndexResult index(Index index) throws IOException { } private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { - final IndexingStrategy plan; final boolean appendOnlyRequest = canOptimizeAddDocument(index); if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) { /* @@ -836,7 +833,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. */ assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; - plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); + return IndexingStrategy.optimizedAppendOnly(index.seqNo()); } else { if (appendOnlyRequest == false) { maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); @@ -852,26 +849,25 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio // a delete state and return false for the created flag in favor of code simplicity final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); + return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { - plan = IndexingStrategy.processNormally( + return IndexingStrategy.processNormally( opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version() ); } } - return plan; } private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { - assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); - final IndexingStrategy plan; + assert index.origin() == Operation.Origin.PRIMARY : "planning as primary but origin is " + index.origin(); // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { if (mayHaveBeenIndexedBefore(index)) { - plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); + final IndexingStrategy plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); versionMap.enforceSafeAccess(); + return plan; } else { - plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); + return IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); } } else { versionMap.enforceSafeAccess(); @@ -890,15 +886,14 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { currentVersion, index.version(), currentNotFoundOrDeleted)) { final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted); - plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); + return IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); } else { - plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, + return IndexingStrategy.processNormally(currentNotFoundOrDeleted, generateSeqNoForOperation(index), index.versionType().updateVersion(currentVersion, index.version()) ); } } - return plan; } private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) @@ -1138,15 +1133,13 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept // a delete state and return true for the found flag in favor of code simplicity final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); - final DeletionStrategy plan; if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); + return DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); } else { - plan = DeletionStrategy.processNormally( + return DeletionStrategy.processNormally( opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.seqNo(), delete.version()); } - return plan; } private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { @@ -1163,17 +1156,15 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException currentVersion = versionValue.version; currentlyDeleted = versionValue.isDelete(); } - final DeletionStrategy plan; if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) { final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); - plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); + return DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); } else { - plan = DeletionStrategy.processNormally( + return DeletionStrategy.processNormally( currentlyDeleted, generateSeqNoForOperation(delete), delete.versionType().updateVersion(currentVersion, delete.version())); } - return plan; } private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) From 5d46eee62c53542c973a6435ff556cd922634210 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 6 Apr 2018 10:10:51 +0100 Subject: [PATCH 7/8] Introduce DocHistoryEntry Today, each test has its own process for deciding whether to refresh/flush/GC deletes after applying an operation. This change moves this decision into generateSingleDocHistory(). --- .../index/engine/InternalEngineTests.java | 92 ++++++++++++------- 1 file changed, 60 insertions(+), 32 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9cdc68444ea16..38fca74272ac4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1384,11 +1384,26 @@ public void testVersioningCreateExistsException() throws IOException { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, - boolean partialOldPrimary, long primaryTerm, - int minOpCount, int maxOpCount) { + private class DocHistoryEntry { + final Engine.Operation op; + final boolean refreshAfterOperation; + final boolean flushAfterOperation; + final boolean gcDeletesAfterOperation; + + private DocHistoryEntry(Engine.Operation op, boolean refreshAfterOperation, + boolean flushAfterOperation, boolean gcDeletesAfterOperation) { + this.op = op; + this.refreshAfterOperation = refreshAfterOperation; + this.flushAfterOperation = flushAfterOperation; + this.gcDeletesAfterOperation = gcDeletesAfterOperation; + } + } + + protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, + boolean partialOldPrimary, long primaryTerm, + int minOpCount, int maxOpCount) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); - final List ops = new ArrayList<>(); + final List ops = new ArrayList<>(); final Term id = newUid("1"); final int startWithSeqNo; if (partialOldPrimary) { @@ -1435,19 +1450,19 @@ protected List generateSingleDocHistory(boolean forReplica, Ve forReplica ? REPLICA : PRIMARY, System.currentTimeMillis()); } - ops.add(op); + ops.add(new DocHistoryEntry(op, randomBoolean(), randomBoolean(), rarely())); } return ops; } public void testOutOfOrderDocsOnReplica() throws IOException { - final List ops = generateSingleDocHistory(true, + final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); assertOpsOnReplica(ops, replicaEngine, true); } - private void assertOpsOnReplica(List ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { - final Engine.Operation lastOp = ops.get(ops.size() - 1); + private void assertOpsOnReplica(List ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { + final Engine.Operation lastOp = ops.get(ops.size() - 1).op; final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1458,7 +1473,7 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } if (shuffleOps) { int firstOpWithSeqNo = 0; - while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { + while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).op.seqNo() < 0) { firstOpWithSeqNo++; } // shuffle ops but make sure legacy ops are first @@ -1466,7 +1481,8 @@ private void assertOpsOnReplica(List ops, InternalEngine repli shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); } boolean firstOp = true; - for (Engine.Operation op : ops) { + for (final DocHistoryEntry docHistoryEntry : ops) { + final Engine.Operation op = docHistoryEntry.op; logger.info("performing [{}], v [{}], seq# [{}], term [{}]", op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { @@ -1491,13 +1507,14 @@ private void assertOpsOnReplica(List ops, InternalEngine repli assertThat(result.getVersion(), equalTo(op.version())); assertThat(result.hasFailure(), equalTo(false)); } - if (randomBoolean()) { + if (docHistoryEntry.refreshAfterOperation) { engine.refresh("test"); } - if (randomBoolean()) { + if (docHistoryEntry.flushAfterOperation) { engine.flush(); engine.refresh("test"); } + // TODO GC deletes? firstOp = false; } @@ -1512,8 +1529,8 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); - final Engine.Operation lastOp = ops.get(ops.size() - 1); + final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); + final Engine.Operation lastOp = ops.get(ops.size() - 1).op; final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1535,7 +1552,7 @@ public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedEx } } - private void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { + private void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { Thread[] thread = new Thread[randomIntBetween(3, 5)]; CountDownLatch startGun = new CountDownLatch(thread.length); AtomicInteger offset = new AtomicInteger(-1); @@ -1550,15 +1567,21 @@ private void concurrentlyApplyOps(List ops, InternalEngine eng int docOffset; while ((docOffset = offset.incrementAndGet()) < ops.size()) { try { - final Engine.Operation op = ops.get(docOffset); + final DocHistoryEntry docHistoryEntry = ops.get(docOffset); + final Engine.Operation op = docHistoryEntry.op; if (op instanceof Engine.Index) { engine.index((Engine.Index) op); } else { engine.delete((Engine.Delete) op); } - if ((docOffset + 1) % 4 == 0) { + if (docHistoryEntry.refreshAfterOperation) { engine.refresh("test"); } + if (docHistoryEntry.flushAfterOperation) { + engine.flush(); + engine.refresh("test"); + } + // TODO GC deletes? } catch (IOException e) { throw new AssertionError(e); } @@ -1572,11 +1595,11 @@ private void concurrentlyApplyOps(List ops, InternalEngine eng } public void testInternalVersioningOnPrimary() throws IOException { - final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); } - private int assertOpsOnPrimary(List ops, long currentOpVersion, boolean docDeleted, InternalEngine engine) + private int assertOpsOnPrimary(List ops, long currentOpVersion, boolean docDeleted, InternalEngine engine) throws IOException { String lastFieldValue = null; int opsPerformed = 0; @@ -1586,7 +1609,8 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion index.getAutoGeneratedIdTimestamp(), index.isRetry()); BiFunction delWithVersion = (version, delete) -> new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), delete.primaryTerm(), version, delete.versionType(), delete.origin(), delete.startTime()); - for (Engine.Operation op : ops) { + for (final DocHistoryEntry docHistoryEntry : ops) { + final Engine.Operation op = docHistoryEntry.op; final boolean versionConflict = rarely(); final boolean versionedOp = versionConflict || randomBoolean(); final long conflictingVersion = docDeleted || randomBoolean() ? @@ -1650,12 +1674,14 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion } } } - if (randomBoolean()) { + + // TODO pure refresh? + if (docHistoryEntry.flushAfterOperation) { engine.flush(); engine.refresh("test"); } - if (rarely()) { + if (docHistoryEntry.gcDeletesAfterOperation) { // simulate GC deletes engine.refresh("gc_simulation", Engine.SearcherScope.INTERNAL); engine.clearDeletedTombstones(); @@ -1680,8 +1706,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException { final Set nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values())); nonInternalVersioning.remove(VersionType.INTERNAL); final VersionType versionType = randomFrom(nonInternalVersioning); - final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); - final Engine.Operation lastOp = ops.get(ops.size() - 1); + final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); + final Engine.Operation lastOp = ops.get(ops.size() - 1).op; final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1697,7 +1723,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException { long highestOpVersion = Versions.NOT_FOUND; long seqNo = -1; boolean docDeleted = true; - for (Engine.Operation op : ops) { + for (final DocHistoryEntry docHistoryEntry : ops) { + final Engine.Operation op = docHistoryEntry.op; logger.info("performing [{}], v [{}], seq# [{}], term [{}]", op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { @@ -1737,13 +1764,14 @@ public void testNonInternalVersioningOnPrimary() throws IOException { assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); } } - if (randomBoolean()) { + if (docHistoryEntry.refreshAfterOperation) { engine.refresh("test"); } - if (randomBoolean()) { + if (docHistoryEntry.flushAfterOperation) { engine.flush(); engine.refresh("test"); } + // TODO GC deletes? } assertVisibleCount(engine, docDeleted ? 0 : 1); @@ -1758,9 +1786,9 @@ public void testNonInternalVersioningOnPrimary() throws IOException { } public void testVersioningPromotedReplica() throws IOException { - final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); - List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); - Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1); + final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); + List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1).op; final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; final long finalReplicaVersion = lastReplicaOp.version(); final long finalReplicaSeqNo = lastReplicaOp.seqNo(); @@ -1779,8 +1807,8 @@ public void testVersioningPromotedReplica() throws IOException { } public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); - final Engine.Operation lastOp = ops.get(ops.size() - 1); + final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); + final Engine.Operation lastOp = ops.get(ops.size() - 1).op; final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; From f331de6c6d709476788a3c2ab4dd64ba589edb26 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 17 Apr 2018 13:35:33 +0100 Subject: [PATCH 8/8] Back out changes to tests --- .../index/engine/InternalEngineTests.java | 92 +++++++------------ 1 file changed, 32 insertions(+), 60 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 3bb1308dc976e..60913c644eadb 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1384,26 +1384,11 @@ public void testVersioningCreateExistsException() throws IOException { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - private class DocHistoryEntry { - final Engine.Operation op; - final boolean refreshAfterOperation; - final boolean flushAfterOperation; - final boolean gcDeletesAfterOperation; - - private DocHistoryEntry(Engine.Operation op, boolean refreshAfterOperation, - boolean flushAfterOperation, boolean gcDeletesAfterOperation) { - this.op = op; - this.refreshAfterOperation = refreshAfterOperation; - this.flushAfterOperation = flushAfterOperation; - this.gcDeletesAfterOperation = gcDeletesAfterOperation; - } - } - - protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, - boolean partialOldPrimary, long primaryTerm, - int minOpCount, int maxOpCount) { + protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, + boolean partialOldPrimary, long primaryTerm, + int minOpCount, int maxOpCount) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); - final List ops = new ArrayList<>(); + final List ops = new ArrayList<>(); final Term id = newUid("1"); final int startWithSeqNo; if (partialOldPrimary) { @@ -1450,19 +1435,19 @@ protected List generateSingleDocHistory(boolean forReplica, Ver forReplica ? REPLICA : PRIMARY, System.currentTimeMillis()); } - ops.add(new DocHistoryEntry(op, randomBoolean(), randomBoolean(), rarely())); + ops.add(op); } return ops; } public void testOutOfOrderDocsOnReplica() throws IOException { - final List ops = generateSingleDocHistory(true, + final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); assertOpsOnReplica(ops, replicaEngine, true); } - private void assertOpsOnReplica(List ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { - final Engine.Operation lastOp = ops.get(ops.size() - 1).op; + private void assertOpsOnReplica(List ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { + final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1473,7 +1458,7 @@ private void assertOpsOnReplica(List ops, InternalEngine replic } if (shuffleOps) { int firstOpWithSeqNo = 0; - while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).op.seqNo() < 0) { + while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { firstOpWithSeqNo++; } // shuffle ops but make sure legacy ops are first @@ -1481,8 +1466,7 @@ private void assertOpsOnReplica(List ops, InternalEngine replic shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); } boolean firstOp = true; - for (final DocHistoryEntry docHistoryEntry : ops) { - final Engine.Operation op = docHistoryEntry.op; + for (Engine.Operation op : ops) { logger.info("performing [{}], v [{}], seq# [{}], term [{}]", op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { @@ -1507,14 +1491,13 @@ private void assertOpsOnReplica(List ops, InternalEngine replic assertThat(result.getVersion(), equalTo(op.version())); assertThat(result.hasFailure(), equalTo(false)); } - if (docHistoryEntry.refreshAfterOperation) { + if (randomBoolean()) { engine.refresh("test"); } - if (docHistoryEntry.flushAfterOperation) { + if (randomBoolean()) { engine.flush(); engine.refresh("test"); } - // TODO GC deletes? firstOp = false; } @@ -1529,8 +1512,8 @@ private void assertOpsOnReplica(List ops, InternalEngine replic } public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); - final Engine.Operation lastOp = ops.get(ops.size() - 1).op; + final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); + final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1552,7 +1535,7 @@ public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedEx } } - private void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { + private void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { Thread[] thread = new Thread[randomIntBetween(3, 5)]; CountDownLatch startGun = new CountDownLatch(thread.length); AtomicInteger offset = new AtomicInteger(-1); @@ -1567,21 +1550,15 @@ private void concurrentlyApplyOps(List ops, InternalEngine engi int docOffset; while ((docOffset = offset.incrementAndGet()) < ops.size()) { try { - final DocHistoryEntry docHistoryEntry = ops.get(docOffset); - final Engine.Operation op = docHistoryEntry.op; + final Engine.Operation op = ops.get(docOffset); if (op instanceof Engine.Index) { engine.index((Engine.Index) op); } else { engine.delete((Engine.Delete) op); } - if (docHistoryEntry.refreshAfterOperation) { + if ((docOffset + 1) % 4 == 0) { engine.refresh("test"); } - if (docHistoryEntry.flushAfterOperation) { - engine.flush(); - engine.refresh("test"); - } - // TODO GC deletes? } catch (IOException e) { throw new AssertionError(e); } @@ -1595,11 +1572,11 @@ private void concurrentlyApplyOps(List ops, InternalEngine engi } public void testInternalVersioningOnPrimary() throws IOException { - final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); } - private int assertOpsOnPrimary(List ops, long currentOpVersion, boolean docDeleted, InternalEngine engine) + private int assertOpsOnPrimary(List ops, long currentOpVersion, boolean docDeleted, InternalEngine engine) throws IOException { String lastFieldValue = null; int opsPerformed = 0; @@ -1609,8 +1586,7 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion, index.getAutoGeneratedIdTimestamp(), index.isRetry()); BiFunction delWithVersion = (version, delete) -> new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), delete.primaryTerm(), version, delete.versionType(), delete.origin(), delete.startTime()); - for (final DocHistoryEntry docHistoryEntry : ops) { - final Engine.Operation op = docHistoryEntry.op; + for (Engine.Operation op : ops) { final boolean versionConflict = rarely(); final boolean versionedOp = versionConflict || randomBoolean(); final long conflictingVersion = docDeleted || randomBoolean() ? @@ -1674,14 +1650,12 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion, } } } - - // TODO pure refresh? - if (docHistoryEntry.flushAfterOperation) { + if (randomBoolean()) { engine.flush(); engine.refresh("test"); } - if (docHistoryEntry.gcDeletesAfterOperation) { + if (rarely()) { // simulate GC deletes engine.refresh("gc_simulation", Engine.SearcherScope.INTERNAL); engine.clearDeletedTombstones(); @@ -1706,8 +1680,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException { final Set nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values())); nonInternalVersioning.remove(VersionType.INTERNAL); final VersionType versionType = randomFrom(nonInternalVersioning); - final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); - final Engine.Operation lastOp = ops.get(ops.size() - 1).op; + final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); + final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1723,8 +1697,7 @@ public void testNonInternalVersioningOnPrimary() throws IOException { long highestOpVersion = Versions.NOT_FOUND; long seqNo = -1; boolean docDeleted = true; - for (final DocHistoryEntry docHistoryEntry : ops) { - final Engine.Operation op = docHistoryEntry.op; + for (Engine.Operation op : ops) { logger.info("performing [{}], v [{}], seq# [{}], term [{}]", op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { @@ -1764,14 +1737,13 @@ public void testNonInternalVersioningOnPrimary() throws IOException { assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); } } - if (docHistoryEntry.refreshAfterOperation) { + if (randomBoolean()) { engine.refresh("test"); } - if (docHistoryEntry.flushAfterOperation) { + if (randomBoolean()) { engine.flush(); engine.refresh("test"); } - // TODO GC deletes? } assertVisibleCount(engine, docDeleted ? 0 : 1); @@ -1786,9 +1758,9 @@ public void testNonInternalVersioningOnPrimary() throws IOException { } public void testVersioningPromotedReplica() throws IOException { - final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); - List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); - Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1).op; + final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); + List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1); final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; final long finalReplicaVersion = lastReplicaOp.version(); final long finalReplicaSeqNo = lastReplicaOp.seqNo(); @@ -1807,8 +1779,8 @@ public void testVersioningPromotedReplica() throws IOException { } public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); - final Engine.Operation lastOp = ops.get(ops.size() - 1).op; + final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); + final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp;