diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 0ffbbedeea32e..328d1ede2b416 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -248,8 +248,7 @@ public final class IndexSettings { * This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4). **/ public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = - Setting.timeSetting("index.translog.retention.age", - settings -> shouldDisableTranslogRetention(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), + Setting.timeSetting("index.translog.retention.age", settings -> TimeValue.MINUS_ONE, TimeValue.MINUS_ONE, Property.Dynamic, Property.IndexScope); /** @@ -259,9 +258,7 @@ public final class IndexSettings { * This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4). **/ public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = - Setting.byteSizeSetting("index.translog.retention.size", - settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB", - Property.Dynamic, Property.IndexScope); + Setting.byteSizeSetting("index.translog.retention.size", settings -> "-1", Property.Dynamic, Property.IndexScope); /** * Controls the number of translog files that are no longer needed for persistence reasons will be kept around before being deleted. @@ -351,8 +348,6 @@ public final class IndexSettings { private volatile TimeValue syncInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; - private volatile TimeValue translogRetentionAge; - private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue generationThresholdSize; private volatile ByteSizeValue flushAfterMergeThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; @@ -512,8 +507,6 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti this.indexSortConfig = new IndexSortConfig(this); searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE); - setTranslogRetentionAge(scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING)); - setTranslogRetentionSize(scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING)); scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, @@ -551,8 +544,6 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer( INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, this::setGenerationThresholdSize); - scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); - scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset); @@ -578,24 +569,6 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) { this.flushAfterMergeThresholdSize = byteSizeValue; } - private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { - if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) { - // ignore the translog retention settings if soft-deletes enabled - this.translogRetentionSize = new ByteSizeValue(-1); - } else { - this.translogRetentionSize = byteSizeValue; - } - } - - private void setTranslogRetentionAge(TimeValue age) { - if (shouldDisableTranslogRetention(settings) && age.millis() >= 0) { - // ignore the translog retention settings if soft-deletes enabled - this.translogRetentionAge = TimeValue.MINUS_ONE; - } else { - this.translogRetentionAge = age; - } - } - private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { this.generationThresholdSize = generationThresholdSize; } @@ -772,36 +745,6 @@ public TimeValue getRefreshInterval() { */ public ByteSizeValue getFlushAfterMergeThresholdSize() { return flushAfterMergeThresholdSize; } - /** - * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries - */ - public ByteSizeValue getTranslogRetentionSize() { - assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize; - return translogRetentionSize; - } - - /** - * Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept - * around - */ - public TimeValue getTranslogRetentionAge() { - assert shouldDisableTranslogRetention(settings) == false || translogRetentionAge.millis() == -1L : translogRetentionSize; - return translogRetentionAge; - } - - /** - * Returns the maximum number of translog files that that no longer required for persistence should be kept for peer recovery - * when soft-deletes is disabled. - */ - public int getTranslogRetentionTotalFiles() { - return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings()); - } - - private static boolean shouldDisableTranslogRetention(Settings settings) { - return INDEX_SOFT_DELETES_SETTING.get(settings) - && IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(Version.V_7_4_0); - } - /** * Returns the generation threshold size. As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index ff484ba327cee..07994849d1359 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -66,7 +66,6 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; @@ -1783,7 +1782,7 @@ public IndexCommit getIndexCommit() { } } - public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { + public void onSettingsChanged() { } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index cf3182fd5a7c1..8a3fc02defdca 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -70,8 +70,6 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -190,11 +188,7 @@ public InternalEngine(EngineConfig engineConfig) { final EngineConfig engineConfig, final BiFunction localCheckpointTrackerSupplier) { super(engineConfig); - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( - engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), - engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() - ); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); store.incRef(); IndexWriter writer = null; Translog translog = null; @@ -2432,14 +2426,11 @@ final void ensureCanFlush() { } @Override - public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { + public void onSettingsChanged() { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletes(); - final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); - translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); - translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); - softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps); + softDeletesPolicy.setRetentionOperations(config().getIndexSettings().getSoftDeleteRetentionOperations()); } public MergeStats getMergeStats() { diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 06520d3036c31..67163f53516e0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -152,7 +152,7 @@ public void trimUnreferencedTranslogFiles() { if (minTranslogGeneration < lastCommitGeneration) { // a translog deletion policy that retains nothing but the last translog generation from safe commit - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 2f11ab8b35dbc..9b68fd15c7792 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -222,11 +222,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm } final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); final TranslogConfig translogConfig = config.getTranslogConfig(); - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( - config.getIndexSettings().getTranslogRetentionSize().getBytes(), - config.getIndexSettings().getTranslogRetentionAge().getMillis(), - config.getIndexSettings().getTranslogRetentionTotalFiles() - ); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit); try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(), diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d44dea5e10d28..7ea68562783cf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -67,7 +67,6 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -612,7 +611,6 @@ public void onFailure(Exception e) { if (shardRoutings.stream().allMatch( shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)))) { useRetentionLeasesInPeerRecovery = true; - turnOffTranslogRetention(); } } } @@ -1886,35 +1884,10 @@ boolean shouldRollTranslogGeneration() { public void onSettingsChanged() { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null) { - final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery; - engineOrNull.onSettingsChanged( - disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), - disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations() - ); + engineOrNull.onSettingsChanged(); } } - private void turnOffTranslogRetention() { - logger.debug("turn off the translog retention for the replication group {} " + - "as it starts using retention leases exclusively in peer recoveries", shardId); - // Off to the generic threadPool as pruning the delete tombstones can be expensive. - threadPool.generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - if (state != IndexShardState.CLOSED) { - logger.warn("failed to turn off translog retention", e); - } - } - - @Override - protected void doRun() { - onSettingsChanged(); - trimTranslog(); - } - }); - } - /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 055b9b82fc917..549caf9f8cb0a 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1672,7 +1672,7 @@ public void trimUnreferencedReaders() throws IOException { // we're shutdown potentially on some tragic event, don't delete anything return; } - long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current); + long minReferencedGen = deletionPolicy.minTranslogGenRequired(); assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 8a553aad326b7..24c3e75c4d381 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -23,14 +23,12 @@ import org.elasticsearch.Assertions; import org.elasticsearch.common.lease.Releasable; -import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -public class TranslogDeletionPolicy { +public final class TranslogDeletionPolicy { private final Map openTranslogRef; @@ -59,16 +57,8 @@ public void assertNoOpenTranslogRefs() { */ private long translogGenerationOfLastCommit = 1; - private long retentionSizeInBytes; - private long retentionAgeInMillis; - - private int retentionTotalFiles; - - public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) { - this.retentionSizeInBytes = retentionSizeInBytes; - this.retentionAgeInMillis = retentionAgeInMillis; - this.retentionTotalFiles = retentionTotalFiles; + public TranslogDeletionPolicy() { if (Assertions.ENABLED) { openTranslogRef = new ConcurrentHashMap<>(); } else { @@ -95,18 +85,6 @@ public synchronized void setTranslogGenerationOfLastCommit(long lastGen) { translogGenerationOfLastCommit = lastGen; } - public synchronized void setRetentionSizeInBytes(long bytes) { - retentionSizeInBytes = bytes; - } - - public synchronized void setRetentionAgeInMillis(long ageInMillis) { - retentionAgeInMillis = ageInMillis; - } - - synchronized void setRetentionTotalFiles(int retentionTotalFiles) { - this.retentionTotalFiles = retentionTotalFiles; - } - /** * acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation * will not be deleted until the returned {@link Releasable} is closed. @@ -156,66 +134,9 @@ private synchronized void releaseTranslogGen(long translogGen) { /** * returns the minimum translog generation that is still required by the system. Any generation below * the returned value may be safely deleted - * - * @param readers current translog readers - * @param writer current translog writer */ - synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) throws IOException { - long minByLocks = getMinTranslogGenRequiredByLocks(); - long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); - long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); - final long minByAgeAndSize; - if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) { - // both size and age are disabled; - minByAgeAndSize = Long.MAX_VALUE; - } else { - minByAgeAndSize = Math.max(minByAge, minBySize); - } - long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); - return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery)); - } - - static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { - if (retentionSizeInBytes >= 0) { - long totalSize = writer.sizeInBytes(); - long minGen = writer.getGeneration(); - for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) { - final TranslogReader reader = readers.get(i); - totalSize += reader.sizeInBytes(); - minGen = reader.getGeneration(); - } - return minGen; - } else { - return Long.MIN_VALUE; - } - } - - static long getMinTranslogGenByAge(List readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now) - throws IOException { - if (maxRetentionAgeInMillis >= 0) { - for (TranslogReader reader: readers) { - if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) { - return reader.getGeneration(); - } - } - return writer.getGeneration(); - } else { - return Long.MIN_VALUE; - } - } - - static long getMinTranslogGenByTotalFiles(List readers, TranslogWriter writer, final int maxTotalFiles) { - long minGen = writer.generation; - int totalFiles = 1; // for the current writer - for (int i = readers.size() - 1; i >= 0 && totalFiles < maxTotalFiles; i--) { - totalFiles++; - minGen = readers.get(i).generation; - } - return minGen; - } - - protected long currentTime() { - return System.currentTimeMillis(); + synchronized long minTranslogGenRequired() { + return Math.min(getMinTranslogGenRequiredByLocks(), minTranslogGenerationForRecovery); } private long getMinTranslogGenRequiredByLocks() { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java index 9480ee3c1e1f3..76bb38876095d 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java @@ -178,24 +178,16 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState, final TranslogConfig translogConfig = new TranslogConfig(shardPath.getShardId(), translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardPath.getShardId().id()); - // We open translog to check for corruption, do not clean anything. - final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy( - Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE) { - @Override - long minTranslogGenRequired(List readers, TranslogWriter writer) { - long minGen = writer.generation; - for (TranslogReader reader : readers) { - minGen = Math.min(reader.generation, minGen); - } - return minGen; - } - }; + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); try (Translog translog = new Translog(translogConfig, translogUUID, - retainAllTranslogPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm, seqNo -> {}); + translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm, seqNo -> {}); Translog.Snapshot snapshot = translog.newSnapshot()) { //noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot while (snapshot.next() != null) { } + // We open translog to check for corruption, do not clean anything. + translogDeletionPolicy.setTranslogGenerationOfLastCommit(translog.getMinFileGeneration()); + translogDeletionPolicy.setMinTranslogGenerationForRecovery(translog.getMinFileGeneration()); } return true; } catch (TranslogCorruptedException e) { diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4e7ff217c9c5f..f20bee58d0487 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -473,8 +473,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { // prevent a sequence-number-based recovery from being possible client(primaryNode).admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0) .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "0s") ).get(); diff --git a/server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java b/server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java index 57dfebbd231c0..2a56e4a7b10bb 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java @@ -545,30 +545,4 @@ public void testSoftDeletesDefaultSetting() { Settings settings = Settings.builder().put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), createdVersion).build(); assertTrue(IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)); } - - public void testIgnoreTranslogRetentionSettingsIfSoftDeletesEnabled() { - Settings.Builder settings = Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_7_4_0, Version.CURRENT)); - if (randomBoolean()) { - settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue()); - } - if (randomBoolean()) { - settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), between(1, 1024) + "b"); - } - IndexMetaData metaData = newIndexMeta("index", settings.build()); - IndexSettings indexSettings = new IndexSettings(metaData, Settings.EMPTY); - assertThat(indexSettings.getTranslogRetentionAge().millis(), equalTo(-1L)); - assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L)); - - Settings.Builder newSettings = Settings.builder().put(settings.build()); - if (randomBoolean()) { - newSettings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue()); - } - if (randomBoolean()) { - newSettings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), between(1, 1024) + "b"); - } - indexSettings.updateIndexMetaData(newIndexMeta("index", newSettings.build())); - assertThat(indexSettings.getTranslogRetentionAge().millis(), equalTo(-1L)); - assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L)); - } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index deb224f5b1895..50441feec7e1c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; -import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -54,7 +53,7 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final int extraRetainedOps = between(0, 100); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps, () -> RetentionLeases.EMPTY); - TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + TranslogDeletionPolicy translogPolicy = new TranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final LongArrayList maxSeqNoList = new LongArrayList(); @@ -101,7 +100,7 @@ public void testAcquireIndexCommit() throws Exception { final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); - TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + TranslogDeletionPolicy translogPolicy = new TranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); @@ -181,7 +180,7 @@ public void testAcquireIndexCommit() throws Exception { public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); - TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + TranslogDeletionPolicy translogPolicy = new TranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final int invalidCommits = between(1, 10); @@ -216,7 +215,7 @@ public void testCheckUnreferencedCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); - final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + final TranslogDeletionPolicy translogPolicy = new TranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 986017eafae59..0415941a48bab 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -133,6 +133,7 @@ import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.VersionUtils; @@ -187,7 +188,6 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.contains; @@ -1373,8 +1373,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { } settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); - engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations()); + engine.onSettingsChanged(); globalCheckpoint.set(localCheckpoint); engine.syncTranslog(); @@ -1464,8 +1463,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc } settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); - engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations()); + engine.onSettingsChanged(); // If we already merged down to 1 segment, then the next force-merge will be a noop. We need to add an extra segment to make // merges happen so we can verify that _recovery_source are pruned. See: https://github.com/elastic/elasticsearch/issues/41628. final int numSegments; @@ -2706,13 +2704,6 @@ public void testTranslogReplayWithFailure() throws IOException { public void testTranslogCleanUpPostCommitCrash() throws Exception { IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(), defaultSettings.getScopedSettings()); - IndexMetaData.Builder builder = IndexMetaData.builder(indexSettings.getIndexMetaData()); - builder.settings(Settings.builder().put(indexSettings.getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") - ); - indexSettings.updateIndexMetaData(builder.build()); - try (Store store = createStore()) { AtomicBoolean throwErrorOnCommit = new AtomicBoolean(); final Path translogPath = createTempDir(); @@ -2882,7 +2873,7 @@ public void testRecoverFromForeignTranslog() throws IOException { final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); Translog translog = new Translog( new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), - badUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); + badUUID, new TranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); translog.add(new Translog.Index("SomeBogusId", 0, primaryTerm.get(), "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); @@ -4576,14 +4567,6 @@ public void testSeqNoGenerator() throws IOException { public void testKeepTranslogAfterGlobalCheckpoint() throws Exception { IOUtils.close(engine, store); - final IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(), - defaultSettings.getScopedSettings()); - IndexMetaData.Builder builder = IndexMetaData.builder(indexSettings.getIndexMetaData()) - .settings(Settings.builder().put(indexSettings.getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomFrom("-1", "100micros", "30m")) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomFrom("-1", "512b", "1gb"))); - indexSettings.updateIndexMetaData(builder.build()); - final Path translogPath = createTempDir(); store = createStore(); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -4591,7 +4574,7 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception { final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUUID); - final EngineConfig engineConfig = config(indexSettings, store, translogPath, + final EngineConfig engineConfig = config(defaultSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get()); try (InternalEngine engine = new InternalEngine(engineConfig) { @Override @@ -4732,14 +4715,10 @@ public void testAcquireIndexCommit() throws Exception { public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { IOUtils.close(engine, store); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", - Settings.builder().put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1).build()); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore(); InternalEngine engine = - createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), + createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) { final int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { @@ -4817,8 +4796,7 @@ public void testShouldPeriodicallyFlush() throws Exception { .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); - engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations()); + engine.onSettingsChanged(); assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); @@ -4866,8 +4844,7 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception { .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING.getKey(), "0b")).build(); indexSettings.updateIndexMetaData(indexMetaData); - engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations()); + engine.onSettingsChanged(); assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null); @@ -4892,8 +4869,7 @@ public void testStressShouldPeriodicallyFlush() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b") .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); - engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations()); + engine.onSettingsChanged(); final int numOps = scaledRandomIntBetween(100, 10_000); for (int i = 0; i < numOps; i++) { final long localCheckPoint = engine.getProcessedLocalCheckpoint(); @@ -4921,8 +4897,7 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(1))).build(); engine.engineConfig.getIndexSettings().updateIndexMetaData(indexMetaData); - engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations()); + engine.onSettingsChanged(); ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null); final Engine.Index doc = new Engine.Index(newUid(document), document, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), @@ -5190,8 +5165,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (rarely()) { settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); - engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations()); + engine.onSettingsChanged(); } if (rarely()) { engine.refresh("test"); diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index 4856fc0f2b6e1..ef98df967f2f8 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -211,8 +211,6 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { private void flushAndTrimTranslog(final InternalEngine engine) { engine.flush(true, true); final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy(); - deletionPolicy.setRetentionSizeInBytes(-1); - deletionPolicy.setRetentionAgeInMillis(-1); deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration); engine.flush(true, true); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index ec10e2a5e8444..d018cae78a467 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -211,10 +211,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { if (expectSeqNoRecovery == false) { IndexMetaData.Builder builder = IndexMetaData.builder(newPrimary.indexSettings().getIndexMetaData()); builder.settings(Settings.builder().put(newPrimary.indexSettings().getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0) - ); + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)); newPrimary.indexSettings().updateIndexMetaData(builder.build()); newPrimary.onSettingsChanged(); // Make sure the global checkpoint on the new primary is persisted properly, diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 4f90b042ebd10..5cc9bfd2b6993 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -92,7 +92,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.concurrent.BrokenBarrierException; @@ -128,7 +127,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; public class IndexShardIT extends ESSingleNodeTestCase { @@ -838,42 +836,6 @@ public void testNoOpEngineFactoryTakesPrecedence() { } } - public void testLimitNumberOfRetainedTranslogFiles() throws Exception { - String indexName = "test"; - int translogRetentionTotalFiles = randomIntBetween(0, 50); - Settings.Builder settings = Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getKey(), translogRetentionTotalFiles); - if (randomBoolean()) { - settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1024 * 1024))); - } - if (randomBoolean()) { - settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), TimeValue.timeValueMillis(between(1, 10_000))); - } - IndexService indexService = createIndex(indexName, settings.build()); - IndexShard shard = indexService.getShard(0); - shard.rollTranslogGeneration(); - CheckedRunnable checkTranslog = () -> { - try (Stream files = Files.list(getTranslog(shard).location()).sorted(Comparator.reverseOrder())) { - long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); - assertThat(totalFiles, either(lessThanOrEqualTo((long) translogRetentionTotalFiles)).or(equalTo(1L))); - } - }; - for (int i = 0; i < 100; i++) { - client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("{}", XContentType.JSON).get(); - if (randomInt(100) < 10) { - client().admin().indices().prepareFlush(indexName).setWaitIfOngoing(true).get(); - checkTranslog.run(); - } - if (randomInt(100) < 10) { - shard.rollTranslogGeneration(); - } - } - client().admin().indices().prepareFlush(indexName).setForce(true).setWaitIfOngoing(true).get(); - checkTranslog.run(); - } - /** * Asserts that there are no files in the specified path */ diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 0d296af5f0c8d..adb0455def075 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -20,11 +20,11 @@ package org.elasticsearch.index.translog; import org.apache.lucene.store.ByteArrayDataOutput; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.mockito.Mockito; @@ -38,152 +38,51 @@ import static org.hamcrest.Matchers.equalTo; - public class TranslogDeletionPolicyTests extends ESTestCase { - public void testNoRetention() throws IOException { - long now = System.currentTimeMillis(); - Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + public void testMinRetainedGeneration() throws IOException { + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(); List allGens = new ArrayList<>(readersAndWriter.v1()); allGens.add(readersAndWriter.v2()); try { - TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0, 0); - assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); + TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(1L)); final int committedReader = randomIntBetween(0, allGens.size() - 1); final long committedGen = allGens.get(committedReader).generation; deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); - assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen); - } finally { - IOUtils.close(readersAndWriter.v1()); - IOUtils.close(readersAndWriter.v2()); - } - } - - public void testBytesRetention() throws IOException { - long now = System.currentTimeMillis(); - Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); - List allGens = new ArrayList<>(readersAndWriter.v1()); - allGens.add(readersAndWriter.v2()); - try { - final int selectedReader = randomIntBetween(0, allGens.size() - 1); - final long selectedGeneration = allGens.get(selectedReader).generation; - long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); - assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), size), - equalTo(selectedGeneration)); - assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), -1), - equalTo(Long.MIN_VALUE)); - } finally { - IOUtils.close(readersAndWriter.v1()); - IOUtils.close(readersAndWriter.v2()); - } - } - - public void testAgeRetention() throws IOException { - long now = System.currentTimeMillis(); - Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); - List allGens = new ArrayList<>(readersAndWriter.v1()); - allGens.add(readersAndWriter.v2()); - try { - final int selectedReader = randomIntBetween(0, allGens.size() - 1); - final long selectedGeneration = allGens.get(selectedReader).generation; - long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); - assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), maxAge, now), - equalTo(selectedGeneration)); - assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), -1, now), - equalTo(Long.MIN_VALUE)); - } finally { - IOUtils.close(readersAndWriter.v1()); - IOUtils.close(readersAndWriter.v2()); - } - } - - public void testTotalFilesRetention() throws Exception { - Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(System.currentTimeMillis()); - List allGens = new ArrayList<>(readersAndWriter.v1()); - allGens.add(readersAndWriter.v2()); - try { - assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(), - randomIntBetween(Integer.MIN_VALUE, 1)), equalTo(readersAndWriter.v2().generation)); - assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(), - randomIntBetween(allGens.size(), Integer.MAX_VALUE)), equalTo(allGens.get(0).generation)); - int numFiles = randomIntBetween(1, allGens.size()); - long selectedGeneration = allGens.get(allGens.size() - numFiles).generation; - assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(), numFiles), - equalTo(selectedGeneration)); - } finally { - IOUtils.close(readersAndWriter.v1()); - IOUtils.close(readersAndWriter.v2()); - } - } - - /** - * Tests that age trumps size but recovery trumps both. - */ - public void testRetentionHierarchy() throws IOException { - long now = System.currentTimeMillis(); - Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); - List allGens = new ArrayList<>(readersAndWriter.v1()); - allGens.add(readersAndWriter.v2()); - try { - TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE); - deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE); - deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); - int selectedReader = randomIntBetween(0, allGens.size() - 1); - final long selectedGenerationByAge = allGens.get(selectedReader).generation; - long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); - selectedReader = randomIntBetween(0, allGens.size() - 1); - final long selectedGenerationBySize = allGens.get(selectedReader).generation; - long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); - selectedReader = randomIntBetween(0, allGens.size() - 1); - final long selectedGenerationByTotalFiles = allGens.get(selectedReader).generation; - deletionPolicy.setRetentionAgeInMillis(maxAge); - deletionPolicy.setRetentionSizeInBytes(size); - final int totalFiles = allGens.size() - selectedReader; - deletionPolicy.setRetentionTotalFiles(totalFiles); - assertMinGenRequired(deletionPolicy, readersAndWriter, - max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)); - // make a new policy as committed gen can't go backwards (for now) - deletionPolicy = new MockDeletionPolicy(now, size, maxAge, totalFiles); - long committedGen = randomFrom(allGens).generation; - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, - max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); - long viewGen = randomFrom(allGens).generation; - try (Releasable ignored = deletionPolicy.acquireTranslogGen(viewGen)) { - assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); - // disable age - deletionPolicy.setRetentionAgeInMillis(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles))); - // disable size - deletionPolicy.setRetentionAgeInMillis(maxAge); - deletionPolicy.setRetentionSizeInBytes(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles))); - // disable age and zie - deletionPolicy.setRetentionAgeInMillis(-1); - deletionPolicy.setRetentionSizeInBytes(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); - // disable total files - deletionPolicy.setRetentionTotalFiles(0); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(committedGen)); + + long gen1 = randomIntBetween(0, allGens.size() - 1); + Releasable releaseGen1 = deletionPolicy.acquireTranslogGen(gen1); + assertThat(deletionPolicy.minTranslogGenRequired(), + equalTo(Math.min(gen1, committedGen))); + + long gen2 = randomIntBetween(0, allGens.size() - 1); + Releasable releaseGen2 = deletionPolicy.acquireTranslogGen(gen2); + assertThat(deletionPolicy.minTranslogGenRequired(), + equalTo(Math.min(Math.min(gen1, gen2), committedGen))); + + if (randomBoolean()) { + releaseGen1.close(); + assertThat(deletionPolicy.minTranslogGenRequired(), + equalTo(Math.min(gen2, committedGen))); + releaseGen2.close(); + } else { + releaseGen2.close(); + assertThat(deletionPolicy.minTranslogGenRequired(), + equalTo(Math.min(gen1, committedGen))); + releaseGen1.close(); } + assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(committedGen)); + } finally { IOUtils.close(readersAndWriter.v1()); IOUtils.close(readersAndWriter.v2()); } - } - private void assertMinGenRequired(TranslogDeletionPolicy deletionPolicy, Tuple, TranslogWriter> readersAndWriter, - long expectedGen) throws IOException { - assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(expectedGen)); - } - - private Tuple, TranslogWriter> createReadersAndWriter(final long now) throws IOException { + private Tuple, TranslogWriter> createReadersAndWriter() throws IOException { final Path tempDir = createTempDir(); Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME)); TranslogWriter writer = null; @@ -200,8 +99,6 @@ private Tuple, TranslogWriter> createReadersAndWriter(final tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L, () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}); writer = Mockito.spy(writer); - Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); - byte[] bytes = new byte[4]; ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); @@ -213,27 +110,4 @@ private Tuple, TranslogWriter> createReadersAndWriter(final } return new Tuple<>(readers, writer); } - - private static class MockDeletionPolicy extends TranslogDeletionPolicy { - - long now; - - MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis, int maxRetentionTotalFiles) { - super(retentionSizeInBytes, maxRetentionAgeInMillis, maxRetentionTotalFiles); - this.now = now; - } - - @Override - protected long currentTime() { - return now; - } - } - - private static long max3(long x1, long x2, long x3) { - return Math.max(Math.max(x1, x2), x3); - } - - private static long min3(long x1, long x2, long x3) { - return Math.min(Math.min(x1, x2), x3); - } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 4bdf0297d5ecf..11315b15d2dbc 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -124,7 +124,6 @@ import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; -import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -191,12 +190,12 @@ private LongConsumer getPersistedSeqNoConsumer() { protected Translog createTranslog(TranslogConfig config) throws IOException { String translogUUID = Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), + return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, getPersistedSeqNoConsumer()); } protected Translog openTranslog(TranslogConfig config, String translogUUID) throws IOException { - return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), + return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, getPersistedSeqNoConsumer()); } @@ -216,7 +215,7 @@ private long commit(Translog translog, long genToRetain, long genToCommit) throw final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit); deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain); - long minGenRequired = deletionPolicy.minTranslogGenRequired(translog.getReaders(), translog.getCurrent()); + long minGenRequired = deletionPolicy.minTranslogGenRequired(); translog.trimUnreferencedReaders(); assertThat(minGenRequired, equalTo(translog.getMinFileGeneration())); assertFilePresences(translog); @@ -247,20 +246,13 @@ public void tearDown() throws Exception { private Translog create(Path path) throws IOException { globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final TranslogConfig translogConfig = getTranslogConfig(path); - final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - return new Translog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get, + return new Translog(translogConfig, translogUUID, new TranslogDeletionPolicy(), () -> globalCheckpoint.get(), primaryTerm::get, getPersistedSeqNoConsumer()); } private TranslogConfig getTranslogConfig(final Path path) { - final Settings settings = Settings - .builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - // only randomize between nog age retention and a long one, so failures will have a chance of reproducing - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomBoolean() ? "-1ms" : "1h") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b") - .build(); + final Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build(); return getTranslogConfig(path, settings); } @@ -435,8 +427,6 @@ public void testFindEarliestLastModifiedAge() throws IOException { public void testStats() throws IOException { // self control cleaning for test - translog.getDeletionPolicy().setRetentionSizeInBytes(1024 * 1024); - translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000); final long firstOperationPosition = translog.getFirstOperationPosition(); { final TranslogStats stats = stats(); @@ -513,11 +503,11 @@ public void testStats() throws IOException { } } - markCurrentGenAsCommitted(translog); + commit(translog, translog.currentFileGeneration(), translog.currentFileGeneration()); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(0)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(firstOperationPosition)); assertThat(stats.getUncommittedOperations(), equalTo(0)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); @@ -526,9 +516,6 @@ public void testStats() throws IOException { public void testUncommittedOperations() throws Exception { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - deletionPolicy.setRetentionAgeInMillis(randomLong()); - deletionPolicy.setRetentionSizeInBytes(randomLong()); - final int operations = scaledRandomIntBetween(10, 100); int uncommittedOps = 0; int operationsInLastGen = 0; @@ -1759,7 +1746,7 @@ public void testRandomExceptionsOnTrimOperations( ) throws Exception { List fileChannels = new ArrayList<>(); final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), - false, null, createTranslogDeletionPolicy(), fileChannels); + false, null, new TranslogDeletionPolicy(), fileChannels); IOException expectedException = null; int translogOperations = 0; @@ -1878,7 +1865,7 @@ public void testOpenForeignTranslog() throws IOException { final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()); try { - new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, + new Translog(config, foreignTranslog, new TranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); fail("translog doesn't belong to this UUID"); } catch (TranslogCorruptedException ex) { @@ -2123,8 +2110,7 @@ public void testTragicEventCanBeAnyException() throws IOException { Path tempDir = createTempDir(); final FailSwitch fail = new FailSwitch(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = getFailableTranslog(fail, config, false, true, null, - createTranslogDeletionPolicy()); + Translog translog = getFailableTranslog(fail, config, false, true, null, new TranslogDeletionPolicy()); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly translog.add(new Translog.Index("1", 0, primaryTerm.get(), lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); @@ -2217,7 +2203,7 @@ protected void afterAdd() throws IOException { // drop all that haven't been synced writtenOperations.removeIf(next -> checkpoint.offset < (next.location.translogLocation + next.location.size)); try (Translog tlog = - new Translog(config, translogUUID, createTranslogDeletionPolicy(), + new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); Translog.Snapshot snapshot = tlog.newSnapshot()) { if (writtenOperations.size() != snapshot.totalOperations()) { @@ -2264,7 +2250,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, @@ -2294,8 +2280,6 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { try (Translog translog = getFailableTranslog(fail, config)) { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); // disable retention so we trim things - deletionPolicy.setRetentionSizeInBytes(-1); - deletionPolicy.setRetentionAgeInMillis(-1); translogUUID = translog.getTranslogUUID(); int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations / 2; op++) { @@ -2323,7 +2307,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { // expected... } } - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, @@ -2339,7 +2323,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException { - return getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy()); + return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy()); } private static class FailSwitch { @@ -2523,7 +2507,7 @@ public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException { translog.add(new Translog.Index("boom", 0, primaryTerm.get(), "boom".getBytes(Charset.forName("UTF-8")))); translog.close(); try { - new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), + new Translog(config, translog.getTranslogUUID(), new TranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}) { @Override protected TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint, @@ -2651,7 +2635,7 @@ public void testWithRandomException() throws IOException { try { boolean committing = false; final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, - generationUUID, createTranslogDeletionPolicy()); + generationUUID, new TranslogDeletionPolicy()); try { LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { @@ -2715,7 +2699,7 @@ public void testWithRandomException() throws IOException { // now randomly open this failing tlog again just to make sure we can also recover from failing during recovery if (randomBoolean()) { try { - TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); @@ -2727,7 +2711,7 @@ public void testWithRandomException() throws IOException { } fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file - TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); if (generationUUID == null) { @@ -2809,7 +2793,7 @@ public void testPendingDelete() throws IOException { translog.rollGeneration(); TranslogConfig config = translog.getConfig(); final String translogUUID = translog.getTranslogUUID(); - final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); translog.close(); translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); @@ -2878,15 +2862,7 @@ public void testTranslogOpSerialization() throws Exception { public void testRollGeneration() throws Exception { // make sure we keep some files around - final boolean longRetention = randomBoolean(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - if (longRetention) { - deletionPolicy.setRetentionAgeInMillis(3600 * 1000); - } else { - deletionPolicy.setRetentionAgeInMillis(-1); - } - // we control retention via time, disable size based calculations for simplicity - deletionPolicy.setRetentionSizeInBytes(-1); final long generation = translog.currentFileGeneration(); final int rolls = randomIntBetween(1, 16); int totalOperations = 0; @@ -2921,22 +2897,9 @@ public void testRollGeneration() throws Exception { commit(translog, minGenForRecovery, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); - if (longRetention) { - for (int i = 0; i <= rolls; i++) { - assertFileIsPresent(translog, generation + i); - } - deletionPolicy.setRetentionAgeInMillis(randomBoolean() ? 100 : -1); - assertBusy(() -> { - translog.trimUnreferencedReaders(); - for (long i = 0; i < minGenForRecovery; i++) { - assertFileDeleted(translog, i); - } - }); - } else { - // immediate cleanup - for (long i = 0; i < minGenForRecovery; i++) { - assertFileDeleted(translog, i); - } + // immediate cleanup + for (long i = 0; i < minGenForRecovery; i++) { + assertFileDeleted(translog, i); } for (long i = minGenForRecovery; i < generation + rolls; i++) { assertFileIsPresent(translog, i); @@ -3167,9 +3130,8 @@ void callCloseOnTragicEvent() { globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); Path path = createTempDir(); final TranslogConfig translogConfig = getTranslogConfig(path); - final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, deletionPolicy, + MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, new TranslogDeletionPolicy(), () -> globalCheckpoint.get(), primaryTerm::get); expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseDirectly()); @@ -3284,7 +3246,7 @@ public void testSyncConcurrently() throws Exception { return lastGlobalCheckpoint.get(); } }; - try (Translog translog = new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), + try (Translog translog = new Translog(config, translogUUID, new TranslogDeletionPolicy(), globalCheckpointSupplier, primaryTerm::get, persistedSeqNos::add)) { Thread[] threads = new Thread[between(2, 8)]; Phaser phaser = new Phaser(threads.length); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index ddc05e3896163..a47dd4b681fd4 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -103,8 +103,6 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { recoveryBlocked.await(); IndexMetaData.Builder builder = IndexMetaData.builder(replica.indexSettings().getIndexMetaData()); builder.settings(Settings.builder().put(replica.indexSettings().getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") // force a roll and flush .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), "100b") ); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index dac00626f4420..9f156c162cb2d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -97,6 +97,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; @@ -133,7 +134,6 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -415,7 +415,7 @@ protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSup TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTermSupplier.getAsLong()); - return new Translog(translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), + return new Translog(translogConfig, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTermSupplier, seqNo -> {}); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java b/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java deleted file mode 100644 index f0921dfb6ba38..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.translog; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexSettings; - -public class TranslogDeletionPolicies { - - public static TranslogDeletionPolicy createTranslogDeletionPolicy() { - return new TranslogDeletionPolicy( - IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), - IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis(), - IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getDefault(Settings.EMPTY) - ); - } - - public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { - return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), - indexSettings.getTranslogRetentionAge().getMillis(), indexSettings.getTranslogRetentionTotalFiles()); - } - -}