From 58f86ac0f90ea9021be861671e75b72228b22eb0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 11 Feb 2018 20:07:18 -0500 Subject: [PATCH 1/9] Only revisit deletion policy when no pending snapshot A follow-up of #28140 We currently revisit the index deletion policy whenever the global checkpoint has advanced enough. However, we won't be able to clean up the old commit points if they are being snapshotted. Here we prefer a simple solution over an optimal solution as we should revisit if only the last commit is being snapshotted. --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 6 +++++- .../index/engine/CombinedDeletionPolicyTests.java | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 2a0d89cb1e2a2..f1940c70447f8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -50,6 +50,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final LongSupplier globalCheckpointSupplier; private final IndexCommit startingCommit; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. + private volatile int pendingSnapshots; private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. private volatile IndexCommit lastCommit; // the most recent commit point @@ -61,6 +62,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { this.globalCheckpointSupplier = globalCheckpointSupplier; this.startingCommit = startingCommit; this.snapshottedCommits = new ObjectIntHashMap<>(); + this.pendingSnapshots = 0; } @Override @@ -163,6 +165,7 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; snapshottedCommits.addTo(snapshotting, 1); // increase refCount + pendingSnapshots++; return new SnapshotIndexCommit(snapshotting); } @@ -178,6 +181,7 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) { if (refCount == 0) { snapshottedCommits.remove(releasingCommit); } + pendingSnapshots--; } /** @@ -235,7 +239,7 @@ private static int indexOfKeptCommits(List commits, long */ boolean hasUnreferencedCommits() throws IOException { final IndexCommit lastCommit = this.lastCommit; - if (safeCommit != lastCommit) { // Race condition can happen but harmless + if (safeCommit != lastCommit && pendingSnapshots == 0) { // Race condition can happen but harmless if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) { final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)); // We can clean up the current safe commit if the last commit is safe diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index f663504da9fa7..1aa81a0765162 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -277,6 +277,9 @@ public void testCheckUnreferencedCommits() throws Exception { assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); // Advanced enough globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); + final IndexCommit snapshot = indexPolicy.acquireIndexCommit(randomBoolean()); + assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); // Having snapshot -> false. + indexPolicy.releaseCommit(snapshot); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true)); commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); From e4f822979572a28cce5b41a0fe8574ecd4aa88d0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 12 Feb 2018 11:19:15 -0500 Subject: [PATCH 2/9] Revert "Only revisit deletion policy when no pending snapshot" This reverts commit 58f86ac0f90ea9021be861671e75b72228b22eb0. --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 6 +----- .../index/engine/CombinedDeletionPolicyTests.java | 3 --- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index f1940c70447f8..2a0d89cb1e2a2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -50,7 +50,6 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final LongSupplier globalCheckpointSupplier; private final IndexCommit startingCommit; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. - private volatile int pendingSnapshots; private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. private volatile IndexCommit lastCommit; // the most recent commit point @@ -62,7 +61,6 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { this.globalCheckpointSupplier = globalCheckpointSupplier; this.startingCommit = startingCommit; this.snapshottedCommits = new ObjectIntHashMap<>(); - this.pendingSnapshots = 0; } @Override @@ -165,7 +163,6 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; snapshottedCommits.addTo(snapshotting, 1); // increase refCount - pendingSnapshots++; return new SnapshotIndexCommit(snapshotting); } @@ -181,7 +178,6 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) { if (refCount == 0) { snapshottedCommits.remove(releasingCommit); } - pendingSnapshots--; } /** @@ -239,7 +235,7 @@ private static int indexOfKeptCommits(List commits, long */ boolean hasUnreferencedCommits() throws IOException { final IndexCommit lastCommit = this.lastCommit; - if (safeCommit != lastCommit && pendingSnapshots == 0) { // Race condition can happen but harmless + if (safeCommit != lastCommit) { // Race condition can happen but harmless if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) { final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)); // We can clean up the current safe commit if the last commit is safe diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 1aa81a0765162..f663504da9fa7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -277,9 +277,6 @@ public void testCheckUnreferencedCommits() throws Exception { assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); // Advanced enough globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); - final IndexCommit snapshot = indexPolicy.acquireIndexCommit(randomBoolean()); - assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); // Having snapshot -> false. - indexPolicy.releaseCommit(snapshot); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true)); commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); From 4cb6cae14814c9b12178f1697518e0e262ecd350 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 12 Feb 2018 12:31:37 -0500 Subject: [PATCH 3/9] Revisit deletion policy after release the last snapshot --- .../index/engine/CombinedDeletionPolicy.java | 8 ++++- .../index/engine/InternalEngine.java | 7 +++- .../engine/CombinedDeletionPolicyTests.java | 4 ++- .../index/engine/InternalEngineTests.java | 34 +++++++++++++++++++ 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 2a0d89cb1e2a2..c03c74a0e2e6d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -168,8 +168,11 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { /** * Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}. + * + * @return true if the snapshotting commit can be clean up. + * The commit can be removed only if the releasing snapshot is its last snapshot and the commit isn't the last commit. */ - synchronized void releaseCommit(final IndexCommit snapshotCommit) { + synchronized boolean releaseCommit(final IndexCommit snapshotCommit) { final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate; assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" + "snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]"; @@ -177,6 +180,9 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) { assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]"; if (refCount == 0) { snapshottedCommits.remove(releasingCommit); + return releasingCommit.equals(lastCommit) == false; + } else { + return false; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e3f4c456643e4..76ff3fc74ba5e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1716,7 +1716,12 @@ public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean logger.trace("finish flush for snapshot"); } final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit); - return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit)); + return new Engine.IndexCommitRef(snapshotCommit, () -> { + // Revisit the deletion policy if we can clean up the snapshotting commit. + if (combinedDeletionPolicy.releaseCommit(snapshotCommit)) { + indexWriter.deleteUnusedFiles(); + } + }); } private boolean failOnTragicEvent(AlreadyClosedException ex) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index f663504da9fa7..665f0e3327749 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -133,7 +133,9 @@ public void testAcquireIndexCommit() throws Exception { } randomSubsetOf(snapshottingCommits).forEach(snapshot -> { snapshottingCommits.remove(snapshot); - indexPolicy.releaseCommit(snapshot); + final long pendingSnapshots = snapshottingCommits.stream().filter(snapshot::equals).count(); + final IndexCommit lastCommit = commitList.get(commitList.size() - 1); + assertThat(indexPolicy.releaseCommit(snapshot), equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false)); }); // Snapshotting commits must not be deleted. snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false))); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 127f33b13654d..039bcb8afeeae 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4449,6 +4449,40 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { } } + public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { + IOUtils.close(engine, store); + store = createStore(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { + final int numDocs = scaledRandomIntBetween(10, 100); + for (int docId = 0; docId < numDocs; docId++) { + index(engine, docId); + if (frequently()) { + engine.flush(randomBoolean(), randomBoolean()); + } + } + engine.flush(false, randomBoolean()); + List commits = DirectoryReader.listCommits(store.directory()); + final IndexCommit lastCommit = commits.get(commits.size() - 1); + final IndexCommit safeCommit = commits.get(0); + int numSnapshots = between(1, 10); + final List snapshots = new ArrayList<>(); + for (int i = 0; i < numSnapshots; i++) { + snapshots.add(engine.acquireIndexCommit(true, false)); // taking snapshots from the safe commit. + } + // Global checkpoint advanced - clean up all commits except the last commit and the safe commit (snapshotted). + globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE)); + engine.syncTranslog(); + assertThat(DirectoryReader.listCommits(store.directory()), contains(safeCommit, lastCommit)); + for (int i = 0; i < numSnapshots - 1; i++) { + snapshots.get(i).close(); + assertThat(DirectoryReader.listCommits(store.directory()), contains(safeCommit, lastCommit)); + } + snapshots.get(numSnapshots - 1).close(); // released last snapshot - delete the commit. + assertThat(DirectoryReader.listCommits(store.directory()), contains(lastCommit)); + } + } + public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); int numDocs = between(10, 100); From 1472e53befbfba482239c7be4db163dd775d924c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 12 Feb 2018 13:16:23 -0500 Subject: [PATCH 4/9] Add comment --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index c03c74a0e2e6d..56c4693b04060 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -170,7 +170,6 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { * Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}. * * @return true if the snapshotting commit can be clean up. - * The commit can be removed only if the releasing snapshot is its last snapshot and the commit isn't the last commit. */ synchronized boolean releaseCommit(final IndexCommit snapshotCommit) { final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate; @@ -180,10 +179,9 @@ synchronized boolean releaseCommit(final IndexCommit snapshotCommit) { assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]"; if (refCount == 0) { snapshottedCommits.remove(releasingCommit); - return releasingCommit.equals(lastCommit) == false; - } else { - return false; } + // The commit can be clean up only if no pending snapshot and it is not the last commit. + return refCount == 0 && releasingCommit.equals(lastCommit) == false; } /** From 567dadcdc58b636af3fecaa22ba316e8e056872a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 12 Feb 2018 13:41:59 -0500 Subject: [PATCH 5/9] Only revisit if writer is still open --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 76ff3fc74ba5e..1afe28f4189f4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1719,7 +1719,9 @@ public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean return new Engine.IndexCommitRef(snapshotCommit, () -> { // Revisit the deletion policy if we can clean up the snapshotting commit. if (combinedDeletionPolicy.releaseCommit(snapshotCommit)) { - indexWriter.deleteUnusedFiles(); + if (indexWriter.isOpen()) { + indexWriter.deleteUnusedFiles(); + } } }); } From 087a8236b5fe8e1520e5c28eb4aeaf6c09bf792f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 12 Feb 2018 14:06:56 -0500 Subject: [PATCH 6/9] No need to check if engine is closed or not --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 +--- .../org/elasticsearch/index/engine/InternalEngineTests.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1afe28f4189f4..76ff3fc74ba5e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1719,9 +1719,7 @@ public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean return new Engine.IndexCommitRef(snapshotCommit, () -> { // Revisit the deletion policy if we can clean up the snapshotting commit. if (combinedDeletionPolicy.releaseCommit(snapshotCommit)) { - if (indexWriter.isOpen()) { - indexWriter.deleteUnusedFiles(); - } + indexWriter.deleteUnusedFiles(); } }); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 039bcb8afeeae..0dba3084dcc23 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2079,9 +2079,9 @@ public void testSeqNoAndCheckpoints() throws IOException { // this test writes documents to the engine while concurrently flushing/commit // and ensuring that the commit points contain the correct sequence number data public void testConcurrentWritesAndCommits() throws Exception { - List commits = new ArrayList<>(); try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { + final List commits = new ArrayList<>(); final int numIndexingThreads = scaledRandomIntBetween(2, 4); final int numDocsPerThread = randomIntBetween(500, 1000); @@ -2166,8 +2166,6 @@ public void testConcurrentWritesAndCommits() throws Exception { prevLocalCheckpoint = localCheckpoint; prevMaxSeqNo = maxSeqNo; } - } finally { - IOUtils.close(commits); } } From 37f38fc5b6a01101e9507e0e9229597b84f47cea Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 13 Feb 2018 13:28:57 -0500 Subject: [PATCH 7/9] Check also againts the safe commit --- .../index/engine/CombinedDeletionPolicy.java | 4 ++-- .../index/engine/CombinedDeletionPolicyTests.java | 9 ++++++--- .../index/engine/InternalEngineTests.java | 12 ++++-------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 7dc564520fb55..f26a24c47a6f3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -180,8 +180,8 @@ synchronized boolean releaseCommit(final IndexCommit snapshotCommit) { if (refCount == 0) { snapshottedCommits.remove(releasingCommit); } - // The commit can be clean up only if no pending snapshot and it is not the last commit. - return refCount == 0 && releasingCommit.equals(lastCommit) == false; + // The commit can be clean up only if no pending snapshot and it is neither the safe commit nor last commit. + return refCount == 0 && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false; } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 665f0e3327749..4588010fe9c63 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -131,12 +131,15 @@ public void testAcquireIndexCommit() throws Exception { assertThat(snapshot.getUserData(), equalTo(commitList.get(commitList.size() - 1).getUserData())); } } - randomSubsetOf(snapshottingCommits).forEach(snapshot -> { + final List releasingSnapshots = randomSubsetOf(snapshottingCommits); + for (IndexCommit snapshot : releasingSnapshots) { snapshottingCommits.remove(snapshot); final long pendingSnapshots = snapshottingCommits.stream().filter(snapshot::equals).count(); final IndexCommit lastCommit = commitList.get(commitList.size() - 1); - assertThat(indexPolicy.releaseCommit(snapshot), equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false)); - }); + final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); + assertThat(indexPolicy.releaseCommit(snapshot), + equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false)); + } // Snapshotting commits must not be deleted. snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false))); // We don't need to retain translog for snapshotting commits. diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index bc4593ddf6c95..639992d81d9df 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4461,24 +4461,20 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { } } engine.flush(false, randomBoolean()); - List commits = DirectoryReader.listCommits(store.directory()); - final IndexCommit lastCommit = commits.get(commits.size() - 1); - final IndexCommit safeCommit = commits.get(0); int numSnapshots = between(1, 10); final List snapshots = new ArrayList<>(); for (int i = 0; i < numSnapshots; i++) { snapshots.add(engine.acquireIndexCommit(true, false)); // taking snapshots from the safe commit. } - // Global checkpoint advanced - clean up all commits except the last commit and the safe commit (snapshotted). globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE)); engine.syncTranslog(); - assertThat(DirectoryReader.listCommits(store.directory()), contains(safeCommit, lastCommit)); + final List commits = DirectoryReader.listCommits(store.directory()); for (int i = 0; i < numSnapshots - 1; i++) { snapshots.get(i).close(); - assertThat(DirectoryReader.listCommits(store.directory()), contains(safeCommit, lastCommit)); + assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits)); // Should not release any commit. } - snapshots.get(numSnapshots - 1).close(); // released last snapshot - delete the commit. - assertThat(DirectoryReader.listCommits(store.directory()), contains(lastCommit)); + snapshots.get(numSnapshots - 1).close(); // released last snapshot - delete all except the last commit + assertThat(DirectoryReader.listCommits(store.directory()), hasSize(1)); } } From 1b62bd45789ea276f8f0ac85555a6346253c40cc Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 13 Feb 2018 21:15:17 -0500 Subject: [PATCH 8/9] Improve comment in an engine test --- .../elasticsearch/index/engine/InternalEngineTests.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 639992d81d9df..e6c56f6701ebe 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4466,14 +4466,15 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { for (int i = 0; i < numSnapshots; i++) { snapshots.add(engine.acquireIndexCommit(true, false)); // taking snapshots from the safe commit. } - globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE)); + globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); engine.syncTranslog(); final List commits = DirectoryReader.listCommits(store.directory()); for (int i = 0; i < numSnapshots - 1; i++) { snapshots.get(i).close(); - assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits)); // Should not release any commit. + // pending snapshots - should not release any commit. + assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits)); } - snapshots.get(numSnapshots - 1).close(); // released last snapshot - delete all except the last commit + snapshots.get(numSnapshots - 1).close(); // release the last snapshot - delete all except the last commit assertThat(DirectoryReader.listCommits(store.directory()), hasSize(1)); } } From e729538847beeccffe3353fdd6ec3632c8d89ab4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 Feb 2018 09:26:05 -0500 Subject: [PATCH 9/9] Do not duplicate release logic --- .../index/engine/InternalEngine.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3270fa3be96dc..bce0804f410e2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1712,23 +1712,21 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En logger.trace("finish flush for snapshot"); } final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false); - return new Engine.IndexCommitRef(lastCommit, () -> { - // Revisit the deletion policy if we can clean up the snapshotting commit. - if (combinedDeletionPolicy.releaseCommit(lastCommit)) { - indexWriter.deleteUnusedFiles(); - } - }); + return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit)); } @Override public IndexCommitRef acquireSafeIndexCommit() throws EngineException { final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true); - return new Engine.IndexCommitRef(safeCommit, () -> { - // Revisit the deletion policy if we can clean up the snapshotting commit. - if (combinedDeletionPolicy.releaseCommit(safeCommit)) { - indexWriter.deleteUnusedFiles(); - } - }); + return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit)); + } + + private void releaseIndexCommit(IndexCommit snapshot) throws IOException { + // Revisit the deletion policy if we can clean up the snapshotting commit. + if (combinedDeletionPolicy.releaseCommit(snapshot)) { + ensureOpen(); + indexWriter.deleteUnusedFiles(); + } } private boolean failOnTragicEvent(AlreadyClosedException ex) {