diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a928843509de7..c587e9e004242 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -72,6 +72,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -668,8 +669,7 @@ private boolean canOptimizeAddDocument(Index index) { + index.getAutoGeneratedIdTimestamp(); switch (index.origin()) { case PRIMARY: - assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL) - : "version: " + index.version() + " type: " + index.versionType(); + assertPrimaryCanOptimizeAddDocument(index); return true; case PEER_RECOVERY: case REPLICA: @@ -686,6 +686,12 @@ private boolean canOptimizeAddDocument(Index index) { return false; } + protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { + assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL) + : "version: " + index.version() + " type: " + index.versionType(); + return true; + } + private boolean assertVersionType(final Engine.Operation operation) { if (operation.origin() == Operation.Origin.REPLICA || operation.origin() == Operation.Origin.PEER_RECOVERY || @@ -706,9 +712,7 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" + " index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo; } else if (origin == Operation.Origin.PRIMARY) { - // sequence number should not be set when operation origin is primary - assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO - : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; + assertPrimaryIncomingSequenceNumber(origin, seqNo); } else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) { // sequence number should be set when operation origin is not primary assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin; @@ -716,6 +720,13 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi return true; } + protected boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { + // sequence number should not be set when operation origin is primary + assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO + : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; + return true; + } + private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) || origin == Operation.Origin.PRIMARY) { @@ -776,14 +787,7 @@ public IndexResult index(Index index) throws IOException { * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls * updateDocument. */ - final IndexingStrategy plan; - - if (index.origin() == Operation.Origin.PRIMARY) { - plan = planIndexingAsPrimary(index); - } else { - // non-primary mode (i.e., replica or recovery) - plan = planIndexingAsNonPrimary(index); - } + final IndexingStrategy plan = indexingStrategyForOperation(index); final IndexResult indexResult; if (plan.earlyResultOnPreFlightError.isPresent()) { @@ -824,7 +828,8 @@ public IndexResult index(Index index) throws IOException { } } - private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { + protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { + assertNonPrimaryOrigin(index); final IndexingStrategy plan; if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) { // no need to deal with out of order delivery - we never saw this one @@ -867,7 +872,16 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 return plan; } - private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { + protected IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { + if (index.origin() == Operation.Origin.PRIMARY) { + return planIndexingAsPrimary(index); + } else { + // non-primary mode (i.e., replica or recovery) + return planIndexingAsNonPrimary(index); + } + } + + protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay @@ -983,7 +997,7 @@ private static void index(final List docs, final IndexWri } } - private static final class IndexingStrategy { + protected static final class IndexingStrategy { final boolean currentNotFoundOrDeleted; final boolean useLuceneUpdateDocument; final long seqNoForIndexing; @@ -1077,12 +1091,7 @@ public DeleteResult delete(Delete delete) throws IOException { try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = acquireLock(delete.uid())) { ensureOpen(); lastWriteNanos = delete.startTime(); - final DeletionStrategy plan; - if (delete.origin() == Operation.Origin.PRIMARY) { - plan = planDeletionAsPrimary(delete); - } else { - plan = planDeletionAsNonPrimary(delete); - } + final DeletionStrategy plan = deletionStrategyForOperation(delete); if (plan.earlyResultOnPreflightError.isPresent()) { deleteResult = plan.earlyResultOnPreflightError.get(); @@ -1121,8 +1130,17 @@ public DeleteResult delete(Delete delete) throws IOException { return deleteResult; } - private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { - assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); + protected DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { + if (delete.origin() == Operation.Origin.PRIMARY) { + return planDeletionAsPrimary(delete); + } else { + // non-primary mode (i.e., replica or recovery) + return planDeletionAsNonPrimary(delete); + } + } + + protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { + assertNonPrimaryOrigin(delete); // drop out of order operations assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : "resolving out of order delivery based on versioning but version type isn't fit for it. got [" @@ -1159,7 +1177,12 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 return plan; } - private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { + protected boolean assertNonPrimaryOrigin(final Operation operation) { + assert operation.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + operation.origin(); + return true; + } + + protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete); @@ -1210,7 +1233,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) } } - private static final class DeletionStrategy { + protected static final class DeletionStrategy { // of a rare double delete final boolean deleteFromLucene; final boolean currentlyDeleted;