From 84d9c01e7b677c738aff06ec4522e5f53273fce7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 18 Dec 2019 12:04:09 -0500 Subject: [PATCH 1/8] Employ prrl for indices without soft-deletes --- .../upgrades/FullClusterRestartIT.java | 4 +- .../elasticsearch/upgrades/RecoveryIT.java | 5 +- .../org/elasticsearch/index/IndexService.java | 4 +- .../index/seqno/ReplicationTracker.java | 7 +-- .../elasticsearch/index/shard/IndexShard.java | 3 +- .../recovery/RecoverySourceHandler.java | 62 ++++++++----------- .../gateway/ReplicaShardAllocatorIT.java | 8 +-- .../recovery/RecoverySourceHandlerTests.java | 9 +-- .../test/rest/ESRestTestCase.java | 5 +- 9 files changed, 46 insertions(+), 61 deletions(-) diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 2d367261f8895..67d3007e9af16 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -1278,7 +1278,7 @@ public void testOperationBasedRecovery() throws Exception { } } flush(index, true); - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false); // less than 10% of the committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING). 
int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1)); for (int i = 0; i < uncommittedDocs; i++) { @@ -1288,6 +1288,7 @@ public void testOperationBasedRecovery() throws Exception { } else { ensureGreen(index); assertNoFileBasedRecovery(index, n -> true); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); } } @@ -1312,6 +1313,7 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception { ensureGreen(index); flush(index, true); assertEmptyTranslog(index); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); } } } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index cd4a07aab3ec0..7bd52d266914d 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -695,7 +695,7 @@ public void testOperationBasedRecovery() throws Exception { ensureGreen(index); indexDocs(index, 0, randomIntBetween(100, 200)); flush(index, randomBoolean()); - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false); // uncommitted docs must be less than 10% of committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING). 
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3)); } else { @@ -705,6 +705,9 @@ public void testOperationBasedRecovery() throws Exception { || nodeName.startsWith(CLUSTER_NAME + "-0") || (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false)); indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3)); + if (CLUSTER_TYPE == ClusterType.UPGRADED) { + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); + } } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 481ad3f5a7f6f..15fd0de2760a6 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -820,9 +820,7 @@ private void maybeSyncGlobalCheckpoints() { } private void syncRetentionLeases() { - if (indexSettings.isSoftDeleteEnabled()) { - sync(IndexShard::syncRetentionLeases, "retention lease"); - } + sync(IndexShard::syncRetentionLeases, "retention lease"); } private void sync(final Consumer sync, final String source) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 8c42784b88f82..14ef07c656943 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -994,10 +994,7 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) { updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint); updateGlobalCheckpointOnPrimary(); - if (indexSettings.isSoftDeleteEnabled()) { - addPeerRecoveryRetentionLeaseForSolePrimary(); - } - + addPeerRecoveryRetentionLeaseForSolePrimary(); assert invariant(); } @@ -1358,7 +1355,7 @@ public synchronized boolean 
hasAllPeerRecoveryRetentionLeases() { * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. */ public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { - if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) { + if (hasAllPeerRecoveryRetentionLeases == false) { final List shardRoutings = routingTable.assignedShards(); final GroupedActionListener groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> { setHasAllPeerRecoveryRetentionLeases(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 059961e1e5897..bb32f1acc28d2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2224,7 +2224,6 @@ public boolean assertRetentionLeasesPersisted() throws IOException { public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); - ensureSoftDeletesEnabled("retention leases"); replicationTracker.renewPeerRecoveryRetentionLeases(); final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { @@ -2619,7 +2618,7 @@ public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCh ActionListener listener) { assert assertPrimaryMode(); // only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs: - assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0); + assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0) || indexSettings.isSoftDeleteEnabled() == false; return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java 
index 285edc329be06..07db659299e7c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -165,12 +165,12 @@ public void recoverToTarget(ActionListener listener) { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get( - ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); + retentionLeaseRef.set( + shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); final Engine.HistorySource historySource; - if (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) { + if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) { historySource = Engine.HistorySource.INDEX; } else { historySource = Engine.HistorySource.TRANSLOG; @@ -190,7 +190,7 @@ && isTargetSameHistory() // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. 
- if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) { + if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -209,7 +209,7 @@ && isTargetSameHistory() if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); - if (softDeletesEnabled && retentionLeaseRef.get() == null) { + if (retentionLeaseRef.get() == null) { createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY)); } else { sendFileStep.onResponse(SendFileResult.EMPTY); @@ -251,36 +251,24 @@ && isTargetSameHistory() }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - if (softDeletesEnabled) { - runUnderPrimaryPermit(() -> { - try { - // If the target previously had a copy of this shard then a file-based recovery might move its global - // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a - // new one later on in the recovery. 
- shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), - new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, - deleteRetentionLeaseStep, false)); - } catch (RetentionLeaseNotFoundException e) { - logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); - deleteRetentionLeaseStep.onResponse(null); - } - }, shardId + " removing retention leaes for [" + request.targetAllocationId() + "]", - shard, cancellableThreads, logger); - } else { - deleteRetentionLeaseStep.onResponse(null); - } + runUnderPrimaryPermit(() -> { + try { + // If the target previously had a copy of this shard then a file-based recovery might move its global + // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a + // new one later on in the recovery. + shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), + new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, + deleteRetentionLeaseStep, false)); + } catch (RetentionLeaseNotFoundException e) { + logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); + deleteRetentionLeaseStep.onResponse(null); + } + }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", + shard, cancellableThreads, logger); deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - - final Consumer> createRetentionLeaseAsync; - if (softDeletesEnabled) { - createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l); - } else { - createRetentionLeaseAsync = l -> l.onResponse(null); - } - - phase1(safeCommitRef.getIndexCommit(), createRetentionLeaseAsync, () -> estimateNumOps, sendFileStep); + phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep); }, onFailure); } catch (final Exception e) { @@ -451,8 +439,7 @@ static final class 
SendFileResult { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - void phase1(IndexCommit snapshot, Consumer> createRetentionLease, - IntSupplier translogOps, ActionListener listener) { + void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { cancellableThreads.checkForCancel(); final Store store = shard.store(); try { @@ -526,7 +513,7 @@ void phase1(IndexCommit snapshot, Consumer> creat sendFileInfoStep.whenComplete(r -> sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure); - sendFilesStep.whenComplete(r -> createRetentionLease.accept(createRetentionLeaseStep), listener::onFailure); + sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); createRetentionLeaseStep.whenComplete(retentionLease -> { @@ -554,7 +541,7 @@ void phase1(IndexCommit snapshot, Consumer> creat // but we must still create a retention lease final StepListener createRetentionLeaseStep = new StepListener<>(); - createRetentionLease.accept(createRetentionLeaseStep); + createRetentionLease(startingSeqNo, createRetentionLeaseStep); createRetentionLeaseStep.whenComplete(retentionLease -> { final TimeValue took = stopWatch.totalTime(); logger.trace("recovery [phase1]: took [{}]", took); @@ -590,7 +577,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener addRetentionLeaseStep = new StepListener<>(); final long estimatedGlobalCheckpoint = startingSeqNo - 1; final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 35064b0063676..9868adfe3b86b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ 
b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -78,7 +78,7 @@ public void testPreferCopyCanPerformNoopRecovery() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f) @@ -211,7 +211,7 @@ public void testFullClusterRestartPerformNoopRecovery() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) @@ -248,7 +248,7 @@ public void testPreferCopyWithHighestMatchingOperations() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) @@ -329,7 +329,7 @@ public void testPeerRecoveryForClosedIndices() throws Exception { createIndex(indexName, Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + 
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") .build()); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index e57f5162a7a49..db9f52f942e63 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -64,7 +64,6 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.ReplicationTracker; -import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -102,7 +101,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.zip.CRC32; @@ -467,10 +465,9 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - void phase1(IndexCommit snapshot, Consumer> createRetentionLease, - IntSupplier translogOps, ActionListener listener) { + void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { phase1Called.set(true); - super.phase1(snapshot, createRetentionLease, translogOps, listener); + super.phase1(snapshot, startingSeqNo, translogOps, listener); } @Override @@ -686,7 +683,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada try { final CountDownLatch latch = new 
CountDownLatch(1); handler.phase1(DirectoryReader.listCommits(dir).get(0), - l -> recoveryExecutor.execute(() -> l.onResponse(null)), + 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch)); latch.await(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 73ca1dde99ca8..5182fa07546f5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -1128,7 +1128,7 @@ public void assertEmptyTranslog(String index) throws Exception { * Peer recovery retention leases are renewed and synced to replicas periodically (every 30 seconds). This ensures * that we have renewed every PRRL to the global checkpoint of the corresponding copy and properly synced to all copies. */ - public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) throws Exception { + public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, boolean alwaysExists) throws Exception { assertBusy(() -> { Map stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards"))); @SuppressWarnings("unchecked") Map>> shards = @@ -1139,9 +1139,10 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) thro assertNotNull(globalCheckpoint); @SuppressWarnings("unchecked") List> retentionLeases = (List>) XContentMapValues.extractValue("retention_leases.leases", copy); - if (retentionLeases == null) { + if (alwaysExists == false && retentionLeases == null) { continue; } + assertNotNull(retentionLeases); for (Map retentionLease : retentionLeases) { if (((String) retentionLease.get("id")).startsWith("peer_recovery/")) { assertThat(retentionLease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1)); From bda80cde2422eae99934e3e9f86cd51c699fe623 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: 
Thu, 19 Dec 2019 02:18:04 -0500 Subject: [PATCH 2/8] policy with retaining seqno --- .../index/engine/InternalEngine.java | 23 +++++++++++++++---- .../translog/TranslogDeletionPolicy.java | 20 ++++++++++++++-- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1e445c0b0ac85..886619778c559 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -87,6 +87,7 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -189,11 +190,7 @@ public InternalEngine(EngineConfig engineConfig) { final EngineConfig engineConfig, final BiFunction localCheckpointTrackerSupplier) { super(engineConfig); - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( - engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), - engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() - ); + final TranslogDeletionPolicy translogDeletionPolicy = newTranslogDeletionPolicy(engineConfig); store.incRef(); IndexWriter writer = null; Translog translog = null; @@ -488,6 +485,22 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery translog.trimUnreferencedReaders(); } + private static TranslogDeletionPolicy newTranslogDeletionPolicy(EngineConfig config) { + final LongSupplier retainingSeqNo; + if (config.getIndexSettings().isSoftDeleteEnabled()) { + 
retainingSeqNo = () -> Long.MAX_VALUE; + } else { + retainingSeqNo = () -> config.retentionLeasesSupplier().get().leases().stream() + .mapToLong(RetentionLease::retainingSequenceNumber) + .max().orElse(Long.MAX_VALUE); + }; + return new TranslogDeletionPolicy( + config.getIndexSettings().getTranslogRetentionSize().getBytes(), + config.getIndexSettings().getTranslogRetentionAge().getMillis(), + config.getIndexSettings().getTranslogRetentionTotalFiles(), + retainingSeqNo); + } + private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 8a553aad326b7..a2c26220fc4a2 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; public class TranslogDeletionPolicy { @@ -65,10 +66,14 @@ public void assertNoOpenTranslogRefs() { private int retentionTotalFiles; - public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) { + private final LongSupplier retainingSeqNoSupplier; + + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles, + LongSupplier retainingSeqNoSupplier) { this.retentionSizeInBytes = retentionSizeInBytes; this.retentionAgeInMillis = retentionAgeInMillis; this.retentionTotalFiles = retentionTotalFiles; + this.retainingSeqNoSupplier = retainingSeqNoSupplier; if (Assertions.ENABLED) { openTranslogRef = new ConcurrentHashMap<>(); } 
else { @@ -172,7 +177,9 @@ synchronized long minTranslogGenRequired(List readers, TranslogW minByAgeAndSize = Math.max(minByAge, minBySize); } long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); - return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery)); + long minByPolicy = Math.max(minByAgeAndSize, minByNumFiles); + long minByRetainingSeqNo = getMinTranslogGenByRetainingSeqNo(readers, writer, retainingSeqNoSupplier.getAsLong()); + return Math.min(Math.max(minByPolicy, minByRetainingSeqNo), Math.min(minByLocks, minTranslogGenerationForRecovery)); } static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { @@ -214,6 +221,15 @@ static long getMinTranslogGenByTotalFiles(List readers, Translog return minGen; } + static long getMinTranslogGenByRetainingSeqNo(List readers, TranslogWriter writer, long seqNo) { + for (TranslogReader reader : readers) { + if (reader.getCheckpoint().maxEffectiveSeqNo() >= seqNo) { + return reader.generation; + } + } + return writer.generation; + } + protected long currentTime() { return System.currentTimeMillis(); } From 8d594458769f776af7786a5c9c955644e77e9289 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Dec 2019 02:21:19 -0500 Subject: [PATCH 3/8] Revert "policy with retaining seqno" This reverts commit 781a4b825a4d429cb07aeb8b51975e6b8573a8a9. 
--- .../index/engine/InternalEngine.java | 23 ++++--------------- .../translog/TranslogDeletionPolicy.java | 20 ++-------------- 2 files changed, 7 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 886619778c559..1e445c0b0ac85 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -87,7 +87,6 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; -import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -190,7 +189,11 @@ public InternalEngine(EngineConfig engineConfig) { final EngineConfig engineConfig, final BiFunction localCheckpointTrackerSupplier) { super(engineConfig); - final TranslogDeletionPolicy translogDeletionPolicy = newTranslogDeletionPolicy(engineConfig); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), + engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() + ); store.incRef(); IndexWriter writer = null; Translog translog = null; @@ -485,22 +488,6 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery translog.trimUnreferencedReaders(); } - private static TranslogDeletionPolicy newTranslogDeletionPolicy(EngineConfig config) { - final LongSupplier retainingSeqNo; - if (config.getIndexSettings().isSoftDeleteEnabled()) { - retainingSeqNo = () -> Long.MAX_VALUE; - } else { - retainingSeqNo = () -> 
config.retentionLeasesSupplier().get().leases().stream() - .mapToLong(RetentionLease::retainingSequenceNumber) - .max().orElse(Long.MAX_VALUE); - }; - return new TranslogDeletionPolicy( - config.getIndexSettings().getTranslogRetentionSize().getBytes(), - config.getIndexSettings().getTranslogRetentionAge().getMillis(), - config.getIndexSettings().getTranslogRetentionTotalFiles(), - retainingSeqNo); - } - private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index a2c26220fc4a2..8a553aad326b7 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongSupplier; public class TranslogDeletionPolicy { @@ -66,14 +65,10 @@ public void assertNoOpenTranslogRefs() { private int retentionTotalFiles; - private final LongSupplier retainingSeqNoSupplier; - - public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles, - LongSupplier retainingSeqNoSupplier) { + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) { this.retentionSizeInBytes = retentionSizeInBytes; this.retentionAgeInMillis = retentionAgeInMillis; this.retentionTotalFiles = retentionTotalFiles; - this.retainingSeqNoSupplier = retainingSeqNoSupplier; if (Assertions.ENABLED) { openTranslogRef = new ConcurrentHashMap<>(); } else { @@ -177,9 +172,7 @@ synchronized long minTranslogGenRequired(List 
readers, TranslogW minByAgeAndSize = Math.max(minByAge, minBySize); } long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); - long minByPolicy = Math.max(minByAgeAndSize, minByNumFiles); - long minByRetainingSeqNo = getMinTranslogGenByRetainingSeqNo(readers, writer, retainingSeqNoSupplier.getAsLong()); - return Math.min(Math.max(minByPolicy, minByRetainingSeqNo), Math.min(minByLocks, minTranslogGenerationForRecovery)); + return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery)); } static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { @@ -221,15 +214,6 @@ static long getMinTranslogGenByTotalFiles(List readers, Translog return minGen; } - static long getMinTranslogGenByRetainingSeqNo(List readers, TranslogWriter writer, long seqNo) { - for (TranslogReader reader : readers) { - if (reader.getCheckpoint().maxEffectiveSeqNo() >= seqNo) { - return reader.generation; - } - } - return writer.generation; - } - protected long currentTime() { return System.currentTimeMillis(); } From f630cdb46e429378d69e72b86bec35b56fe0544e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Dec 2019 03:18:59 -0500 Subject: [PATCH 4/8] fix test --- .../index/shard/IndexShardRetentionLeaseTests.java | 3 +-- .../main/java/org/elasticsearch/test/rest/ESRestTestCase.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index ed429bb680d7d..31bdfce261ad9 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -314,8 +314,7 @@ public void testRetentionLeasesActionsFailWithSoftDeletesDisabled() throws Excep 
assertThat(expectThrows(AssertionError.class, () -> shard.removeRetentionLease( randomAlphaOfLength(10), ActionListener.wrap(() -> {}))).getMessage(), equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled")); - assertThat(expectThrows(AssertionError.class, shard::syncRetentionLeases).getMessage(), - equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled")); + shard.syncRetentionLeases(); closeShards(shard); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 5182fa07546f5..60b00ad842718 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -1139,7 +1139,7 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, bool assertNotNull(globalCheckpoint); @SuppressWarnings("unchecked") List> retentionLeases = (List>) XContentMapValues.extractValue("retention_leases.leases", copy); - if (alwaysExists == false && retentionLeases == null) { + if (alwaysExists == false && retentionLeases == null) { continue; } assertNotNull(retentionLeases); From 9c0d68bb3afecd8358ef2898c8f1cf8aa4ae530d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Dec 2019 03:26:54 -0500 Subject: [PATCH 5/8] adjust hasAllPeerRecoveryRetentionLeases flag --- .../elasticsearch/index/seqno/ReplicationTracker.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 14ef07c656943..bfe89a65d89df 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -895,10 +895,12 
@@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; - this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() && - (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || - (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && - indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN)); + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) + || (indexSettings.isSoftDeleteEnabled() && + (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || + (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && + indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN))); + this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; From 877b6863ff48462ff20e8ea567fa9c59976c7402 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Dec 2019 09:35:19 -0500 Subject: [PATCH 6/8] Do not disable translog retention for indices without soft-deletes --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bb32f1acc28d2..5415a433d8670 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1892,10 +1892,10 @@ boolean shouldRollTranslogGeneration() { public void onSettingsChanged() { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null) { - final boolean useRetentionLeasesInPeerRecovery = this.useRetentionLeasesInPeerRecovery; + final 
boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery; engineOrNull.onSettingsChanged( - useRetentionLeasesInPeerRecovery ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), - useRetentionLeasesInPeerRecovery ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), + disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), + disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); } From 36d2dd9e4785aadcc44f1c56fa44ea49e6ce151d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Dec 2019 10:34:42 -0500 Subject: [PATCH 7/8] remove stale test --- .../index/seqno/RetentionLeaseIT.java | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index 831422f8dad86..f081f87eaa365 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -336,36 +336,6 @@ public void testBackgroundRetentionLeaseSync() throws Exception { } } - public void testRetentionLeasesBackgroundSyncWithSoftDeletesDisabled() throws Exception { - final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); - internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); - TimeValue syncIntervalSetting = TimeValue.timeValueMillis(between(1, 100)); - final Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", numberOfReplicas) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), syncIntervalSetting.getStringRep()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) - .build(); - createIndex("index", settings); - final String primaryShardNodeId = 
clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); - final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); - final MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance( - TransportService.class, primaryShardNodeName); - final AtomicBoolean backgroundSyncRequestSent = new AtomicBoolean(); - primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.startsWith(RetentionLeaseBackgroundSyncAction.ACTION_NAME)) { - backgroundSyncRequestSent.set(true); - } - connection.sendRequest(requestId, action, request, options); - }); - final long start = System.nanoTime(); - ensureGreen("index"); - final long syncEnd = System.nanoTime(); - // We sleep long enough for the retention leases background sync to be triggered - Thread.sleep(Math.max(0, randomIntBetween(2, 3) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start))); - assertFalse("retention leases background sync must be a noop if soft deletes is disabled", backgroundSyncRequestSent.get()); - } - public void testRetentionLeasesSyncOnRecovery() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); From 6071abdc20f0933f0bfd1c1a87608e2821f883c1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Dec 2019 12:37:15 -0500 Subject: [PATCH 8/8] make sure every copy has established PRRL --- .../org/elasticsearch/test/rest/ESRestTestCase.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 60b00ad842718..f10f444e918ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESTestCase; @@ -87,12 +88,15 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Collectors; import static java.util.Collections.sort; import static java.util.Collections.unmodifiableList; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.in; /** * Superclass for tests that interact with an external test cluster using Elasticsearch's {@link RestClient}. @@ -1148,6 +1152,15 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, bool assertThat(retentionLease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1)); } } + if (alwaysExists) { + List existingLeaseIds = retentionLeases.stream().map(lease -> (String) lease.get("id")) + .collect(Collectors.toList()); + List expectedLeaseIds = shard.stream() + .map(shr -> (String) XContentMapValues.extractValue("routing.node", shr)) + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId) + .collect(Collectors.toList()); + assertThat("not every active copy has established its PRRL", expectedLeaseIds, everyItem(in(existingLeaseIds))); + } } } }, 60, TimeUnit.SECONDS);