Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation
if (versionValue == null) {
return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else {
return op.version() > versionValue.getVersion() ?
OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ?
OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1309,8 +1309,9 @@ public void testVersioningCreateExistsException() throws IOException {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}

protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, boolean externalVersioning, boolean partialOldPrimary,
long primaryTerm, int minOpCount, int maxOpCount) {
protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
boolean partialOldPrimary, long primaryTerm,
int minOpCount, int maxOpCount) {
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
final List<Engine.Operation> ops = new ArrayList<>();
final Term id = newUid(Uid.createUid("test", "1"));
Expand All @@ -1322,14 +1323,30 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
}
final String valuePrefix = forReplica ? "r_" : "p_";
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
final VersionType versionType = externalVersioning ? VersionType.EXTERNAL : VersionType.INTERNAL;
for (int i = 0; i < numOfOps; i++) {
final Engine.Operation op;
final long version;
switch (versionType) {
case INTERNAL:
version = forReplica ? i : Versions.MATCH_ANY;
break;
case EXTERNAL:
version = i;
break;
case EXTERNAL_GTE:
version = randomBoolean() ? Math.max(i - 1, 0) : i;
break;
case FORCE:
version = randomNonNegativeLong();
break;
default:
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument("1", "test", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
forReplica || externalVersioning ? i : Versions.MATCH_ANY,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis(), -1, false
Expand All @@ -1338,7 +1355,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
op = new Engine.Delete("test", "1", id,
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
forReplica || externalVersioning ? i : Versions.MATCH_ANY,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis());
Expand All @@ -1349,10 +1366,20 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, bo
}

public void testOutOfOrderDocsOnReplica() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine);
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, true);
}

public void testNonStandardVersioningOnReplica() throws IOException {
// TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order
// is detected using seq#
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, false);
}


public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder()
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
Expand All @@ -1365,12 +1392,12 @@ public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
try (Store oldReplicaStore = createStore();
InternalEngine replicaEngine =
createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) {
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, true, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine);
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, true);
}
}

private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine) throws IOException {
private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand All @@ -1380,13 +1407,15 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
// delete
lastFieldValue = null;
}
int firstOpWithSeqNo = 0;
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
firstOpWithSeqNo++;
if (shuffleOps) {
int firstOpWithSeqNo = 0;
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
firstOpWithSeqNo++;
}
// shuffle ops but make sure legacy ops are first
shuffle(ops.subList(0, firstOpWithSeqNo), random());
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
}
// shuffle ops but make sure legacy ops are first
shuffle(ops.subList(0, firstOpWithSeqNo), random());
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
boolean firstOp = true;
for (Engine.Operation op : ops) {
logger.info("performing [{}], v [{}], seq# [{}], term [{}]",
Expand Down Expand Up @@ -1432,7 +1461,7 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
}

public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException {
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 100, 300);
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand Down Expand Up @@ -1492,7 +1521,7 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
}

public void testInternalVersioningOnPrimary() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, false, false, 2, 2, 20);
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
}

Expand Down Expand Up @@ -1593,8 +1622,11 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
return opsPerformed;
}

public void testExternalVersioningOnPrimary() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 2, 20);
public void testNonInternalVersioningOnPrimary() throws IOException {
final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values()));
nonInternalVersioning.remove(VersionType.INTERNAL);
final VersionType versionType = randomFrom(nonInternalVersioning);
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand All @@ -1604,7 +1636,10 @@ public void testExternalVersioningOnPrimary() throws IOException {
// delete
lastFieldValue = null;
}
shuffle(ops, random());
// other version types don't support out of order processing.
if (versionType == VersionType.EXTERNAL) {
shuffle(ops, random());
}
long highestOpVersion = Versions.NOT_FOUND;
long seqNo = -1;
boolean docDeleted = true;
Expand All @@ -1614,7 +1649,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
if (op instanceof Engine.Index) {
final Engine.Index index = (Engine.Index) op;
Engine.IndexResult result = engine.index(index);
if (op.version() > highestOpVersion) {
if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
seqNo++;
assertThat(result.getSeqNo(), equalTo(seqNo));
assertThat(result.isCreated(), equalTo(docDeleted));
Expand All @@ -1632,7 +1667,7 @@ public void testExternalVersioningOnPrimary() throws IOException {
} else {
final Engine.Delete delete = (Engine.Delete) op;
Engine.DeleteResult result = engine.delete(delete);
if (op.version() > highestOpVersion) {
if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
seqNo++;
assertThat(result.getSeqNo(), equalTo(seqNo));
assertThat(result.isFound(), equalTo(docDeleted == false));
Expand All @@ -1658,6 +1693,7 @@ public void testExternalVersioningOnPrimary() throws IOException {

assertVisibleCount(engine, docDeleted ? 0 : 1);
if (docDeleted == false) {
logger.info("searching for [{}]", lastFieldValue);
try (Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
Expand All @@ -1667,13 +1703,13 @@ public void testExternalVersioningOnPrimary() throws IOException {
}

public void testVersioningPromotedReplica() throws IOException {
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, true, false, 1, 2, 20);
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, false, false, 2, 2, 20);
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20);
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1);
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
final long finalReplicaVersion = lastReplicaOp.version();
final long finalReplicaSeqNo = lastReplicaOp.seqNo();
assertOpsOnReplica(replicaOps, replicaEngine);
assertOpsOnReplica(replicaOps, replicaEngine, true);
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, lastReplicaOp.uid())).v1();
try (Searcher searcher = engine.acquireSearcher("test")) {
Expand All @@ -1687,7 +1723,7 @@ public void testVersioningPromotedReplica() throws IOException {
}

public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 100, 300);
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Expand Down