diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index add4a44390373..b07b68d82b844 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -71,12 +71,18 @@ public void onCommit(List commits) throws IOException { } private void setLastCommittedTranslogGeneration(List commits) throws IOException { - // when opening an existing lucene index, we currently always open the last commit. - // we therefore use the translog gen as the one that will be required for recovery - final IndexCommit indexCommit = commits.get(commits.size() - 1); - assert indexCommit.isDeleted() == false : "last commit is deleted"; - long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen); + // We need to keep translog since the smallest translog generation of un-deleted commits. + // However, there are commits that are not deleted just because they are being snapshotted (rather than being kept by the policy). + // TODO: We need to distinguish those commits and skip them in calculating the minimum required translog generation. + long minRequiredGen = Long.MAX_VALUE; + for (IndexCommit indexCommit : commits) { + if (indexCommit.isDeleted() == false) { + long translogGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + minRequiredGen = Math.min(translogGen, minRequiredGen); + } + } + assert minRequiredGen != Long.MAX_VALUE : "All commits are deleted"; + translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); } public SnapshotDeletionPolicy getIndexDeletionPolicy() { diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 5d4385cbd384b..689ecea9f4afd 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -60,20 +60,23 @@ public void testSettingMinTranslogGen() throws IOException { EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); List commitList = new ArrayList<>(); long count = randomIntBetween(10, 20); - long lastGen = 0; + long minGen = Long.MAX_VALUE; for (int i = 0; i < count; i++) { - lastGen += randomIntBetween(10, 20000); + long lastGen = randomIntBetween(10, 20000); + minGen = Math.min(minGen, lastGen); commitList.add(mockIndexCommitWithTranslogGen(lastGen)); } combinedDeletionPolicy.onInit(commitList); - verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen); + verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(minGen); commitList.clear(); + minGen = Long.MAX_VALUE; for (int i = 0; i < count; i++) { - lastGen += randomIntBetween(10, 20000); + long lastGen = randomIntBetween(10, 20000); + minGen = Math.min(minGen, lastGen); commitList.add(mockIndexCommitWithTranslogGen(lastGen)); } combinedDeletionPolicy.onCommit(commitList); - verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen); + verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(minGen); } IndexCommit mockIndexCommitWithTranslogGen(long gen) throws IOException {