From f96a6c91ba2042be97e4517c4db80a8dfa65e470 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 14 Nov 2017 07:09:25 -0500 Subject: [PATCH 1/3] Expose indexing plans This is an additional engine refactoring to expose indexing plans. --- .../index/engine/InternalEngine.java | 76 ++++++++++++------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a928843509de7..e7097a98964b4 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -72,6 +72,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -668,9 +669,7 @@ private boolean canOptimizeAddDocument(Index index) { + index.getAutoGeneratedIdTimestamp(); switch (index.origin()) { case PRIMARY: - assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL) - : "version: " + index.version() + " type: " + index.versionType(); - return true; + assertPrimaryCanOptimizeAddDocument(index); case PEER_RECOVERY: case REPLICA: assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL @@ -686,6 +685,12 @@ private boolean canOptimizeAddDocument(Index index) { return false; } + protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { + assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL) + : "version: " + index.version() + " type: " + index.versionType(); + return true; + } + private boolean assertVersionType(final Engine.Operation operation) { if (operation.origin() == Operation.Origin.REPLICA || operation.origin() == Operation.Origin.PEER_RECOVERY || @@ -706,9 +711,7 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" + " index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo; } else if (origin == Operation.Origin.PRIMARY) { - // sequence number should not be set when operation origin is primary - assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO - : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; + assertPrimaryIncomingSequenceNumber(origin, seqNo); } else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) { // sequence number should be set when operation origin is not primary assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin; @@ -716,6 +719,13 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi return true; } + protected boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { + // sequence number should not be set when operation origin is primary + assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO + : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; + return true; + } + private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) || origin == Operation.Origin.PRIMARY) { @@ -776,14 +786,7 @@ public IndexResult index(Index index) throws IOException { * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls * updateDocument. */ - final IndexingStrategy plan; - - if (index.origin() == Operation.Origin.PRIMARY) { - plan = planIndexingAsPrimary(index); - } else { - // non-primary mode (i.e., replica or recovery) - plan = planIndexingAsNonPrimary(index); - } + final IndexingStrategy plan = indexingStrategyForOperation(index); final IndexResult indexResult; if (plan.earlyResultOnPreFlightError.isPresent()) { @@ -824,7 +827,8 @@ public IndexResult index(Index index) throws IOException { } } - private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { + protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { + assertNonPrimaryOrigin(index); final IndexingStrategy plan; if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) { // no need to deal with out of order delivery - we never saw this one @@ -867,7 +871,16 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 return plan; } - private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { + protected IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { + if (index.origin() == Operation.Origin.PRIMARY) { + return planIndexingAsPrimary(index); + } else { + // non-primary mode (i.e., replica or recovery) + return planIndexingAsNonPrimary(index); + } + } + + protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay @@ -983,7 +996,7 @@ private static void index(final List docs, final IndexWri } } - private static final class IndexingStrategy { + protected static final class IndexingStrategy { final boolean currentNotFoundOrDeleted; final boolean useLuceneUpdateDocument; final long seqNoForIndexing; @@ -1077,12 +1090,7 @@ public DeleteResult delete(Delete delete) throws IOException { try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = acquireLock(delete.uid())) { ensureOpen(); lastWriteNanos = delete.startTime(); - final DeletionStrategy plan; - if (delete.origin() == Operation.Origin.PRIMARY) { - plan = planDeletionAsPrimary(delete); - } else { - plan = planDeletionAsNonPrimary(delete); - } + final DeletionStrategy plan = deletionStrategyForOperation(delete); if (plan.earlyResultOnPreflightError.isPresent()) { deleteResult = plan.earlyResultOnPreflightError.get(); @@ -1121,8 +1129,17 @@ public DeleteResult delete(Delete delete) throws IOException { return deleteResult; } - private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { - assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); + protected DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { + if (delete.origin() == Operation.Origin.PRIMARY) { + return planDeletionAsPrimary(delete); + } else { + // non-primary mode (i.e., replica or recovery) + return planDeletionAsNonPrimary(delete); + } + } + + protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { + assertNonPrimaryOrigin(delete); // drop out of order operations assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : "resolving out of order delivery based on versioning but version type isn't fit for it. got [" @@ -1159,7 +1176,12 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 return plan; } - private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { + protected boolean assertNonPrimaryOrigin(final Operation operation) { + assert operation.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + operation.origin(); + return true; + } + + protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete); @@ -1210,7 +1232,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) } } - private static final class DeletionStrategy { + protected static final class DeletionStrategy { // of a rare double delete final boolean deleteFromLucene; final boolean currentlyDeleted; From 3f451f42da394b9399063d55744ee754282beef5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 14 Nov 2017 14:20:19 -0500 Subject: [PATCH 2/3] Missing break --- .../main/java/org/elasticsearch/index/engine/InternalEngine.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e7097a98964b4..1a5757dc6d1d6 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -670,6 +670,7 @@ private boolean canOptimizeAddDocument(Index index) { switch (index.origin()) { case PRIMARY: assertPrimaryCanOptimizeAddDocument(index); + break; case PEER_RECOVERY: case REPLICA: assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL From 987985531b1da03f798934242c3e1f893546bf08 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 14 Nov 2017 14:41:27 -0500 Subject: [PATCH 3/3] Oops --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1a5757dc6d1d6..c587e9e004242 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -670,7 +670,7 @@ private boolean canOptimizeAddDocument(Index index) { switch (index.origin()) { case PRIMARY: assertPrimaryCanOptimizeAddDocument(index); - break; + return true; case PEER_RECOVERY: case REPLICA: assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL