Skip to content

Commit 424ed93

Browse files
authored
Always use soft-deletes in InternalEngine (#50415)
Peer recoveries become faster and use less storage (i.e., no more extra translog) with soft-deletes. Soft-deletes has been enabled by default since 7.0. We should make it mandatory in 8.0, so we can simplify the logic in the engine, recoveries, and other components. With this change, InternalEngine will always use soft-deletes regardless of the soft_deletes settings.
1 parent d02afcc commit 424ed93

File tree

7 files changed

+159
-478
lines changed

7 files changed

+159
-478
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 46 additions & 93 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,12 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
168168

169169
protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader,
170170
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
171-
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
172-
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
173-
}
174171
reader = readerWrapperFunction.apply(reader);
175172
return ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
176173
}
177174

178175
protected DirectoryReader open(IndexCommit commit) throws IOException {
179-
return DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES);
176+
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES), Lucene.SOFT_DELETES_FIELD);
180177
}
181178

182179
private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
@@ -313,10 +310,7 @@ public Closeable acquireHistoryRetentionLock(HistorySource historySource) {
313310

314311
@Override
315312
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo,
316-
boolean requiredFullRange) throws IOException {
317-
if (engineConfig.getIndexSettings().isSoftDeleteEnabled() == false) {
318-
throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled");
319-
}
313+
boolean requiredFullRange) {
320314
return newEmptySnapshot();
321315
}
322316

server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ protected void parseCreateField(ParseContext context, List<IndexableField> field
236236
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
237237
}
238238

239-
if (originalSource != null && adaptedSource != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
239+
if (originalSource != null && adaptedSource != originalSource) {
240240
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
241241
BytesRef ref = originalSource.toBytesRef();
242242
fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 87 additions & 352 deletions
Large diffs are not rendered by default.

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2911,25 +2911,23 @@ public void testDocStats() throws Exception {
29112911
indexDoc(indexShard, "_doc", id);
29122912
}
29132913
// Need to update and sync the global checkpoint and the retention leases for the soft-deletes retention MergePolicy.
2914-
if (indexShard.indexSettings.isSoftDeleteEnabled()) {
2915-
final long newGlobalCheckpoint = indexShard.getLocalCheckpoint();
2916-
if (indexShard.routingEntry().primary()) {
2917-
indexShard.updateLocalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
2918-
indexShard.getLocalCheckpoint());
2919-
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
2920-
indexShard.getLocalCheckpoint());
2921-
indexShard.syncRetentionLeases();
2922-
} else {
2923-
indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
2914+
final long newGlobalCheckpoint = indexShard.getLocalCheckpoint();
2915+
if (indexShard.routingEntry().primary()) {
2916+
indexShard.updateLocalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
2917+
indexShard.getLocalCheckpoint());
2918+
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
2919+
indexShard.getLocalCheckpoint());
2920+
indexShard.syncRetentionLeases();
2921+
} else {
2922+
indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
29242923

2925-
final RetentionLeases retentionLeases = indexShard.getRetentionLeases();
2926-
indexShard.updateRetentionLeasesOnReplica(new RetentionLeases(
2927-
retentionLeases.primaryTerm(), retentionLeases.version() + 1,
2928-
retentionLeases.leases().stream().map(lease -> new RetentionLease(lease.id(), newGlobalCheckpoint + 1,
2929-
lease.timestamp(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)).collect(Collectors.toList())));
2930-
}
2931-
indexShard.sync();
2924+
final RetentionLeases retentionLeases = indexShard.getRetentionLeases();
2925+
indexShard.updateRetentionLeasesOnReplica(new RetentionLeases(
2926+
retentionLeases.primaryTerm(), retentionLeases.version() + 1,
2927+
retentionLeases.leases().stream().map(lease -> new RetentionLease(lease.id(), newGlobalCheckpoint + 1,
2928+
lease.timestamp(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)).collect(Collectors.toList())));
29322929
}
2930+
indexShard.sync();
29332931
// flush the buffered deletes
29342932
final FlushRequest flushRequest = new FlushRequest();
29352933
flushRequest.force(false);

server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -982,9 +982,7 @@ public void testFilterCacheStats() throws Exception {
982982
indexRandom(false, true,
983983
client().prepareIndex("index").setId("1").setSource("foo", "bar"),
984984
client().prepareIndex("index").setId("2").setSource("foo", "baz"));
985-
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
986-
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
987-
}
985+
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
988986
refresh();
989987
ensureGreen();
990988

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,8 +1067,7 @@ public static List<Translog.Operation> readAllOperationsInLucene(Engine engine,
10671067
* Asserts the provided engine has a consistent document history between translog and Lucene index.
10681068
*/
10691069
public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
1070-
if (mapper == null || mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false
1071-
|| (engine instanceof InternalEngine) == false) {
1070+
if (mapper == null || mapper.documentMapper() == null || (engine instanceof InternalEngine) == false) {
10721071
return;
10731072
}
10741073
final List<Translog.Operation> translogOps = new ArrayList<>();
@@ -1090,8 +1089,12 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
10901089
final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint();
10911090
final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
10921091
final long seqNoForRecovery;
1093-
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
1094-
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
1092+
if (engine.config().getIndexSettings().isSoftDeleteEnabled()) {
1093+
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
1094+
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
1095+
}
1096+
} else {
1097+
seqNoForRecovery = engine.getMinRetainedSeqNo();
10951098
}
10961099
final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
10971100
for (Translog.Operation translogOp : translogOps) {

0 commit comments

Comments
 (0)