Skip to content

Commit b636ca7

Browse files
authored
Engine: version logic on replicas should not be hard coded (#23998)
The refactoring in #23711 hardcoded version logic for replica to assume monotonic versions. Sadly that's wrong for `FORCE` and `VERSION_GTE`. Instead we should use the methods in VersionType to detect conflicts. Note - once replicas use sequence numbers for out of order delivery, this logic goes away.
1 parent f0df5e6 commit b636ca7

File tree

2 files changed

+65
-29
lines changed

2 files changed

+65
-29
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -441,8 +441,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation
441441
if (versionValue == null) {
442442
return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
443443
} else {
444-
return op.version() > versionValue.getVersion() ?
445-
OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
444+
return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ?
445+
OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER;
446446
}
447447
}
448448

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 63 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,8 +1309,9 @@ public void testVersioningCreateExistsException() throws IOException {
13091309
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
13101310
}
13111311

1312-
protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, boolean externalVersioning, boolean partialOldPrimary,
1313-
long primaryTerm, int minOpCount, int maxOpCount) {
1312+
protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
1313+
boolean partialOldPrimary, long primaryTerm,
1314+
int minOpCount, int maxOpCount) {
13141315
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
13151316
final List<Engine.Operation> ops = new ArrayList<>();
13161317
final Term id = newUid(Uid.createUid("test", "1"));
@@ -1322,14 +1323,30 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
13221323
}
13231324
final String valuePrefix = forReplica ? "r_" : "p_";
13241325
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
1325-
final VersionType versionType = externalVersioning ? VersionType.EXTERNAL : VersionType.INTERNAL;
13261326
for (int i = 0; i < numOfOps; i++) {
13271327
final Engine.Operation op;
1328+
final long version;
1329+
switch (versionType) {
1330+
case INTERNAL:
1331+
version = forReplica ? i : Versions.MATCH_ANY;
1332+
break;
1333+
case EXTERNAL:
1334+
version = i;
1335+
break;
1336+
case EXTERNAL_GTE:
1337+
version = randomBoolean() ? Math.max(i - 1, 0) : i;
1338+
break;
1339+
case FORCE:
1340+
version = randomNonNegativeLong();
1341+
break;
1342+
default:
1343+
throw new UnsupportedOperationException("unknown version type: " + versionType);
1344+
}
13281345
if (randomBoolean()) {
13291346
op = new Engine.Index(id, testParsedDocument("1", "test", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
13301347
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO,
13311348
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
1332-
forReplica || externalVersioning ? i : Versions.MATCH_ANY,
1349+
version,
13331350
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
13341351
forReplica ? REPLICA : PRIMARY,
13351352
System.currentTimeMillis(), -1, false
@@ -1338,7 +1355,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
13381355
op = new Engine.Delete("test", "1", id,
13391356
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO,
13401357
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
1341-
forReplica || externalVersioning ? i : Versions.MATCH_ANY,
1358+
version,
13421359
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
13431360
forReplica ? REPLICA : PRIMARY,
13441361
System.currentTimeMillis());
@@ -1349,10 +1366,20 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
13491366
}
13501367

13511368
public void testOutOfOrderDocsOnReplica() throws IOException {
1352-
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 2, 20);
1353-
assertOpsOnReplica(ops, replicaEngine);
1369+
final List<Engine.Operation> ops = generateSingleDocHistory(true,
1370+
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20);
1371+
assertOpsOnReplica(ops, replicaEngine, true);
13541372
}
13551373

1374+
public void testNonStandardVersioningOnReplica() throws IOException {
1375+
// TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order
1376+
// is detected using seq#
1377+
final List<Engine.Operation> ops = generateSingleDocHistory(true,
1378+
randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
1379+
assertOpsOnReplica(ops, replicaEngine, false);
1380+
}
1381+
1382+
13561383
public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
13571384
IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder()
13581385
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
@@ -1365,12 +1392,12 @@ public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
13651392
try (Store oldReplicaStore = createStore();
13661393
InternalEngine replicaEngine =
13671394
createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) {
1368-
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, true, 2, 2, 20);
1369-
assertOpsOnReplica(ops, replicaEngine);
1395+
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20);
1396+
assertOpsOnReplica(ops, replicaEngine, true);
13701397
}
13711398
}
13721399

1373-
private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine) throws IOException {
1400+
private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
13741401
final Engine.Operation lastOp = ops.get(ops.size() - 1);
13751402
final String lastFieldValue;
13761403
if (lastOp instanceof Engine.Index) {
@@ -1380,13 +1407,15 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
13801407
// delete
13811408
lastFieldValue = null;
13821409
}
1383-
int firstOpWithSeqNo = 0;
1384-
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
1385-
firstOpWithSeqNo++;
1410+
if (shuffleOps) {
1411+
int firstOpWithSeqNo = 0;
1412+
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
1413+
firstOpWithSeqNo++;
1414+
}
1415+
// shuffle ops but make sure legacy ops are first
1416+
shuffle(ops.subList(0, firstOpWithSeqNo), random());
1417+
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
13861418
}
1387-
// shuffle ops but make sure legacy ops are first
1388-
shuffle(ops.subList(0, firstOpWithSeqNo), random());
1389-
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
13901419
boolean firstOp = true;
13911420
for (Engine.Operation op : ops) {
13921421
logger.info("performing [{}], v [{}], seq# [{}], term [{}]",
@@ -1432,7 +1461,7 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
14321461
}
14331462

14341463
public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException {
1435-
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 100, 300);
1464+
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300);
14361465
final Engine.Operation lastOp = ops.get(ops.size() - 1);
14371466
final String lastFieldValue;
14381467
if (lastOp instanceof Engine.Index) {
@@ -1492,7 +1521,7 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
14921521
}
14931522

14941523
public void testInternalVersioningOnPrimary() throws IOException {
1495-
final List<Engine.Operation> ops = generateSingleDocHistory(false, false, false, 2, 2, 20);
1524+
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
14961525
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
14971526
}
14981527

@@ -1595,8 +1624,11 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
15951624
return opsPerformed;
15961625
}
15971626

1598-
public void testExternalVersioningOnPrimary() throws IOException {
1599-
final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 2, 20);
1627+
public void testNonInternalVersioningOnPrimary() throws IOException {
1628+
final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values()));
1629+
nonInternalVersioning.remove(VersionType.INTERNAL);
1630+
final VersionType versionType = randomFrom(nonInternalVersioning);
1631+
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20);
16001632
final Engine.Operation lastOp = ops.get(ops.size() - 1);
16011633
final String lastFieldValue;
16021634
if (lastOp instanceof Engine.Index) {
@@ -1606,7 +1638,10 @@ public void testExternalVersioningOnPrimary() throws IOException {
16061638
// delete
16071639
lastFieldValue = null;
16081640
}
1609-
shuffle(ops, random());
1641+
// other version types don't support out of order processing.
1642+
if (versionType == VersionType.EXTERNAL) {
1643+
shuffle(ops, random());
1644+
}
16101645
long highestOpVersion = Versions.NOT_FOUND;
16111646
long seqNo = -1;
16121647
boolean docDeleted = true;
@@ -1616,7 +1651,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
16161651
if (op instanceof Engine.Index) {
16171652
final Engine.Index index = (Engine.Index) op;
16181653
Engine.IndexResult result = engine.index(index);
1619-
if (op.version() > highestOpVersion) {
1654+
if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
16201655
seqNo++;
16211656
assertThat(result.getSeqNo(), equalTo(seqNo));
16221657
assertThat(result.isCreated(), equalTo(docDeleted));
@@ -1634,7 +1669,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
16341669
} else {
16351670
final Engine.Delete delete = (Engine.Delete) op;
16361671
Engine.DeleteResult result = engine.delete(delete);
1637-
if (op.version() > highestOpVersion) {
1672+
if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
16381673
seqNo++;
16391674
assertThat(result.getSeqNo(), equalTo(seqNo));
16401675
assertThat(result.isFound(), equalTo(docDeleted == false));
@@ -1660,6 +1695,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
16601695

16611696
assertVisibleCount(engine, docDeleted ? 0 : 1);
16621697
if (docDeleted == false) {
1698+
logger.info("searching for [{}]", lastFieldValue);
16631699
try (Searcher searcher = engine.acquireSearcher("test")) {
16641700
final TotalHitCountCollector collector = new TotalHitCountCollector();
16651701
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
@@ -1669,13 +1705,13 @@ public void testExternalVersioningOnPrimary() throws IOException {
16691705
}
16701706

16711707
public void testVersioningPromotedReplica() throws IOException {
1672-
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, true, false, 1, 2, 20);
1673-
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, false, false, 2, 2, 20);
1708+
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20);
1709+
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
16741710
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1);
16751711
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
16761712
final long finalReplicaVersion = lastReplicaOp.version();
16771713
final long finalReplicaSeqNo = lastReplicaOp.seqNo();
1678-
assertOpsOnReplica(replicaOps, replicaEngine);
1714+
assertOpsOnReplica(replicaOps, replicaEngine, true);
16791715
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
16801716
final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, lastReplicaOp.uid())).v1();
16811717
try (Searcher searcher = engine.acquireSearcher("test")) {
@@ -1689,7 +1725,7 @@ public void testVersioningPromotedReplica() throws IOException {
16891725
}
16901726

16911727
public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException {
1692-
final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 100, 300);
1728+
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300);
16931729
final Engine.Operation lastOp = ops.get(ops.size() - 1);
16941730
final String lastFieldValue;
16951731
if (lastOp instanceof Engine.Index) {

0 commit comments

Comments
 (0)