Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -580,39 +580,56 @@ enum OpVsLuceneDocStatus {

private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status;
if (op.seqNo() <= localCheckpointTracker.getCheckpoint()) {
// The operation seq# is lower then the current local checkpoint and thus was already put into Lucene. This
// can happen during recovery where older operations are sent from the translog that are already part of the
// Lucene commit (either from a peer recovery or a local translog) or due to concurrent indexing & recovery.
// For the former it is important to skip Lucene as the operation in question may have been deleted in an
// out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}

VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo ||
(op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term))
status = OpVsLuceneDocStatus.OP_NEWER;
else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
if (op.seqNo() <= localCheckpointTracker.getCheckpoint()) {
// Subtle point: we already compared the seqNo vs the local checkpoint, but the local checkpoint could have
// advanced after that comparison was done. If it did, it may have made some tombstones stale, and if those
// tombstones included one for the present op and were cleared before the call to getVersionFromMap() above,
// then the present op would appear to be fresh and would be applied a second time. To avoid this, we must
// check the local checkpoint again: since the local checkpoint only ever advances, checking it after the
// call to getVersionFromMap() gives the correct result.
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo ||
(op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) {
return OpVsLuceneDocStatus.OP_NEWER;
} else {
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
} else {
// load from index
assert incrementIndexVersionLookup();
try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
if (docAndSeqNo == null) {
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
return OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
// load term to tie break
final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
if (op.primaryTerm() > existingTerm) {
status = OpVsLuceneDocStatus.OP_NEWER;
return OpVsLuceneDocStatus.OP_NEWER;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
}
}
return status;
}

/** resolves the current version of the document, returning null if not found */
Expand Down Expand Up @@ -806,7 +823,6 @@ public IndexResult index(Index index) throws IOException {
}

private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
final IndexingStrategy plan;
final boolean appendOnlyRequest = canOptimizeAddDocument(index);
if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
/*
Expand All @@ -818,7 +834,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
*/
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
return IndexingStrategy.optimizedAppendOnly(index.seqNo());
} else {
if (appendOnlyRequest == false) {
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
Expand All @@ -832,40 +848,27 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
// question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
}
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
} else {
plan = IndexingStrategy.processNormally(
return IndexingStrategy.processNormally(
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version()
);
}
}
return plan;
}

private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
final IndexingStrategy plan;
assert index.origin() == Operation.Origin.PRIMARY : "planning as primary but origin is " + index.origin();
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
final IndexingStrategy plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
versionMap.enforceSafeAccess();
return plan;
} else {
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
return IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
}
} else {
versionMap.enforceSafeAccess();
Expand All @@ -884,15 +887,14 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e =
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
return IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
return IndexingStrategy.processNormally(currentNotFoundOrDeleted,
generateSeqNoForOperation(index),
index.versionType().updateVersion(currentVersion, index.version())
);
}
}
return plan;
}

private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
Expand Down Expand Up @@ -1132,29 +1134,15 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept
// unlike the primary, replicas don't really care to about found status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) {
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
// question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
}
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);

final DeletionStrategy plan;
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version());
return DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version());
} else {
plan = DeletionStrategy.processNormally(
return DeletionStrategy.processNormally(
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
delete.seqNo(), delete.version());
}
return plan;
}

private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
Expand All @@ -1171,17 +1159,15 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException
currentVersion = versionValue.version;
currentlyDeleted = versionValue.isDelete();
}
final DeletionStrategy plan;
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
return DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
plan = DeletionStrategy.processNormally(
return DeletionStrategy.processNormally(
currentlyDeleted,
generateSeqNoForOperation(delete),
delete.versionType().updateVersion(currentVersion, delete.version()));
}
return plan;
}

private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
Expand Down