Skip to content

Commit 06cacdc

Browse files
committed
Expose indexing plans
This is an additional engine refactoring to expose indexing plans. Relates #27375
1 parent 96262bf commit 06cacdc

File tree

1 file changed

+49
-26
lines changed

1 file changed

+49
-26
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.index.seqno.SequenceNumbers;
7373
import org.elasticsearch.index.seqno.SequenceNumbersService;
7474
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
75+
import org.elasticsearch.index.shard.IndexingStats;
7576
import org.elasticsearch.index.shard.ShardId;
7677
import org.elasticsearch.index.translog.Translog;
7778
import org.elasticsearch.index.translog.TranslogConfig;
@@ -680,8 +681,7 @@ private boolean canOptimizeAddDocument(Index index) {
680681
+ index.getAutoGeneratedIdTimestamp();
681682
switch (index.origin()) {
682683
case PRIMARY:
683-
assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
684-
: "version: " + index.version() + " type: " + index.versionType();
684+
assertPrimaryCanOptimizeAddDocument(index);
685685
return true;
686686
case PEER_RECOVERY:
687687
case REPLICA:
@@ -698,6 +698,12 @@ private boolean canOptimizeAddDocument(Index index) {
698698
return false;
699699
}
700700

701+
protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
702+
assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
703+
: "version: " + index.version() + " type: " + index.versionType();
704+
return true;
705+
}
706+
701707
private boolean assertVersionType(final Engine.Operation operation) {
702708
if (operation.origin() == Operation.Origin.REPLICA ||
703709
operation.origin() == Operation.Origin.PEER_RECOVERY ||
@@ -718,16 +724,21 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi
718724
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" +
719725
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo;
720726
} else if (origin == Operation.Origin.PRIMARY) {
721-
// sequence number should not be set when operation origin is primary
722-
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
723-
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
727+
assertPrimaryIncomingSequenceNumber(origin, seqNo);
724728
} else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) {
725729
// sequence number should be set when operation origin is not primary
726730
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
727731
}
728732
return true;
729733
}
730734

735+
protected boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
736+
// sequence number should not be set when operation origin is primary
737+
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
738+
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
739+
return true;
740+
}
741+
731742
private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) {
732743
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) ||
733744
origin == Operation.Origin.PRIMARY) {
@@ -788,14 +799,7 @@ public IndexResult index(Index index) throws IOException {
788799
* if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
789800
* updateDocument.
790801
*/
791-
final IndexingStrategy plan;
792-
793-
if (index.origin() == Operation.Origin.PRIMARY) {
794-
plan = planIndexingAsPrimary(index);
795-
} else {
796-
// non-primary mode (i.e., replica or recovery)
797-
plan = planIndexingAsNonPrimary(index);
798-
}
802+
final IndexingStrategy plan = indexingStrategyForOperation(index);
799803

800804
final IndexResult indexResult;
801805
if (plan.earlyResultOnPreFlightError.isPresent()) {
@@ -836,7 +840,8 @@ public IndexResult index(Index index) throws IOException {
836840
}
837841
}
838842

839-
private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
843+
protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
844+
assertNonPrimaryOrigin(index);
840845
final IndexingStrategy plan;
841846
if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
842847
// no need to deal with out of order delivery - we never saw this one
@@ -879,7 +884,16 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0
879884
return plan;
880885
}
881886

882-
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
887+
protected IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
888+
if (index.origin() == Operation.Origin.PRIMARY) {
889+
return planIndexingAsPrimary(index);
890+
} else {
891+
// non-primary mode (i.e., replica or recovery)
892+
return planIndexingAsNonPrimary(index);
893+
}
894+
}
895+
896+
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
883897
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
884898
final IndexingStrategy plan;
885899
// resolve an external operation into an internal one which is safe to replay
@@ -995,7 +1009,7 @@ private static void index(final List<ParseContext.Document> docs, final IndexWri
9951009
}
9961010
}
9971011

998-
private static final class IndexingStrategy {
1012+
protected static final class IndexingStrategy {
9991013
final boolean currentNotFoundOrDeleted;
10001014
final boolean useLuceneUpdateDocument;
10011015
final long seqNoForIndexing;
@@ -1089,12 +1103,7 @@ public DeleteResult delete(Delete delete) throws IOException {
10891103
try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = acquireLock(delete.uid())) {
10901104
ensureOpen();
10911105
lastWriteNanos = delete.startTime();
1092-
final DeletionStrategy plan;
1093-
if (delete.origin() == Operation.Origin.PRIMARY) {
1094-
plan = planDeletionAsPrimary(delete);
1095-
} else {
1096-
plan = planDeletionAsNonPrimary(delete);
1097-
}
1106+
final DeletionStrategy plan = deletionStrategyForOperation(delete);
10981107

10991108
if (plan.earlyResultOnPreflightError.isPresent()) {
11001109
deleteResult = plan.earlyResultOnPreflightError.get();
@@ -1133,8 +1142,17 @@ public DeleteResult delete(Delete delete) throws IOException {
11331142
return deleteResult;
11341143
}
11351144

1136-
private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
1137-
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
1145+
protected DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
1146+
if (delete.origin() == Operation.Origin.PRIMARY) {
1147+
return planDeletionAsPrimary(delete);
1148+
} else {
1149+
// non-primary mode (i.e., replica or recovery)
1150+
return planDeletionAsNonPrimary(delete);
1151+
}
1152+
}
1153+
1154+
protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
1155+
assertNonPrimaryOrigin(delete);
11381156
// drop out of order operations
11391157
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
11401158
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
@@ -1171,7 +1189,12 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0
11711189
return plan;
11721190
}
11731191

1174-
private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
1192+
protected boolean assertNonPrimaryOrigin(final Operation operation) {
1193+
assert operation.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + operation.origin();
1194+
return true;
1195+
}
1196+
1197+
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
11751198
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
11761199
// resolve operation from external to internal
11771200
final VersionValue versionValue = resolveDocVersion(delete);
@@ -1222,7 +1245,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
12221245
}
12231246
}
12241247

1225-
private static final class DeletionStrategy {
1248+
protected static final class DeletionStrategy {
12261249
// of a rare double delete
12271250
final boolean deleteFromLucene;
12281251
final boolean currentlyDeleted;

0 commit comments

Comments
 (0)