Skip to content

Commit 3c37f4b

Browse files
committed
Refactor handling of version conflicts and sequence numbers
1 parent 1c71393 commit 3c37f4b

File tree

2 files changed

+134
-87
lines changed

2 files changed

+134
-87
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 81 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.HashMap;
7979
import java.util.List;
8080
import java.util.Map;
81+
import java.util.Optional;
8182
import java.util.Set;
8283
import java.util.concurrent.atomic.AtomicBoolean;
8384
import java.util.concurrent.atomic.AtomicInteger;
@@ -86,6 +87,7 @@
8687
import java.util.concurrent.locks.ReentrantLock;
8788
import java.util.function.Function;
8889
import java.util.function.LongSupplier;
90+
import java.util.function.Supplier;
8991

9092
public class InternalEngine extends Engine {
9193

@@ -422,7 +424,7 @@ private SearcherManager createSearcherManager() throws EngineException {
422424

423425
@Override
424426
public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
425-
try (ReleasableLock lock = readLock.acquire()) {
427+
try (ReleasableLock ignored = readLock.acquire()) {
426428
ensureOpen();
427429
if (get.realtime()) {
428430
VersionValue versionValue = versionMap.getUnderLock(get.uid());
@@ -444,11 +446,28 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
444446
}
445447
}
446448

447-
private boolean checkVersionConflict(
448-
final Operation op,
449-
final long currentVersion,
450-
final long expectedVersion,
451-
final boolean deleted) {
449+
/**
450+
* Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if
451+
* no conflicts are found, the optional return value is not present.
452+
*
453+
* @param <T> the result type
454+
* @param op the operation
455+
* @param currentVersion the current version
456+
* @param expectedVersion the expected version
457+
* @param deleted {@code true} if the current version is not found or represents a delete
458+
* @param onSuccess if there is a version conflict that can be ignored, the result of the operation
459+
* @param onFailure if there is a version conflict that cannot be ignored, the result of the operation
460+
* @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value
461+
* is not present
462+
*/
463+
private <T extends Result> Optional<T> checkVersionConflict(
464+
final Operation op,
465+
final long currentVersion,
466+
final long expectedVersion,
467+
final boolean deleted,
468+
final Supplier<T> onSuccess,
469+
final Function<VersionConflictEngineException, T> onFailure) {
470+
final T result;
452471
if (op.versionType() == VersionType.FORCE) {
453472
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
454473
// If index was created in 6.0.0-alpha1 or later, 'force' is not allowed at all
@@ -462,14 +481,22 @@ private boolean checkVersionConflict(
462481
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
463482
if (op.origin().isRecovery()) {
464483
// version conflict, but okay
465-
return true;
484+
result = onSuccess.get();
466485
} else {
467486
// fatal version conflict
468-
throw new VersionConflictEngineException(shardId, op.type(), op.id(),
487+
final VersionConflictEngineException e =
488+
new VersionConflictEngineException(
489+
shardId,
490+
op.type(),
491+
op.id(),
469492
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
493+
result = onFailure.apply(e);
470494
}
495+
496+
return Optional.of(result);
497+
} else {
498+
return Optional.empty();
471499
}
472-
return false;
473500
}
474501

475502
private long checkDeletedAndGCed(VersionValue versionValue) {
@@ -485,7 +512,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
485512
@Override
486513
public IndexResult index(Index index) {
487514
IndexResult result;
488-
try (ReleasableLock lock = readLock.acquire()) {
515+
try (ReleasableLock ignored = readLock.acquire()) {
489516
ensureOpen();
490517
if (index.origin().isRecovery()) {
491518
// Don't throttle recovery operations
@@ -584,6 +611,7 @@ private IndexResult innerIndex(Index index) throws IOException {
584611
final Translog.Location location;
585612
final long updatedVersion;
586613
IndexResult indexResult = null;
614+
long seqNo = index.seqNo();
587615
try (Releasable ignored = acquireLock(index.uid())) {
588616
lastWriteNanos = index.startTime();
589617
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@@ -648,35 +676,32 @@ private IndexResult innerIndex(Index index) throws IOException {
648676
}
649677
}
650678
final long expectedVersion = index.version();
651-
final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
652-
653-
final long seqNo;
654-
if (index.origin() == Operation.Origin.PRIMARY) {
655-
if (!conflict) {
679+
final Optional<IndexResult> checkVersionConflictResult =
680+
checkVersionConflict(
681+
index,
682+
currentVersion,
683+
expectedVersion,
684+
deleted,
685+
() -> new IndexResult(currentVersion, index.seqNo(), false),
686+
e -> new IndexResult(e, currentVersion, index.seqNo()));
687+
688+
if (checkVersionConflictResult.isPresent()) {
689+
indexResult = checkVersionConflictResult.get();
690+
} else {
691+
// no version conflict
692+
if (index.origin() == Operation.Origin.PRIMARY) {
656693
seqNo = seqNoService.generateSeqNo();
657-
} else {
658-
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
659694
}
660-
} else {
661-
seqNo = index.seqNo();
662-
}
663695

664-
if (conflict) {
665-
// skip index operation because of version conflict on recovery
666-
indexResult = new IndexResult(expectedVersion, seqNo, false);
667-
} else {
696+
/**
697+
* Update the document's sequence number and primary term; the sequence number here is derived from either the sequence
698+
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
699+
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
700+
*/
701+
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
668702
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
669703
index.parsedDoc().version().setLongValue(updatedVersion);
670704

671-
// Update the document's sequence number and primary term, the
672-
// sequence number here is derived here from either the sequence
673-
// number service if this is on the primary, or the existing
674-
// document's sequence number if this is on the replica. The
675-
// primary term here has already been set, see
676-
// IndexShard.prepareIndex where the Engine.Index operation is
677-
// created
678-
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
679-
680705
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
681706
// document does not exist, we can optimize for create, but double check if assertions are running
682707
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
@@ -686,17 +711,17 @@ private IndexResult innerIndex(Index index) throws IOException {
686711
}
687712
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
688713
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
689-
? translog.add(new Translog.Index(index, indexResult))
690-
: null;
714+
? translog.add(new Translog.Index(index, indexResult))
715+
: null;
691716
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
692717
indexResult.setTranslogLocation(location);
693718
}
694719
indexResult.setTook(System.nanoTime() - index.startTime());
695720
indexResult.freeze();
696721
return indexResult;
697722
} finally {
698-
if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
699-
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
723+
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
724+
seqNoService.markSeqNoAsCompleted(seqNo);
700725
}
701726
}
702727

@@ -741,7 +766,7 @@ private static void update(final Term uid, final List<ParseContext.Document> doc
741766
@Override
742767
public DeleteResult delete(Delete delete) {
743768
DeleteResult result;
744-
try (ReleasableLock lock = readLock.acquire()) {
769+
try (ReleasableLock ignored = readLock.acquire()) {
745770
ensureOpen();
746771
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
747772
result = innerDelete(delete);
@@ -766,6 +791,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
766791
final long updatedVersion;
767792
final boolean found;
768793
DeleteResult deleteResult = null;
794+
long seqNo = delete.seqNo();
769795
try (Releasable ignored = acquireLock(delete.uid())) {
770796
lastWriteNanos = delete.startTime();
771797
final long currentVersion;
@@ -782,38 +808,37 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
782808

783809
final long expectedVersion = delete.version();
784810

785-
final boolean conflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted);
786-
787-
final long seqNo;
788-
if (delete.origin() == Operation.Origin.PRIMARY) {
789-
if (!conflict) {
811+
final Optional<DeleteResult> result =
812+
checkVersionConflict(
813+
delete,
814+
currentVersion,
815+
expectedVersion,
816+
deleted,
817+
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
818+
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));
819+
820+
if (result.isPresent()) {
821+
deleteResult = result.get();
822+
} else {
823+
if (delete.origin() == Operation.Origin.PRIMARY) {
790824
seqNo = seqNoService.generateSeqNo();
791-
} else {
792-
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
793825
}
794-
} else {
795-
seqNo = delete.seqNo();
796-
}
797826

798-
if (conflict) {
799-
// skip executing delete because of version conflict on recovery
800-
deleteResult = new DeleteResult(expectedVersion, seqNo, true);
801-
} else {
802827
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
803828
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
804829
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
805830
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
806-
? translog.add(new Translog.Delete(delete, deleteResult))
807-
: null;
831+
? translog.add(new Translog.Delete(delete, deleteResult))
832+
: null;
808833
versionMap.putUnderLock(delete.uid().bytes(),
809-
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
834+
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
810835
deleteResult.setTranslogLocation(location);
811836
}
812837
deleteResult.setTook(System.nanoTime() - delete.startTime());
813838
deleteResult.freeze();
814839
return deleteResult;
815840
} finally {
816-
if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
841+
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
817842
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
818843
}
819844
}

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 53 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,14 @@
152152
import java.util.concurrent.atomic.AtomicInteger;
153153
import java.util.concurrent.atomic.AtomicLong;
154154
import java.util.concurrent.atomic.AtomicReference;
155+
import java.util.function.LongSupplier;
155156
import java.util.function.Supplier;
157+
import java.util.stream.Collectors;
156158

157159
import static java.util.Collections.emptyMap;
158160
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
159161
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
162+
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
160163
import static org.hamcrest.CoreMatchers.instanceOf;
161164
import static org.hamcrest.Matchers.containsString;
162165
import static org.hamcrest.Matchers.equalTo;
@@ -2975,18 +2978,44 @@ public long generateSeqNo() {
29752978
}
29762979
}
29772980

2978-
public void testOutOfOrderSequenceNumbersWithVersionConflict() {
2981+
public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOException {
29792982
final List<Engine.Operation> operations = new ArrayList<>();
2983+
29802984
final int numberOfOperations = randomIntBetween(16, 32);
29812985
final Term uid = newUid("1");
29822986
final Document document = testDocumentWithTextField();
2987+
final AtomicLong sequenceNumber = new AtomicLong();
2988+
final Engine.Operation.Origin origin = randomFrom(PEER_RECOVERY, PRIMARY, PEER_RECOVERY);
2989+
final LongSupplier sequenceNumberSupplier =
2990+
origin == PRIMARY ? () -> SequenceNumbersService.UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement;
29832991
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
29842992
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, document, B_1, null);
29852993
for (int i = 0; i < numberOfOperations; i++) {
29862994
if (randomBoolean()) {
2987-
operations.add(indexOperation(uid, doc, i, i));
2995+
final Engine.Index index = new Engine.Index(
2996+
uid,
2997+
doc,
2998+
sequenceNumberSupplier.getAsLong(),
2999+
1,
3000+
i,
3001+
VersionType.EXTERNAL,
3002+
origin,
3003+
System.nanoTime(),
3004+
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
3005+
false);
3006+
operations.add(index);
29883007
} else {
2989-
operations.add(deleteOperation("test", "1", uid, i, i));
3008+
final Engine.Delete delete = new Engine.Delete(
3009+
"test",
3010+
"1",
3011+
uid,
3012+
sequenceNumberSupplier.getAsLong(),
3013+
1,
3014+
i,
3015+
VersionType.EXTERNAL,
3016+
origin,
3017+
System.nanoTime());
3018+
operations.add(delete);
29903019
}
29913020
}
29923021

@@ -3001,38 +3030,31 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() {
30013030
}
30023031
}
30033032

3004-
assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo((long) (numberOfOperations - 1)));
3033+
final long expectedLocalCheckpoint;
3034+
if (origin == PRIMARY) {
3035+
// we can only advance as far as the number of operations that did not conflict
3036+
int count = 0;
3037+
3038+
// each time the version increments as we walk the list, that counts as a successful operation
3039+
long version = -1;
3040+
for (int i = 0; i < numberOfOperations; i++) {
3041+
if (operations.get(i).version() >= version) {
3042+
count++;
3043+
version = operations.get(i).version();
3044+
}
3045+
}
3046+
3047+
// sequence numbers start at zero, so the expected local checkpoint is the number of successful operations minus one
3048+
expectedLocalCheckpoint = count - 1;
3049+
} else {
3050+
expectedLocalCheckpoint = numberOfOperations - 1;
3051+
}
3052+
3053+
assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
30053054
final Engine.GetResult result = engine.get(new Engine.Get(true, uid));
30063055
assertThat(result.exists(), equalTo(exists));
30073056
}
30083057

3009-
private Engine.Index indexOperation(final Term uid, final ParsedDocument doc, final int seqNo, final int version) {
3010-
return new Engine.Index(
3011-
uid,
3012-
doc,
3013-
seqNo,
3014-
1,
3015-
version,
3016-
VersionType.EXTERNAL,
3017-
Engine.Operation.Origin.PEER_RECOVERY,
3018-
System.nanoTime(),
3019-
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
3020-
false);
3021-
}
3022-
3023-
private Engine.Delete deleteOperation(final String type, final String id, final Term uid, final int seqNo, final int version) {
3024-
return new Engine.Delete(
3025-
type,
3026-
id,
3027-
uid,
3028-
seqNo,
3029-
1,
3030-
version,
3031-
VersionType.EXTERNAL,
3032-
Engine.Operation.Origin.PEER_RECOVERY,
3033-
System.nanoTime());
3034-
}
3035-
30363058
/**
30373059
* Return a tuple representing the sequence ID for the given {@code Get}
30383060
* operation. The first value in the tuple is the sequence number, the

0 commit comments

Comments
 (0)