Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,10 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {

/**
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
*
* @return true if the snapshotting commit can be clean up.
*/
synchronized void releaseCommit(final IndexCommit snapshotCommit) {
synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
Expand All @@ -178,6 +180,8 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) {
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
}
// The commit can be clean up only if no pending snapshot and it is neither the safe commit nor last commit.
return refCount == 0 && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1712,13 +1712,21 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
logger.trace("finish flush for snapshot");
}
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit));
return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit));
return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
}

private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// Revisit the deletion policy if we can clean up the snapshotting commit.
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
ensureOpen();
indexWriter.deleteUnusedFiles();
}
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,15 @@ public void testAcquireIndexCommit() throws Exception {
assertThat(snapshot.getUserData(), equalTo(commitList.get(commitList.size() - 1).getUserData()));
}
}
randomSubsetOf(snapshottingCommits).forEach(snapshot -> {
final List<IndexCommit> releasingSnapshots = randomSubsetOf(snapshottingCommits);
for (IndexCommit snapshot : releasingSnapshots) {
snapshottingCommits.remove(snapshot);
indexPolicy.releaseCommit(snapshot);
});
final long pendingSnapshots = snapshottingCommits.stream().filter(snapshot::equals).count();
final IndexCommit lastCommit = commitList.get(commitList.size() - 1);
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(indexPolicy.releaseCommit(snapshot),
equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false));
}
// Snapshotting commits must not be deleted.
snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false)));
// We don't need to retain translog for snapshotting commits.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2079,9 +2079,9 @@ public void testSeqNoAndCheckpoints() throws IOException {
// this test writes documents to the engine while concurrently flushing/commit
// and ensuring that the commit points contain the correct sequence number data
public void testConcurrentWritesAndCommits() throws Exception {
List<Engine.IndexCommitRef> commits = new ArrayList<>();
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
final List<Engine.IndexCommitRef> commits = new ArrayList<>();

final int numIndexingThreads = scaledRandomIntBetween(2, 4);
final int numDocsPerThread = randomIntBetween(500, 1000);
Expand Down Expand Up @@ -2166,8 +2166,6 @@ public void testConcurrentWritesAndCommits() throws Exception {
prevLocalCheckpoint = localCheckpoint;
prevMaxSeqNo = maxSeqNo;
}
} finally {
IOUtils.close(commits);
}
}

Expand Down Expand Up @@ -4456,6 +4454,37 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
}
}

public void testCleanupCommitsWhenReleaseSnapshot() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
engine.flush(false, randomBoolean());
int numSnapshots = between(1, 10);
final List<Engine.IndexCommitRef> snapshots = new ArrayList<>();
for (int i = 0; i < numSnapshots; i++) {
snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit.
}
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.syncTranslog();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
for (int i = 0; i < numSnapshots - 1; i++) {
snapshots.get(i).close();
// pending snapshots - should not release any commit.
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
}
snapshots.get(numSnapshots - 1).close(); // release the last snapshot - delete all except the last commit
assertThat(DirectoryReader.listCommits(store.directory()), hasSize(1));
}
}

public void testShouldPeriodicallyFlush() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
int numDocs = between(10, 100);
Expand Down