Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ private void maybeFSyncTranslogs() {
try {
Translog translog = shard.getTranslog();
if (translog.syncNeeded()) {
translog.sync();
shard.sync();
}
} catch (AlreadyClosedException ex) {
// fine - continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private IndexCommit lastCommit; // the most recent commit point
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
Expand Down Expand Up @@ -214,6 +214,21 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
return 0;
}

/**
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
*/
boolean hasUnreferencedCommits() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just an idea for a follow up, shall we extend this method to look at the fact that a snapshotted commit was released (via a special flag we set when a snapshot count reaches 0)? we need to figure where to call it, but I think might be worth exploring.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes, I am thinking to always revisit the index deletion policy (without checking the unreferenced condition) when releasing a snapshotted commit in an Engine (InternalEngine#acquireIndexCommit). We don't acquire index commits too frequently and revisiting the policy should not be expensive. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dnhatn I think I wasn't clear. The idea was to add a "pendingSnapshots" flag that is set when a snapshot reference goes to 0 and is cleared onCommit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bleskes. I got the idea.

final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
}
return false;
}

/**
* A wrapper of an index commit that prevents it from being deleted.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Stream;

public abstract class Engine implements Closeable {

Expand Down Expand Up @@ -549,6 +550,13 @@ public enum SearcherScope {
/** returns the translog for this engine */
public abstract Translog getTranslog();

/**
* Ensures that all locations in the given stream have been written to the underlying storage.
*/
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;

public abstract void syncTranslog() throws IOException;

protected void ensureOpen() {
if (isClosed.get()) {
throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -94,6 +93,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

public class InternalEngine extends Engine {

Expand Down Expand Up @@ -520,6 +520,27 @@ public Translog getTranslog() {
return translog;
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
final boolean synced = translog.ensureSynced(locations);
if (synced) {
revisitIndexDeletionPolicyOnTranslogSynced();
}
return synced;
}

@Override
public void syncTranslog() throws IOException {
translog.sync();
revisitIndexDeletionPolicyOnTranslogSynced();
}

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
}
}

@Override
public String getHistoryUUID() {
return historyUUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ protected ReplicaResult shardOperationOnReplica(final Request request, final Ind
}

private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
final Translog translog = indexShard.getTranslog();
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.getTranslog().sync();
indexShard.getTranslog().getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2316,8 +2316,7 @@ public int getActiveOperationsCount() {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
try {
final Engine engine = getEngine();
engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1));
getEngine().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close - this also is conform with the methods
// documentation
Expand All @@ -2342,9 +2341,9 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe
translogSyncProcessor.put(location, syncListener);
}

public final void sync() throws IOException {
public void sync() throws IOException {
verifyNotClosed();
getEngine().getTranslog().sync();
getEngine().syncTranslog();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,44 @@ public void testKeepOnlyStartingCommitOnInit() throws Exception {
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
}

public void testCheckUnreferencedCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);
long lastTranslogGen = between(1, 50);
for (int i = 0; i < totalCommits; i++) {
lastMaxSeqNo += between(1, 10000);
lastTranslogGen += between(1, 100);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
}
IndexCommit safeCommit = randomFrom(commitList);
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
indexPolicy.onCommit(commitList);
if (safeCommit == commitList.get(commitList.size() - 1)) {
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
} else {
// Advanced but not enough
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), lastMaxSeqNo - 1));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
// Advanced enough
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
indexPolicy.onCommit(commitList);
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
}
}

IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4414,4 +4414,29 @@ public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
}
}

public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
engine.flush(false, randomBoolean());
List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Global checkpoint advanced but not enough - all commits are kept.
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint() - 1));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
// Global checkpoint advanced enough - only the last commit is kept.
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ protected PrimaryResult performOnPrimary(

@Override
protected void performOnReplica(final GlobalCheckpointSyncAction.Request request, final IndexShard replica) throws IOException {
replica.getTranslog().sync();
replica.sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {
}

if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
verify(translog, never()).sync();
verify(indexShard, never()).sync();
} else {
verify(translog).sync();
verify(indexShard).sync();
}
}

Expand Down