Skip to content

Commit 78d9f0b

Browse files
committed
Engine: version logic on replicas should not be hard coded (#23998)
The refactoring in #23711 hardcoded version logic for replica to assume monotonic versions. Sadly that's wrong for `FORCE` and `VERSION_GTE`. Instead we should use the methods in VersionType to detect conflicts.
1 parent 98fd29a commit 78d9f0b

File tree

1 file changed

+53
-21
lines changed

1 file changed

+53
-21
lines changed

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,25 +1219,41 @@ public void testVersioningCreateExistsException() throws IOException {
12191219

12201220
}
12211221

1222-
protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, boolean externalVersioning, int minOpCount, int maxOpCount) {
1222+
protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType, int minOpCount, int maxOpCount) {
12231223
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
12241224
final List<Engine.Operation> ops = new ArrayList<>();
12251225
final Term id = newUid(Uid.createUid("test", "1"));
12261226
final String valuePrefix = forReplica ? "r_" : "p_";
1227-
final VersionType versionType = externalVersioning ? VersionType.EXTERNAL : VersionType.INTERNAL;
12281227
for (int i = 0; i < numOfOps; i++) {
12291228
final Engine.Operation op;
1229+
final long version;
1230+
switch (versionType) {
1231+
case INTERNAL:
1232+
version = forReplica ? i : Versions.MATCH_ANY;
1233+
break;
1234+
case EXTERNAL:
1235+
version = i;
1236+
break;
1237+
case EXTERNAL_GTE:
1238+
version = randomBoolean() ? Math.max(i - 1, 0) : i;
1239+
break;
1240+
case FORCE:
1241+
version = randomNonNegativeLong();
1242+
break;
1243+
default:
1244+
throw new UnsupportedOperationException("unknown version type: " + versionType);
1245+
}
12301246
if (randomBoolean()) {
12311247
op = new Engine.Index(id, testParsedDocument("1", "test", null, System.currentTimeMillis(), -1L,
12321248
testDocumentWithTextField(valuePrefix + i), B_1, null),
1233-
forReplica || externalVersioning ? i : Versions.MATCH_ANY,
1249+
version,
12341250
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
12351251
forReplica ? REPLICA : PRIMARY,
12361252
System.currentTimeMillis(), -1, false
12371253
);
12381254
} else {
12391255
op = new Engine.Delete("test", "1", id,
1240-
forReplica || externalVersioning ? i : Versions.MATCH_ANY,
1256+
version,
12411257
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
12421258
forReplica ? REPLICA : PRIMARY,
12431259
System.currentTimeMillis());
@@ -1248,10 +1264,17 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
12481264
}
12491265

12501266
public void testOutOfOrderDocsOnReplica() throws IOException {
1251-
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, 2, 20);
1252-
assertOpsOnReplica(ops, replicaEngine);
1267+
final List<Engine.Operation> ops = generateSingleDocHistory(true,
1268+
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 20);
1269+
assertOpsOnReplica(ops, replicaEngine, true);
1270+
}
1271+
1272+
public void testNonStandardVersioningOnReplica() throws IOException {
1273+
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), 2, 20);
1274+
assertOpsOnReplica(ops, replicaEngine, false);
12531275
}
12541276

1277+
12551278
public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
12561279
IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder()
12571280
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
@@ -1264,12 +1287,12 @@ public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
12641287
try (Store oldReplicaStore = createStore();
12651288
InternalEngine replicaEngine =
12661289
createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) {
1267-
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, 2, 20);
1268-
assertOpsOnReplica(ops, replicaEngine);
1290+
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 20);
1291+
assertOpsOnReplica(ops, replicaEngine, true);
12691292
}
12701293
}
12711294

1272-
private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine) throws IOException {
1295+
private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
12731296
final Engine.Operation lastOp = ops.get(ops.size() - 1);
12741297
final String lastFieldValue;
12751298
if (lastOp instanceof Engine.Index) {
@@ -1279,7 +1302,9 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
12791302
// delete
12801303
lastFieldValue = null;
12811304
}
1282-
shuffle(ops, random());
1305+
if (shuffleOps) {
1306+
shuffle(ops, random());
1307+
}
12831308
boolean firstOp = true;
12841309
for (Engine.Operation op : ops) {
12851310
logger.info("performing [{}], v [{}]", op.operationType().name().charAt(0), op.version());
@@ -1324,7 +1349,7 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
13241349
}
13251350

13261351
public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException {
1327-
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, 100, 300);
1352+
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 100, 300);
13281353
final Engine.Operation lastOp = ops.get(ops.size() - 1);
13291354
final String lastFieldValue;
13301355
if (lastOp instanceof Engine.Index) {
@@ -1384,7 +1409,7 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
13841409
}
13851410

13861411
public void testInternalVersioningOnPrimary() throws IOException {
1387-
final List<Engine.Operation> ops = generateSingleDocHistory(false, false, 2, 20);
1412+
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 20);
13881413
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
13891414
}
13901415

@@ -1484,8 +1509,11 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
14841509
return opsPerformed;
14851510
}
14861511

1487-
public void testExternalVersioningOnPrimary() throws IOException {
1488-
final List<Engine.Operation> ops = generateSingleDocHistory(false, true, 2, 20);
1512+
public void testNonInternalVersioningOnPrimary() throws IOException {
1513+
final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values()));
1514+
nonInternalVersioning.remove(VersionType.INTERNAL);
1515+
final VersionType versionType = randomFrom(nonInternalVersioning);
1516+
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, 2, 20);
14891517
final Engine.Operation lastOp = ops.get(ops.size() - 1);
14901518
final String lastFieldValue;
14911519
if (lastOp instanceof Engine.Index) {
@@ -1495,7 +1523,10 @@ public void testExternalVersioningOnPrimary() throws IOException {
14951523
// delete
14961524
lastFieldValue = null;
14971525
}
1498-
shuffle(ops, random());
1526+
// other version types don't support out of order processing.
1527+
if (versionType == VersionType.EXTERNAL) {
1528+
shuffle(ops, random());
1529+
}
14991530
long highestOpVersion = Versions.NOT_FOUND;
15001531
long seqNo = -1;
15011532
boolean docDeleted = true;
@@ -1504,7 +1535,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
15041535
if (op instanceof Engine.Index) {
15051536
final Engine.Index index = (Engine.Index) op;
15061537
Engine.IndexResult result = engine.index(index);
1507-
if (op.version() > highestOpVersion) {
1538+
if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
15081539
seqNo++;
15091540
assertThat(result.isCreated(), equalTo(docDeleted));
15101541
assertThat(result.getVersion(), equalTo(op.version()));
@@ -1521,7 +1552,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
15211552
} else {
15221553
final Engine.Delete delete = (Engine.Delete) op;
15231554
Engine.DeleteResult result = engine.delete(delete);
1524-
if (op.version() > highestOpVersion) {
1555+
if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
15251556
seqNo++;
15261557
assertThat(result.isFound(), equalTo(docDeleted == false));
15271558
assertThat(result.getVersion(), equalTo(op.version()));
@@ -1546,6 +1577,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
15461577

15471578
assertVisibleCount(engine, docDeleted ? 0 : 1);
15481579
if (docDeleted == false) {
1580+
logger.info("searching for [{}]", lastFieldValue);
15491581
try (Searcher searcher = engine.acquireSearcher("test")) {
15501582
final TotalHitCountCollector collector = new TotalHitCountCollector();
15511583
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
@@ -1555,17 +1587,17 @@ public void testExternalVersioningOnPrimary() throws IOException {
15551587
}
15561588

15571589
public void testVersioningPromotedReplica() throws IOException {
1558-
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, true, 2, 20);
1559-
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, false, 2, 20);
1590+
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, 2, 20);
1591+
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 20);
15601592
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1);
15611593
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
15621594
final long finalReplicaVersion = lastReplicaOp.version();
1563-
assertOpsOnReplica(replicaOps, replicaEngine);
1595+
assertOpsOnReplica(replicaOps, replicaEngine, true);
15641596
assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
15651597
}
15661598

15671599
public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException {
1568-
final List<Engine.Operation> ops = generateSingleDocHistory(false, true, 100, 300);
1600+
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, 100, 300);
15691601
final Engine.Operation lastOp = ops.get(ops.size() - 1);
15701602
final String lastFieldValue;
15711603
if (lastOp instanceof Engine.Index) {

0 commit comments

Comments
 (0)