Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2108,8 +2108,9 @@ private IndexWriterConfig getIndexWriterConfig() {
// Give us the opportunity to upgrade old segments while performing
// background merges
MergePolicy mergePolicy = config().getMergePolicy();
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
if (softDeleteEnabled) {
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5454,6 +5454,34 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
}
}

public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
try (Store store = createStore()) {
Path translogPath = createTempDir();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final IndexSettings softDeletesEnabled = IndexSettingsModule.newIndexSettings(
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder().
put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)).build());
final List<DocIdSeqNoAndTerm> docs;
try (InternalEngine engine = createEngine(
config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
List<Engine.Operation> ops = generateReplicaHistory(between(1, 100), randomBoolean());
applyOperations(engine, ops);
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
engine.flush();
docs = getDocIds(engine, true);
}
final IndexSettings softDeletesDisabled = IndexSettingsModule.newIndexSettings(
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder()
.put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)).build());
EngineConfig config = config(softDeletesDisabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
trimUnsafeCommits(config);
try (InternalEngine engine = createEngine(config)) {
assertThat(getDocIds(engine, true), equalTo(docs));
}
}
}

static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -704,6 +705,32 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
return ops;
}

public List<Engine.Operation> generateReplicaHistory(int numOps, boolean allowGapInSeqNo) {
long seqNo = 0;
List<Engine.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(between(1, 100));
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(),
-1, true));
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
threadPool.relativeTimeInMillis(), "test-" + i));
}
seqNo++;
if (allowGapInSeqNo && rarely()) {
seqNo++;
}
}
Randomness.shuffle(operations);
return operations;
}

public static void assertOpsOnReplica(
final List<Engine.Operation> ops,
final InternalEngine replicaEngine,
Expand Down Expand Up @@ -788,14 +815,7 @@ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngi
int docOffset;
while ((docOffset = offset.incrementAndGet()) < ops.size()) {
try {
final Engine.Operation op = ops.get(docOffset);
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else if (op instanceof Engine.Delete){
engine.delete((Engine.Delete) op);
} else {
engine.noOp((Engine.NoOp) op);
}
applyOperation(engine, ops.get(docOffset));
if ((docOffset + 1) % 4 == 0) {
engine.refresh("test");
}
Expand All @@ -814,6 +834,36 @@ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngi
}
}

public static void applyOperations(Engine engine, List<Engine.Operation> operations) throws IOException {
for (Engine.Operation operation : operations) {
applyOperation(engine, operation);
if (randomInt(100) < 10) {
engine.refresh("test");
}
if (rarely()) {
engine.flush();
}
}
}

public static Engine.Result applyOperation(Engine engine, Engine.Operation operation) throws IOException {
final Engine.Result result;
switch (operation.operationType()) {
case INDEX:
result = engine.index((Engine.Index) operation);
break;
case DELETE:
result = engine.delete((Engine.Delete) operation);
break;
case NO_OP:
result = engine.noOp((Engine.NoOp) operation);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
return result;
}

/**
* Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine.
*/
Expand Down