Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -668,8 +669,7 @@ private boolean canOptimizeAddDocument(Index index) {
+ index.getAutoGeneratedIdTimestamp();
switch (index.origin()) {
case PRIMARY:
assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
: "version: " + index.version() + " type: " + index.versionType();
assertPrimaryCanOptimizeAddDocument(index);
return true;
case PEER_RECOVERY:
case REPLICA:
Expand All @@ -686,6 +686,12 @@ private boolean canOptimizeAddDocument(Index index) {
return false;
}

protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
: "version: " + index.version() + " type: " + index.versionType();
return true;
}

private boolean assertVersionType(final Engine.Operation operation) {
if (operation.origin() == Operation.Origin.REPLICA ||
operation.origin() == Operation.Origin.PEER_RECOVERY ||
Expand All @@ -706,16 +712,21 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" +
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo;
} else if (origin == Operation.Origin.PRIMARY) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
assertPrimaryIncomingSequenceNumber(origin, seqNo);
} else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) {
// sequence number should be set when operation origin is not primary
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
}
return true;
}

protected boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
return true;
}

private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) ||
origin == Operation.Origin.PRIMARY) {
Expand Down Expand Up @@ -776,14 +787,7 @@ public IndexResult index(Index index) throws IOException {
* if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
* updateDocument.
*/
final IndexingStrategy plan;

if (index.origin() == Operation.Origin.PRIMARY) {
plan = planIndexingAsPrimary(index);
} else {
// non-primary mode (i.e., replica or recovery)
plan = planIndexingAsNonPrimary(index);
}
final IndexingStrategy plan = indexingStrategyForOperation(index);

final IndexResult indexResult;
if (plan.earlyResultOnPreFlightError.isPresent()) {
Expand Down Expand Up @@ -824,7 +828,8 @@ public IndexResult index(Index index) throws IOException {
}
}

private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
assertNonPrimaryOrigin(index);
final IndexingStrategy plan;
if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
// no need to deal with out of order delivery - we never saw this one
Expand Down Expand Up @@ -867,7 +872,16 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0
return plan;
}

private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
protected IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
if (index.origin() == Operation.Origin.PRIMARY) {
return planIndexingAsPrimary(index);
} else {
// non-primary mode (i.e., replica or recovery)
return planIndexingAsNonPrimary(index);
}
}

protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
Expand Down Expand Up @@ -983,7 +997,7 @@ private static void index(final List<ParseContext.Document> docs, final IndexWri
}
}

private static final class IndexingStrategy {
protected static final class IndexingStrategy {
final boolean currentNotFoundOrDeleted;
final boolean useLuceneUpdateDocument;
final long seqNoForIndexing;
Expand Down Expand Up @@ -1077,12 +1091,7 @@ public DeleteResult delete(Delete delete) throws IOException {
try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = acquireLock(delete.uid())) {
ensureOpen();
lastWriteNanos = delete.startTime();
final DeletionStrategy plan;
if (delete.origin() == Operation.Origin.PRIMARY) {
plan = planDeletionAsPrimary(delete);
} else {
plan = planDeletionAsNonPrimary(delete);
}
final DeletionStrategy plan = deletionStrategyForOperation(delete);

if (plan.earlyResultOnPreflightError.isPresent()) {
deleteResult = plan.earlyResultOnPreflightError.get();
Expand Down Expand Up @@ -1121,8 +1130,17 @@ public DeleteResult delete(Delete delete) throws IOException {
return deleteResult;
}

private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
protected DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
if (delete.origin() == Operation.Origin.PRIMARY) {
return planDeletionAsPrimary(delete);
} else {
// non-primary mode (i.e., replica or recovery)
return planDeletionAsNonPrimary(delete);
}
}

protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assertNonPrimaryOrigin(delete);
// drop out of order operations
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
Expand Down Expand Up @@ -1159,7 +1177,12 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0
return plan;
}

private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
protected boolean assertNonPrimaryOrigin(final Operation operation) {
assert operation.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + operation.origin();
return true;
}

protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete);
Expand Down Expand Up @@ -1210,7 +1233,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
}
}

private static final class DeletionStrategy {
protected static final class DeletionStrategy {
// of a rare double delete
final boolean deleteFromLucene;
final boolean currentlyDeleted;
Expand Down