Skip to content

Commit 9993cf3

Browse files
authored
Use standard semantics for retried auto-id requests (#47311)
Adds support for handling auto-id requests with optype CREATE. Also simplifies the code handling this by using the standard indexing path when dealing with possible retry conflicts. Relates #47169
1 parent dea96fb commit 9993cf3

File tree

2 files changed

+96
-97
lines changed

2 files changed

+96
-97
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -999,13 +999,9 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
999999
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
10001000
final IndexingStrategy plan;
10011001
// resolve an external operation into an internal one which is safe to replay
1002-
if (canOptimizeAddDocument(index)) {
1003-
if (mayHaveBeenIndexedBefore(index)) {
1004-
plan = IndexingStrategy.overrideExistingAsIfNotThere();
1005-
versionMap.enforceSafeAccess();
1006-
} else {
1007-
plan = IndexingStrategy.optimizedAppendOnly(1L);
1008-
}
1002+
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
1003+
if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) {
1004+
plan = IndexingStrategy.optimizedAppendOnly(1L);
10091005
} else {
10101006
versionMap.enforceSafeAccess();
10111007
// resolves incoming version
@@ -1038,7 +1034,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
10381034
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
10391035
} else {
10401036
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
1041-
index.versionType().updateVersion(currentVersion, index.version())
1037+
canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version())
10421038
);
10431039
}
10441040
}
@@ -1190,11 +1186,6 @@ static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted,
11901186
true, false, versionForIndexing, null);
11911187
}
11921188

1193-
static IndexingStrategy overrideExistingAsIfNotThere() {
1194-
return new IndexingStrategy(true, true, true,
1195-
false, 1L, null);
1196-
}
1197-
11981189
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
11991190
return new IndexingStrategy(currentNotFoundOrDeleted, false, false,
12001191
false, versionForIndexing, null);

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 92 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -3420,49 +3420,71 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
34203420
public void testDoubleDeliveryPrimary() throws IOException {
34213421
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
34223422
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
3423-
Engine.Index operation = appendOnlyPrimary(doc, false, 1);
3424-
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
3423+
final boolean create = randomBoolean();
3424+
Engine.Index operation = appendOnlyPrimary(doc, false, 1, create);
3425+
Engine.Index retry = appendOnlyPrimary(doc, true, 1, create);
34253426
if (randomBoolean()) {
34263427
Engine.IndexResult indexResult = engine.index(operation);
34273428
assertLuceneOperations(engine, 1, 0, 0);
34283429
assertEquals(0, engine.getNumVersionLookups());
34293430
assertNotNull(indexResult.getTranslogLocation());
34303431
Engine.IndexResult retryResult = engine.index(retry);
3431-
assertLuceneOperations(engine, 1, 1, 0);
3432-
assertEquals(0, engine.getNumVersionLookups());
3433-
assertNotNull(retryResult.getTranslogLocation());
3434-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
3432+
assertLuceneOperations(engine, 1, create ? 0 : 1, 0);
3433+
assertEquals(1, engine.getNumVersionLookups());
3434+
if (create) {
3435+
assertNull(retryResult.getTranslogLocation());
3436+
} else {
3437+
assertNotNull(retryResult.getTranslogLocation());
3438+
}
34353439
} else {
34363440
Engine.IndexResult retryResult = engine.index(retry);
3437-
assertLuceneOperations(engine, 0, 1, 0);
3438-
assertEquals(0, engine.getNumVersionLookups());
3441+
assertLuceneOperations(engine, 1, 0, 0);
3442+
assertEquals(1, engine.getNumVersionLookups());
34393443
assertNotNull(retryResult.getTranslogLocation());
34403444
Engine.IndexResult indexResult = engine.index(operation);
3441-
assertLuceneOperations(engine, 0, 2, 0);
3442-
assertEquals(0, engine.getNumVersionLookups());
3445+
assertLuceneOperations(engine, 1, create ? 0 : 1, 0);
3446+
assertEquals(2, engine.getNumVersionLookups());
34433447
assertNotNull(retryResult.getTranslogLocation());
3444-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
3448+
if (create) {
3449+
assertNull(indexResult.getTranslogLocation());
3450+
} else {
3451+
assertNotNull(indexResult.getTranslogLocation());
3452+
}
34453453
}
34463454

34473455
engine.refresh("test");
34483456
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
34493457
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
34503458
assertEquals(1, topDocs.totalHits.value);
34513459
}
3452-
operation = appendOnlyPrimary(doc, false, 1);
3453-
retry = appendOnlyPrimary(doc, true, 1);
3460+
operation = appendOnlyPrimary(doc, false, 1, create);
3461+
retry = appendOnlyPrimary(doc, true, 1, create);
34543462
if (randomBoolean()) {
34553463
Engine.IndexResult indexResult = engine.index(operation);
3456-
assertNotNull(indexResult.getTranslogLocation());
3464+
if (create) {
3465+
assertNull(indexResult.getTranslogLocation());
3466+
} else {
3467+
assertNotNull(indexResult.getTranslogLocation());
3468+
}
34573469
Engine.IndexResult retryResult = engine.index(retry);
3458-
assertNotNull(retryResult.getTranslogLocation());
3459-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
3470+
if (create) {
3471+
assertNull(retryResult.getTranslogLocation());
3472+
} else {
3473+
assertNotNull(retryResult.getTranslogLocation());
3474+
}
34603475
} else {
34613476
Engine.IndexResult retryResult = engine.index(retry);
3462-
assertNotNull(retryResult.getTranslogLocation());
3477+
if (create) {
3478+
assertNull(retryResult.getTranslogLocation());
3479+
} else {
3480+
assertNotNull(retryResult.getTranslogLocation());
3481+
}
34633482
Engine.IndexResult indexResult = engine.index(operation);
3464-
assertNotNull(retryResult.getTranslogLocation());
3465-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
3483+
if (create) {
3484+
assertNull(indexResult.getTranslogLocation());
3485+
} else {
3486+
assertNotNull(indexResult.getTranslogLocation());
3487+
}
34663488
}
34673489

34683490
engine.refresh("test");
@@ -3519,60 +3541,53 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException
35193541
public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
35203542
final Supplier<ParsedDocument> doc = () -> testParsedDocument("1", null, testDocumentWithTextField(),
35213543
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
3522-
Engine.Index operation = appendOnlyReplica(doc.get(), false, 1, randomIntBetween(0, 5));
3523-
Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5));
3524-
// operations with a seq# equal or lower to the local checkpoint are not indexed to lucene
3525-
// and the version lookup is skipped
3526-
final boolean sameSeqNo = operation.seqNo() == retry.seqNo();
3527-
if (randomBoolean()) {
3528-
Engine.IndexResult indexResult = engine.index(operation);
3529-
assertLuceneOperations(engine, 1, 0, 0);
3530-
assertEquals(0, engine.getNumVersionLookups());
3531-
assertNotNull(indexResult.getTranslogLocation());
3532-
Engine.IndexResult retryResult = engine.index(retry);
3533-
if (retry.seqNo() > operation.seqNo()) {
3534-
assertLuceneOperations(engine, 1, 1, 0);
3535-
} else {
3536-
assertLuceneOperations(engine, 1, 0, 0);
3537-
}
3538-
assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups());
3539-
assertNotNull(retryResult.getTranslogLocation());
3540-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
3541-
} else {
3542-
Engine.IndexResult retryResult = engine.index(retry);
3543-
assertLuceneOperations(engine, 1, 0, 0);
3544-
assertEquals(0, engine.getNumVersionLookups());
3545-
assertNotNull(retryResult.getTranslogLocation());
3546-
Engine.IndexResult indexResult = engine.index(operation);
3547-
if (operation.seqNo() > retry.seqNo()) {
3548-
assertLuceneOperations(engine, 1, 1, 0);
3549-
} else {
3550-
assertLuceneOperations(engine, 1, 0, 0);
3551-
}
3552-
assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups());
3553-
assertNotNull(retryResult.getTranslogLocation());
3554-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
3555-
}
3544+
boolean replicaOperationIsRetry = randomBoolean();
3545+
Engine.Index operation = appendOnlyReplica(doc.get(), replicaOperationIsRetry, 1, randomIntBetween(0, 5));
35563546

3547+
Engine.IndexResult result = engine.index(operation);
3548+
assertLuceneOperations(engine, 1, 0, 0);
3549+
assertEquals(0, engine.getNumVersionLookups());
3550+
assertNotNull(result.getTranslogLocation());
3551+
3552+
// promote to primary: first do refresh
35573553
engine.refresh("test");
35583554
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
35593555
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
35603556
assertEquals(1, topDocs.totalHits.value);
35613557
}
3562-
operation = randomAppendOnly(doc.get(), false, 1);
3563-
retry = randomAppendOnly(doc.get(), true, 1);
3558+
3559+
final boolean create = randomBoolean();
3560+
operation = appendOnlyPrimary(doc.get(), false, 1, create);
3561+
Engine.Index retry = appendOnlyPrimary(doc.get(), true, 1, create);
35643562
if (randomBoolean()) {
3565-
Engine.IndexResult indexResult = engine.index(operation);
3566-
assertNotNull(indexResult.getTranslogLocation());
3563+
// if the replica operation wasn't a retry, the operation arriving on the newly promoted primary must be a retry
3564+
if (replicaOperationIsRetry) {
3565+
Engine.IndexResult indexResult = engine.index(operation);
3566+
if (create) {
3567+
assertNull(indexResult.getTranslogLocation());
3568+
} else {
3569+
assertNotNull(indexResult.getTranslogLocation());
3570+
}
3571+
}
35673572
Engine.IndexResult retryResult = engine.index(retry);
3568-
assertNotNull(retryResult.getTranslogLocation());
3569-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
3573+
if (create) {
3574+
assertNull(retryResult.getTranslogLocation());
3575+
} else {
3576+
assertNotNull(retryResult.getTranslogLocation());
3577+
}
35703578
} else {
35713579
Engine.IndexResult retryResult = engine.index(retry);
3572-
assertNotNull(retryResult.getTranslogLocation());
3580+
if (create) {
3581+
assertNull(retryResult.getTranslogLocation());
3582+
} else {
3583+
assertNotNull(retryResult.getTranslogLocation());
3584+
}
35733585
Engine.IndexResult indexResult = engine.index(operation);
3574-
assertNotNull(retryResult.getTranslogLocation());
3575-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
3586+
if (create) {
3587+
assertNull(indexResult.getTranslogLocation());
3588+
} else {
3589+
assertNotNull(indexResult.getTranslogLocation());
3590+
}
35763591
}
35773592

35783593
engine.refresh("test");
@@ -3650,10 +3665,11 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
36503665
assertThat(indexResult.getVersion(), equalTo(1L));
36513666

36523667
isRetry = true;
3653-
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL,
3668+
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
36543669
PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
36553670
indexResult = engine.index(index);
36563671
assertThat(indexResult.getVersion(), equalTo(1L));
3672+
assertNotEquals(indexResult.getSeqNo(), UNASSIGNED_SEQ_NO);
36573673
engine.refresh("test");
36583674
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
36593675
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
@@ -3694,7 +3710,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
36943710
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO,
36953711
0);
36963712
Engine.IndexResult indexResult = engine.index(secondIndexRequest);
3697-
assertTrue(indexResult.isCreated());
3713+
assertFalse(indexResult.isCreated());
36983714
engine.refresh("test");
36993715
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
37003716
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
@@ -3719,12 +3735,16 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo
37193735
}
37203736
}
37213737

3722-
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
3723-
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
3738+
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) {
3739+
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
37243740
VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry,
37253741
UNASSIGNED_SEQ_NO, 0);
37263742
}
37273743

3744+
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
3745+
return appendOnlyPrimary(doc, retry, autoGeneratedIdTimestamp, randomBoolean());
3746+
}
3747+
37283748
public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) {
37293749
return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, null,
37303750
Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0);
@@ -3735,14 +3755,15 @@ public void testRetryConcurrently() throws InterruptedException, IOException {
37353755
int numDocs = randomIntBetween(1000, 10000);
37363756
List<Engine.Index> docs = new ArrayList<>();
37373757
final boolean primary = randomBoolean();
3758+
final boolean create = randomBoolean();
37383759
for (int i = 0; i < numDocs; i++) {
37393760
final ParsedDocument doc = testParsedDocument(Integer.toString(i), null,
37403761
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
37413762
final Engine.Index originalIndex;
37423763
final Engine.Index retryIndex;
37433764
if (primary) {
3744-
originalIndex = appendOnlyPrimary(doc, false, i);
3745-
retryIndex = appendOnlyPrimary(doc, true, i);
3765+
originalIndex = appendOnlyPrimary(doc, false, i, create);
3766+
retryIndex = appendOnlyPrimary(doc, true, i, create);
37463767
} else {
37473768
originalIndex = appendOnlyReplica(doc, false, i, i * 2);
37483769
retryIndex = appendOnlyReplica(doc, true, i, i * 2);
@@ -3775,25 +3796,12 @@ public void testRetryConcurrently() throws InterruptedException, IOException {
37753796
for (int i = 0; i < thread.length; i++) {
37763797
thread[i].join();
37773798
}
3778-
if (primary) {
3779-
assertEquals(0, engine.getNumVersionLookups());
3780-
assertEquals(0, engine.getNumIndexVersionsLookups());
3781-
} else {
3782-
// we don't really know what order the operations will arrive and thus can't predict how many
3783-
// version lookups will be needed
3784-
assertThat(engine.getNumIndexVersionsLookups(), lessThanOrEqualTo(engine.getNumVersionLookups()));
3785-
}
37863799
engine.refresh("test");
37873800
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
37883801
int count = searcher.count(new MatchAllDocsQuery());
37893802
assertEquals(numDocs, count);
37903803
}
3791-
if (primary) {
3792-
// primaries rely on lucene dedup and may index the same document twice
3793-
assertThat(engine.getNumDocUpdates(), greaterThanOrEqualTo((long) numDocs));
3794-
assertThat(engine.getNumDocAppends() + engine.getNumDocUpdates(), equalTo(numDocs * 2L));
3795-
} else {
3796-
// replicas rely on seq# based dedup and in this setup (same seq#) should never rely on lucene
3804+
if (create || primary == false) {
37973805
assertLuceneOperations(engine, numDocs, 0, 0);
37983806
}
37993807
}
@@ -3825,7 +3833,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {
38253833
assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
38263834
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
38273835
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
3828-
engine.index(appendOnlyPrimary(doc, true, timestamp2));
3836+
engine.index(appendOnlyPrimary(doc, true, timestamp2, false));
38293837
assertEquals(maxTimestamp12, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
38303838
globalCheckpoint.set(1); // make sure flush cleans up commits for later.
38313839
engine.flush();
@@ -4770,7 +4778,7 @@ public void testSeqNoGenerator() throws IOException {
47704778
parsedDocument,
47714779
UNASSIGNED_SEQ_NO,
47724780
randomIntBetween(1, 8),
4773-
Versions.MATCH_ANY,
4781+
Versions.NOT_FOUND,
47744782
VersionType.INTERNAL,
47754783
Engine.Operation.Origin.PRIMARY,
47764784
System.nanoTime(),

0 commit comments

Comments
 (0)