diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index 87b1e90a4d517..db1f7c2fe00a9 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -93,12 +93,12 @@ which returns something similar to: { "commit" : { "id" : "3M3zkw2GHMo2Y4h4/KFKCg==", - "generation" : 4, + "generation" : 3, "user_data" : { "translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA", "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ", "local_checkpoint" : "-1", - "translog_generation" : "3", + "translog_generation" : "2", "max_seq_no" : "-1", "sync_id" : "AVvFY-071siAOuFGEO9P", <1> "max_unsafe_auto_id_timestamp" : "-1" diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 6f06c310e4cd5..d0575c8a8c977 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -47,60 +47,27 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; private final LongSupplier globalCheckpointSupplier; - private final IndexCommit startingCommit; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. 
private volatile IndexCommit lastCommit; // the most recent commit point - CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, - LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) { + CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) { this.logger = logger; this.translogDeletionPolicy = translogDeletionPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; - this.startingCommit = startingCommit; this.snapshottedCommits = new ObjectIntHashMap<>(); } @Override public synchronized void onInit(List commits) throws IOException { assert commits.isEmpty() == false : "index is opened, but we have no commits"; - assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; " - + "startingCommit [" + startingCommit + "], commit list [" + commits + "]"; - keepOnlyStartingCommitOnInit(commits); - updateTranslogDeletionPolicy(); - } - - /** - * Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe - * at the recovering time but they can suddenly become safe in the future. - * The following issues can happen if unsafe commits are kept oninit. - *

- * 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1) - * and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2) - * is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use - * the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the - * commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica. - *

- * 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit - * c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2). - * The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new - * commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery - * translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 - * while the local checkpoint of c2 is 2. - *

- * 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced - * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, - * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. - */ - private void keepOnlyStartingCommitOnInit(List commits) throws IOException { - for (IndexCommit commit : commits) { - if (startingCommit.equals(commit) == false) { - this.deleteCommit(commit); - } + onCommit(commits); + if (safeCommit != commits.get(commits.size() - 1)) { + throw new IllegalStateException("Engine is opened, but the last commit isn't safe. Global checkpoint [" + + globalCheckpointSupplier.getAsLong() + "], seqNo is last commit [" + + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommit.getUserData().entrySet()) + "], " + + "seqNos in safe commit [" + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet()) + "]"); } - assert startingCommit.isDeleted() == false : "Starting commit must not be deleted"; - lastCommit = startingCommit; - safeCommit = startingCommit; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 864385667f5fe..24d1fc16b702d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -41,10 +41,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.InfoStream; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import 
org.elasticsearch.common.SuppressForbidden; @@ -59,6 +57,7 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -70,7 +69,6 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; @@ -78,8 +76,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -183,12 +179,10 @@ public InternalEngine(EngineConfig engineConfig) { translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier()); assert translog.getGeneration() != null; this.translog = translog; - final IndexCommit startingCommit = getStartingCommitPoint(); - assert startingCommit != null : "Starting commit should be non-null"; - this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit); - this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy, - translog::getLastSyncedGlobalCheckpoint, startingCommit); - writer = createWriter(startingCommit); + this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); + this.combinedDeletionPolicy = + new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint); + 
writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); historyUUID = loadOrGenerateHistoryUUID(writer); Objects.requireNonNull(historyUUID, "history uuid should not be null"); @@ -232,10 +226,11 @@ public InternalEngine(EngineConfig engineConfig) { } private LocalCheckpointTracker createLocalCheckpointTracker( - BiFunction localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException { + BiFunction localCheckpointTrackerSupplier) throws IOException { final long maxSeqNo; final long localCheckpoint; - final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(startingCommit); + final SequenceNumbers.CommitInfo seqNoStats = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet()); maxSeqNo = seqNoStats.maxSeqNo; localCheckpoint = seqNoStats.localCheckpoint; logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); @@ -395,31 +390,6 @@ public void skipTranslogRecovery() { pendingTranslogRecovery.set(false); // we are good - now we can commit } - private IndexCommit getStartingCommitPoint() throws IOException { - final IndexCommit startingIndexCommit; - final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); - final long minRetainedTranslogGen = translog.getMinFileGeneration(); - final List existingCommits = DirectoryReader.listCommits(store.directory()); - // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog - // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. - // To avoid this issue, we only select index commits whose translog are fully retained. 
- if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { - final List recoverableCommits = new ArrayList<>(); - for (IndexCommit commit : existingCommits) { - if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { - recoverableCommits.add(commit); - } - } - assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + - "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; - startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); - } else { - // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. - startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); - } - return startingIndexCommit; - } - private void recoverFromTranslogInternal() throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; @@ -1907,9 +1877,9 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException { } } - private IndexWriter createWriter(IndexCommit startingCommit) throws IOException { + private IndexWriter createWriter() throws IOException { try { - final IndexWriterConfig iwc = getIndexWriterConfig(startingCommit); + final IndexWriterConfig iwc = getIndexWriterConfig(); return createWriter(store.directory(), iwc); } catch (LockObtainFailedException ex) { logger.warn("could not lock IndexWriter", ex); @@ -1922,11 +1892,10 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx return new IndexWriter(directory, iwc); } - private IndexWriterConfig getIndexWriterConfig(IndexCommit startingCommit) { + private IndexWriterConfig getIndexWriterConfig() { final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); iwc.setCommitOnClose(false); // we by default don't commit on 
close iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); - iwc.setIndexCommit(startingCommit); iwc.setIndexDeletionPolicy(combinedDeletionPolicy); // with tests.verbose, lucene sets this up: plumb to align with filesystem stream boolean verbose = false; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java index b30743c2cff93..0c071f4b2d422 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java @@ -122,5 +122,13 @@ public CommitInfo(long maxSeqNo, long localCheckpoint) { this.maxSeqNo = maxSeqNo; this.localCheckpoint = localCheckpoint; } + + @Override + public String toString() { + return "CommitInfo{" + + "maxSeqNo=" + maxSeqNo + + ", localCheckpoint=" + localCheckpoint + + '}'; + } } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0ab2cc699d355..e2e8459943c26 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1317,6 +1317,9 @@ private void innerOpenEngineAndTranslog() throws IOException { assertMaxUnsafeAutoIdInCommit(); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); + store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); + createNewEngine(config); verifyNotClosed(); // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 297790890c1b0..83fded4a1f18b 100644 --- 
a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -24,6 +24,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFormatTooNewException; @@ -75,6 +76,7 @@ import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.CombinedDeletionPolicy; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -1463,7 +1465,7 @@ private static long estimateSize(Directory directory) throws IOException { */ public void createEmpty() throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory)) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory, null)) { final Map map = new HashMap<>(); map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); @@ -1482,7 +1484,7 @@ public void createEmpty() throws IOException { */ public void bootstrapNewHistory() throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { final Map userData = getUserData(writer); final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); final Map map = new HashMap<>(); @@ -1501,7 +1503,7 @@ public void bootstrapNewHistory() throws 
IOException { */ public void associateIndexWithNewTranslog(final String translogUUID) throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) { throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]"); } @@ -1520,7 +1522,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx */ public void ensureIndexHasHistoryUUID() throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { final Map userData = getUserData(writer); if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) { updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID())); @@ -1530,6 +1532,82 @@ public void ensureIndexHasHistoryUUID() throws IOException { } } + /** + * Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe + * at the recovering time but they can suddenly become safe in the future. + * The following issues can happen if unsafe commits are kept oninit. + *

+ * 1. A replica can use an unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1) + * and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2) + * is added without flushing, the global checkpoint is advanced to 2, and the replica recovers again, it will use + * the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequence-based recovery even though the + * commit c2 contains a stale operation, and the document(with seqno=2) will not be replicated to the replica. + *

+ * 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when a replica has a safe commit + * c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2). + * The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new + * commit on the replica will cause an exception as the new last commit c3 will have recovery_translog_gen=1. The recovery + * translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 + * while the local checkpoint of c2 is 2. + *

+ * 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced + * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, + * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. + */ + public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long minRetainedTranslogGen, + final org.elasticsearch.Version indexVersionCreated) throws IOException { + metadataLock.writeLock().lock(); + try { + final List existingCommits = DirectoryReader.listCommits(directory); + if (existingCommits.isEmpty()) { + throw new IllegalArgumentException("No index found to trim"); + } + final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); + final IndexCommit startingIndexCommit; + // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog + // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. + // To avoid this issue, we only select index commits whose translog are fully retained. 
+ if (indexVersionCreated.before(org.elasticsearch.Version.V_6_2_0)) { + final List recoverableCommits = new ArrayList<>(); + for (IndexCommit commit : existingCommits) { + if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { + recoverableCommits.add(commit); + } + } + assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + + "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); + } else { + // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); + } + + if (translogUUID.equals(startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY)) == false) { + throw new IllegalStateException("starting commit translog uuid [" + + startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid [" + + translogUUID + "]"); + } + if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, startingIndexCommit)) { + // this achieves two things: + // - by committing a new commit based on the starting commit, it make sure the starting commit will be opened + // - deletes any other commit (by lucene standard deletion policy) + // + // note that we can't just use IndexCommit.delete() as we really want to make sure that those files won't be used + // even if a virus scanner causes the files not to be used. + + // The new commit will use segment files from the starting commit but userData from the last commit by default. + // Thus, we need to manually set the userData from the starting commit to the new commit. 
+ writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet()); + writer.commit(); + } + } + } finally { + metadataLock.writeLock().unlock(); + } + } + + private void updateCommitData(IndexWriter writer, Map keysToUpdate) throws IOException { final Map userData = getUserData(writer); userData.putAll(keysToUpdate); @@ -1543,9 +1621,12 @@ private Map getUserData(IndexWriter writer) { return userData; } - private IndexWriter newIndexWriter(IndexWriterConfig.OpenMode openMode, final Directory dir) throws IOException { + private static IndexWriter newIndexWriter(final IndexWriterConfig.OpenMode openMode, final Directory dir, final IndexCommit commit) + throws IOException { + assert openMode == IndexWriterConfig.OpenMode.APPEND || commit == null : "can't specify create flag with a commit"; IndexWriterConfig iwc = new IndexWriterConfig(null) .setCommitOnClose(false) + .setIndexCommit(commit) // we don't want merges to happen here - we call maybe merge on the engine // later once we stared it up otherwise we would need to wait for it here // we also don't specify a codec here and merges should use the engines for this index diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 62e47d08ded54..b6b6f656be44f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -22,7 +22,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.UUIDs; @@ -38,6 +37,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import 
org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -1705,6 +1705,11 @@ static Checkpoint readCheckpoint(final Path location) throws IOException { * @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid */ public static long readGlobalCheckpoint(final Path location, final String expectedTranslogUUID) throws IOException { + final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID); + return checkpoint.globalCheckpoint; + } + + private static Checkpoint readCheckpoint(Path location, String expectedTranslogUUID) throws IOException { final Checkpoint checkpoint = readCheckpoint(location); // We need to open at least translog reader to validate the translogUUID. final Path translogFile = location.resolve(getFilename(checkpoint.generation)); @@ -1715,7 +1720,21 @@ public static long readGlobalCheckpoint(final Path location, final String expect } catch (Exception ex) { throw new TranslogCorruptedException("Translog at [" + location + "] is corrupted", ex); } - return checkpoint.globalCheckpoint; + return checkpoint; + } + + /** + * Returns the minimum translog generation retained by the translog at the given location. + * This ensures that the translogUUID from this translog matches with the provided translogUUID. 
+ * + * @param location the location of the translog + * @return the minimum translog generation + * @throws IOException if an I/O exception occurred reading the checkpoint + * @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid + */ + public static long readMinTranslogGeneration(final Path location, final String expectedTranslogUUID) throws IOException { + final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID); + return checkpoint.minTranslogGeneration; } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 67fd385955f3e..ea7de50b7b34c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -52,7 +52,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); final LongArrayList maxSeqNoList = new LongArrayList(); final LongArrayList translogGenList = new LongArrayList(); @@ -91,7 +91,7 @@ public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new 
CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); long lastMaxSeqNo = between(1, 1000); long lastTranslogGen = between(1, 20); int safeIndex = 0; @@ -161,7 +161,7 @@ public void testLegacyIndex() throws Exception { final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); long legacyTranslogGen = randomNonNegativeLong(); IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); @@ -194,7 +194,7 @@ public void testLegacyIndex() throws Exception { public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); final int invalidCommits = between(1, 10); final List commitList = new ArrayList<>(); @@ -217,39 +217,11 @@ public void testDeleteInvalidCommits() throws Exception { } } - /** - * Keeping existing unsafe commits can be problematic because these commits are not safe at the recovering time - * but they can suddenly become safe in the future. 
See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)} - */ - public void testKeepOnlyStartingCommitOnInit() throws Exception { - final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); - TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - final UUID translogUUID = UUID.randomUUID(); - final List commitList = new ArrayList<>(); - int totalCommits = between(2, 20); - for (int i = 0; i < totalCommits; i++) { - commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong())); - } - final IndexCommit startingCommit = randomFrom(commitList); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, startingCommit); - indexPolicy.onInit(commitList); - for (IndexCommit commit : commitList) { - if (commit.equals(startingCommit) == false) { - verify(commit, times(1)).delete(); - } - } - verify(startingCommit, never()).delete(); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), - equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), - equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - } - public void testCheckUnreferencedCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); diff --git 
a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index bba05401d4155..3a7fd94f61905 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -763,6 +763,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { } } initialEngine.close(); + trimUnsafeCommits(initialEngine.config()); recoveringEngine = new InternalEngine(initialEngine.config()); recoveringEngine.recoverFromTranslog(); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { @@ -1168,6 +1169,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { engine.index(indexForDoc(doc)); EngineConfig config = engine.config(); engine.close(); + trimUnsafeCommits(config); engine = new InternalEngine(config); engine.recoverFromTranslog(); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); @@ -3581,7 +3583,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro } finally { IOUtils.close(initialEngine); } - + trimUnsafeCommits(initialEngine.config()); try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { recoveringEngine.recoverFromTranslog(); recoveringEngine.fillSeqNoGaps(2); @@ -3933,6 +3935,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { // now do it again to make sure we preserve values etc. 
try { + trimUnsafeCommits(replicaEngine.config()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); @@ -4256,31 +4259,6 @@ public void testAcquireIndexCommit() throws Exception { } } - public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception { - IOUtils.close(engine); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final EngineConfig config = copy(engine.config(), globalCheckpoint::get); - final IndexCommit safeCommit; - try (InternalEngine engine = createEngine(config)) { - final int numDocs = between(5, 50); - for (int i = 0; i < numDocs; i++) { - index(engine, i); - if (randomBoolean()) { - engine.flush(); - } - } - // Selects a starting commit and advances and persists the global checkpoint to that commit. - final List commits = DirectoryReader.listCommits(engine.store.directory()); - safeCommit = randomFrom(commits); - globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO))); - engine.getTranslog().sync(); - } - try (InternalEngine engine = new InternalEngine(config)) { - final List existingCommits = DirectoryReader.listCommits(engine.store.directory()); - assertThat("safe commit should be kept", existingCommits, contains(safeCommit)); - } - } - public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { IOUtils.close(engine, store); store = createStore(); @@ -4615,4 +4593,64 @@ public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo())); assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); } + + public void testTrimUnsafeCommits() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final int maxSeqNo = 40; + final List seqNos = 
LongStream.rangeClosed(0, maxSeqNo).boxed().collect(Collectors.toList()); + Collections.shuffle(seqNos, random()); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + final List commitMaxSeqNo = new ArrayList<>(); + final long minTranslogGen; + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < seqNos.size(); i++) { + ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(), new BytesArray("{}"), null); + Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0, + 1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false); + engine.index(index); + if (randomBoolean()) { + engine.flush(); + final Long maxSeqNoInCommit = seqNos.subList(0, i + 1).stream().max(Long::compareTo).orElse(-1L); + commitMaxSeqNo.add(maxSeqNoInCommit); + } + } + globalCheckpoint.set(randomInt(maxSeqNo)); + engine.syncTranslog(); + minTranslogGen = engine.getTranslog().getMinFileGeneration(); + } + + store.trimUnsafeCommits(globalCheckpoint.get(), minTranslogGen,config.getIndexSettings().getIndexVersionCreated()); + long safeMaxSeqNo = + commitMaxSeqNo.stream().filter(s -> s <= globalCheckpoint.get()) + .reduce((s1, s2) -> s2) // get the last one. 
+ .orElse(SequenceNumbers.NO_OPS_PERFORMED); + final List commits = DirectoryReader.listCommits(store.directory()); + assertThat(commits, hasSize(1)); + assertThat(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(safeMaxSeqNo))); + try (IndexReader reader = DirectoryReader.open(commits.get(0))) { + for (LeafReaderContext context: reader.leaves()) { + final NumericDocValues values = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + if (values != null) { + for (int docID = 0; docID < context.reader().maxDoc(); docID++) { + if (values.advanceExact(docID) == false) { + throw new AssertionError("Document does not have a seq number: " + docID); + } + assertThat(values.longValue(), lessThanOrEqualTo(globalCheckpoint.get())); + } + } + } + } + } + } + + private static void trimUnsafeCommits(EngineConfig config) throws IOException { + final Store store = config.getStore(); + final TranslogConfig translogConfig = config.getTranslogConfig(); + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); + store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); + } + }