From 7319310ee8cdadd63093e42d33fb3b926f740942 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 17 Mar 2018 23:50:27 +0100 Subject: [PATCH 1/8] no starting commit for you --- .../index/engine/CombinedDeletionPolicy.java | 47 ++-------- .../index/engine/EngineDiskUtils.java | 89 +++++++++++++++++-- .../index/engine/InternalEngine.java | 52 +++-------- .../index/seqno/SequenceNumbers.java | 8 ++ .../elasticsearch/index/shard/IndexShard.java | 7 +- .../index/translog/Translog.java | 23 ++++- .../engine/CombinedDeletionPolicyTests.java | 38 ++------ .../index/engine/EngineDiskUtilsTests.java | 64 ++++++++++++- .../index/engine/InternalEngineTests.java | 37 +++----- 9 files changed, 214 insertions(+), 151 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 6f06c310e4cd5..d0575c8a8c977 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -47,60 +47,27 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; private final LongSupplier globalCheckpointSupplier; - private final IndexCommit startingCommit; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. private volatile IndexCommit lastCommit; // the most recent commit point - CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, - LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) { + CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) { this.logger = logger; this.translogDeletionPolicy = translogDeletionPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; - this.startingCommit = startingCommit; this.snapshottedCommits = new ObjectIntHashMap<>(); } @Override public synchronized void onInit(List commits) throws IOException { assert commits.isEmpty() == false : "index is opened, but we have no commits"; - assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; " - + "startingCommit [" + startingCommit + "], commit list [" + commits + "]"; - keepOnlyStartingCommitOnInit(commits); - updateTranslogDeletionPolicy(); - } - - /** - * Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe - * at the recovering time but they can suddenly become safe in the future. - * The following issues can happen if unsafe commits are kept oninit. - *

- * 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1) - * and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2) - * is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use - * the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the - * commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica. - *

- * 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit - * c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2). - * The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new - * commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery - * translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 - * while the local checkpoint of c2 is 2. - *

- * 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced - * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, - * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. - */ - private void keepOnlyStartingCommitOnInit(List commits) throws IOException { - for (IndexCommit commit : commits) { - if (startingCommit.equals(commit) == false) { - this.deleteCommit(commit); - } + onCommit(commits); + if (safeCommit != commits.get(commits.size() - 1)) { + throw new IllegalStateException("Engine is opened, but the last commit isn't safe. Global checkpoint [" + + globalCheckpointSupplier.getAsLong() + "], seqNo is last commit [" + + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommit.getUserData().entrySet()) + "], " + + "seqNos in safe commit [" + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet()) + "]"); } - assert startingCommit.isDeleted() == false : "Starting commit must not be deleted"; - lastCommit = startingCommit; - safeCommit = startingCommit; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java b/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java index f7f3aa8e9fe1d..acb6336dd662d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java @@ -26,6 +26,7 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.Directory; import org.elasticsearch.Assertions; +import org.elasticsearch.Version; import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; @@ -34,6 +35,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -49,7 +51,7 @@ public abstract class EngineDiskUtils { * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. */ public static void createEmpty(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException { - try (IndexWriter writer = newIndexWriter(true, dir)) { + try (IndexWriter writer = newIndexWriter(true, dir, null)) { final String translogUuid = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId); final Map map = new HashMap<>(); map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); @@ -62,6 +64,78 @@ public static void createEmpty(final Directory dir, final Path translogPath, fin } } + /** + * Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe + * at the recovering time but they can suddenly become safe in the future. + * The following issues can happen if unsafe commits are kept oninit. + *

+ * 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1) + * and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2) + * is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use + * the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the + * commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica. + *

+ * 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit + * c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2). + * The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new + * commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery + * translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 + * while the local checkpoint of c2 is 2. + *

+ * 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced + * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, + * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. + */ + public static void trimUnsafeCommits(final Directory dir, final Path translogPath, + final Version indexVersionCreated) throws IOException { + final List existingCommits = DirectoryReader.listCommits(dir); + if (existingCommits.size() == 0) { + throw new IllegalArgumentException("No index found to trim"); + } + final String translogUUID = existingCommits.get(existingCommits.size()-1).getUserData().get(Translog.TRANSLOG_UUID_KEY); + final IndexCommit startingIndexCommit; + final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID); + // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog + // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. + // To avoid this issue, we only select index commits whose translog are fully retained. + if (indexVersionCreated.before(Version.V_6_2_0)) { + final List recoverableCommits = new ArrayList<>(); + for (IndexCommit commit : existingCommits) { + if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { + recoverableCommits.add(commit); + } + } + assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + + "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); + } else { + // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); + } + + if (translogUUID.equals(startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY)) == false) { + throw new IllegalStateException("starting commit translog uuid [" + + startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid [" + + translogUUID + "]"); + } + if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) { + try (IndexWriter writer = newIndexWriter(false, dir, startingIndexCommit)) { + // this achieves two things: + // - by committing a new commit based on the starting commit, it make sure the starting commit will be opened + // - deletes any other commit (by lucene standard deletion policy) + // + // note that we can't just use IndexCommit.delete() as we really want to make sure that those files won't be used + // even if a virus scanner causes the files not to be used. + + // TODO: speak to @s1monw about the fact that we can' use getUserData(writer) as that uses that last's commit user + // data rather then the starting commit. + writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet()); + writer.commit(); + } + } + } + /** * Converts an existing lucene index and marks it with a new history uuid. Also creates a new empty translog file. @@ -69,7 +143,8 @@ public static void createEmpty(final Directory dir, final Path translogPath, fin */ public static void bootstrapNewHistoryFromLuceneIndex(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException { - try (IndexWriter writer = newIndexWriter(false, dir)) { + assert DirectoryReader.listCommits(dir).size() == 1 : "bootstrapping a new history from an index with multiple commits"; + try (IndexWriter writer = newIndexWriter(false, dir, null)) { final Map userData = getUserData(writer); final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); final String translogUuid = Translog.createEmptyTranslog(translogPath, maxSeqNo, shardId); @@ -96,7 +171,7 @@ public static void createNewTranslog(final Directory dir, final Path translogPat + initialGlobalCheckpoint + "]"; } - try (IndexWriter writer = newIndexWriter(false, dir)) { + try (IndexWriter writer = newIndexWriter(false, dir, null)) { final String translogUuid = Translog.createEmptyTranslog(translogPath, initialGlobalCheckpoint, shardId); final Map map = new HashMap<>(); map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); @@ -110,7 +185,8 @@ public static void createNewTranslog(final Directory dir, final Path translogPat * Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed. */ public static void ensureIndexHasHistoryUUID(final Directory dir) throws IOException { - try (IndexWriter writer = newIndexWriter(false, dir)) { + assert DirectoryReader.listCommits(dir).size() == 1 : "can't ensure index history with multiple commits"; + try (IndexWriter writer = newIndexWriter(false, dir, null)) { final Map userData = getUserData(writer); if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) { updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID())); @@ -131,14 +207,17 @@ private static Map getUserData(IndexWriter writer) { return userData; } - private static IndexWriter newIndexWriter(final boolean create, final Directory dir) throws IOException { + private static IndexWriter newIndexWriter(final boolean create, final Directory dir, final IndexCommit commit) throws IOException { + assert create == false || commit == null : "can't specify create flag with a commit"; IndexWriterConfig iwc = new IndexWriterConfig(null) .setCommitOnClose(false) + .setIndexCommit(commit) // we don't want merges to happen here - we call maybe merge on the engine // later once we stared it up otherwise we would need to wait for it here // we also don't specify a codec here and merges should use the engines for this index .setMergePolicy(NoMergePolicy.INSTANCE) .setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); + return new IndexWriter(dir, iwc); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 49be68efcad5d..89f8609aab2f5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -42,10 +42,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.InfoStream; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; @@ -60,6 +58,7 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -71,7 +70,6 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; @@ -79,7 +77,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -180,12 +177,10 @@ public InternalEngine(EngineConfig engineConfig) { translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier()); assert translog.getGeneration() != null; this.translog = translog; - final IndexCommit startingCommit = getStartingCommitPoint(); - assert startingCommit != null : "Starting commit should be non-null"; - this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit); - this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy, - translog::getLastSyncedGlobalCheckpoint, startingCommit); - writer = createWriter(startingCommit); + this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); + this.combinedDeletionPolicy = + new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint); + writer = createWriter(); updateMaxUnsafeAutoIdTimestampFromWriter(writer); historyUUID = loadOrGenerateHistoryUUID(writer); Objects.requireNonNull(historyUUID, "history uuid should not be null"); @@ -229,10 +224,11 @@ public InternalEngine(EngineConfig engineConfig) { } private LocalCheckpointTracker createLocalCheckpointTracker( - BiFunction localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException { + BiFunction localCheckpointTrackerSupplier) throws IOException { final long maxSeqNo; final long localCheckpoint; - final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(startingCommit); + final SequenceNumbers.CommitInfo seqNoStats = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet()); maxSeqNo = seqNoStats.maxSeqNo; localCheckpoint = seqNoStats.localCheckpoint; logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); @@ -387,31 +383,6 @@ public void skipTranslogRecovery() { pendingTranslogRecovery.set(false); // we are good - now we can commit } - private IndexCommit getStartingCommitPoint() throws IOException { - final IndexCommit startingIndexCommit; - final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); - final long minRetainedTranslogGen = translog.getMinFileGeneration(); - final List existingCommits = DirectoryReader.listCommits(store.directory()); - // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog - // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. - // To avoid this issue, we only select index commits whose translog are fully retained. - if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { - final List recoverableCommits = new ArrayList<>(); - for (IndexCommit commit : existingCommits) { - if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { - recoverableCommits.add(commit); - } - } - assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + - "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; - startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); - } else { - // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. - startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); - } - return startingIndexCommit; - } - private void recoverFromTranslogInternal() throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; @@ -1810,9 +1781,9 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException { } } - private IndexWriter createWriter(IndexCommit startingCommit) throws IOException { + private IndexWriter createWriter() throws IOException { try { - final IndexWriterConfig iwc = getIndexWriterConfig(startingCommit); + final IndexWriterConfig iwc = getIndexWriterConfig(); return createWriter(store.directory(), iwc); } catch (LockObtainFailedException ex) { logger.warn("could not lock IndexWriter", ex); @@ -1825,11 +1796,10 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx return new IndexWriter(directory, iwc); } - private IndexWriterConfig getIndexWriterConfig(IndexCommit startingCommit) { + private IndexWriterConfig getIndexWriterConfig() { final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); iwc.setCommitOnClose(false); // we by default don't commit on close iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); - iwc.setIndexCommit(startingCommit); iwc.setIndexDeletionPolicy(combinedDeletionPolicy); // with tests.verbose, lucene sets this up: plumb to align with filesystem stream boolean verbose = false; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java index b30743c2cff93..0c071f4b2d422 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java @@ -122,5 +122,13 @@ public CommitInfo(long maxSeqNo, long localCheckpoint) { this.maxSeqNo = maxSeqNo; this.localCheckpoint = localCheckpoint; } + + @Override + public String toString() { + return "CommitInfo{" + + "maxSeqNo=" + maxSeqNo + + ", localCheckpoint=" + localCheckpoint + + '}'; + } } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 13708add48124..bcf4bd9a943e8 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CheckIndex; -import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.LeafReaderContext; @@ -37,7 +36,6 @@ import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; @@ -67,6 +65,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexNotFoundException; @@ -80,6 +79,7 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngine; @@ -1330,6 +1330,9 @@ private void innerOpenEngineAndTranslog() throws IOException { assertMaxUnsafeAutoIdInCommit(); + EngineDiskUtils.trimUnsafeCommits(store.directory(), translogConfig.getTranslogPath(), + config.getIndexSettings().getIndexVersionCreated()); + createNewEngine(config); verifyNotClosed(); // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 6a32ae14fdd3a..0d7c3bbd4da8e 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.UUIDs; @@ -39,6 +38,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -1703,6 +1703,11 @@ static Checkpoint readCheckpoint(final Path location) throws IOException { * @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid */ public static long readGlobalCheckpoint(final Path location, final String expectedTranslogUUID) throws IOException { + final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID); + return checkpoint.globalCheckpoint; + } + + private static Checkpoint readCheckpoint(Path location, String expectedTranslogUUID) throws IOException { final Checkpoint checkpoint = readCheckpoint(location); // We need to open at least translog reader to validate the translogUUID. final Path translogFile = location.resolve(getFilename(checkpoint.generation)); @@ -1713,7 +1718,21 @@ public static long readGlobalCheckpoint(final Path location, final String expect } catch (Exception ex) { throw new TranslogCorruptedException("Translog at [" + location + "] is corrupted", ex); } - return checkpoint.globalCheckpoint; + return checkpoint; + } + + /** + * Returns the minimum translog generation retained by the translog at the given location. + * This ensures that the translogUUID from this translog matches with the provided translogUUID. + * + * @param location the location of the translog + * @return the minimum translog generation + * @throws IOException if an I/O exception occurred reading the checkpoint + * @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid + */ + public static long readMinTranslogGeneration(final Path location, final String expectedTranslogUUID) throws IOException { + final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID); + return checkpoint.minTranslogGeneration; } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 67fd385955f3e..ea7de50b7b34c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -52,7 +52,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); final LongArrayList maxSeqNoList = new LongArrayList(); final LongArrayList translogGenList = new LongArrayList(); @@ -91,7 +91,7 @@ public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); long lastMaxSeqNo = between(1, 1000); long lastTranslogGen = between(1, 20); int safeIndex = 0; @@ -161,7 +161,7 @@ public void testLegacyIndex() throws Exception { final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); long legacyTranslogGen = randomNonNegativeLong(); IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); @@ -194,7 +194,7 @@ public void testLegacyIndex() throws Exception { public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); final int invalidCommits = between(1, 10); final List commitList = new ArrayList<>(); @@ -217,39 +217,11 @@ public void testDeleteInvalidCommits() throws Exception { } } - /** - * Keeping existing unsafe commits can be problematic because these commits are not safe at the recovering time - * but they can suddenly become safe in the future. See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)} - */ - public void testKeepOnlyStartingCommitOnInit() throws Exception { - final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); - TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - final UUID translogUUID = UUID.randomUUID(); - final List commitList = new ArrayList<>(); - int totalCommits = between(2, 20); - for (int i = 0; i < totalCommits; i++) { - commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong())); - } - final IndexCommit startingCommit = randomFrom(commitList); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, startingCommit); - indexPolicy.onInit(commitList); - for (IndexCommit commit : commitList) { - if (commit.equals(startingCommit) == false) { - verify(commit, times(1)).delete(); - } - } - verify(startingCommit, never()).delete(); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), - equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), - equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - } - public void testCheckUnreferencedCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); diff --git a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java index c57af9b448671..2702f6b968434 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java @@ -19,9 +19,14 @@ package org.elasticsearch.index.engine; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -33,6 +38,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -40,18 +46,25 @@ import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.LongStream; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; public class EngineDiskUtilsTests extends EngineTestCase { - public void testHistoryUUIDIsSetIfMissing() throws IOException { final int numDocs = randomIntBetween(0, 3); for (int i = 0; i < numDocs; i++) { @@ -204,4 +217,53 @@ public void testHistoryUUIDCanBeForced() throws IOException { assertVisibleCount(engine, 0, false); assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID))); } + + public void testTrimUnsafeCommits() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final int maxSeqNo = 40; + final List seqNos = LongStream.rangeClosed(0, maxSeqNo).boxed().collect(Collectors.toList()); + Collections.shuffle(seqNos, random()); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + final List commitMaxSeqNo = new ArrayList<>(); + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < seqNos.size(); i++) { + ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(), new BytesArray("{}"), null); + Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0, + 1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false); + engine.index(index); + if (randomBoolean()) { + engine.flush(); + final Long maxSeqNoInCommit = seqNos.subList(0, i + 1).stream().max(Long::compareTo).orElse(-1L); + commitMaxSeqNo.add(maxSeqNoInCommit); + } + } + globalCheckpoint.set(randomInt(maxSeqNo)); + engine.syncTranslog(); + } + + EngineDiskUtils.trimUnsafeCommits(store.directory(), config.getTranslogConfig().getTranslogPath(), + config.getIndexSettings().getIndexVersionCreated()); + long safeMaxSeqNo = + commitMaxSeqNo.stream().filter(s -> s <= globalCheckpoint.get()) + .reduce((s1, s2) -> s2) // get the last one. + .orElse(SequenceNumbers.NO_OPS_PERFORMED); + final List commits = DirectoryReader.listCommits(store.directory()); + assertThat(commits, hasSize(1)); + assertThat(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(safeMaxSeqNo))); + try (IndexReader reader = DirectoryReader.open(commits.get(0))) { + for (LeafReaderContext context: reader.leaves()) { + final NumericDocValues values = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + if (values != null) { + for (int docID = 0; docID < context.reader().maxDoc(); docID++) { + if (values.advanceExact(docID) == false) { + throw new AssertionError("Document does not have a seq number: " + docID); + } + assertThat(values.longValue(), lessThanOrEqualTo(globalCheckpoint.get())); + } + } + } + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2488ca79fe482..fee2180e3bdfc 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -65,7 +65,6 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; @@ -91,6 +90,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; @@ -759,6 +759,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { } } initialEngine.close(); + trimUnsafeCommits(initialEngine.config()); recoveringEngine = new InternalEngine(initialEngine.config()); recoveringEngine.recoverFromTranslog(); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { @@ -1163,6 +1164,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { engine.index(indexForDoc(doc)); EngineConfig config = engine.config(); engine.close(); + trimUnsafeCommits(config); engine = new InternalEngine(config); engine.recoverFromTranslog(); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); @@ -3494,7 +3496,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro } finally { IOUtils.close(initialEngine); } - + trimUnsafeCommits(initialEngine.config()); try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { recoveringEngine.recoverFromTranslog(); recoveringEngine.fillSeqNoGaps(2); @@ -3846,6 +3848,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { // now do it again to make sure we preserve values etc. try { + trimUnsafeCommits(replicaEngine.config()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations()); @@ -4168,31 +4171,6 @@ public void testAcquireIndexCommit() throws Exception { } } - public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception { - IOUtils.close(engine); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final EngineConfig config = copy(engine.config(), globalCheckpoint::get); - final IndexCommit safeCommit; - try (InternalEngine engine = createEngine(config)) { - final int numDocs = between(5, 50); - for (int i = 0; i < numDocs; i++) { - index(engine, i); - if (randomBoolean()) { - engine.flush(); - } - } - // Selects a starting commit and advances and persists the global checkpoint to that commit. - final List commits = DirectoryReader.listCommits(engine.store.directory()); - safeCommit = randomFrom(commits); - globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO))); - engine.getTranslog().sync(); - } - try (InternalEngine engine = new InternalEngine(config)) { - final List existingCommits = DirectoryReader.listCommits(engine.store.directory()); - assertThat("safe commit should be kept", existingCommits, contains(safeCommit)); - } - } - public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { IOUtils.close(engine, store); store = createStore(); @@ -4337,4 +4315,9 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup } } } + + private static void trimUnsafeCommits(EngineConfig config) throws IOException { + EngineDiskUtils.trimUnsafeCommits(config.getStore().directory(), + config.getTranslogConfig().getTranslogPath(), config.getIndexSettings().getIndexVersionCreated()); + } } From 0850ae3e06881733e26907a2c6c6619b69e4fb41 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Mar 2018 21:26:38 +0100 Subject: [PATCH 2/8] doc change --- docs/reference/indices/flush.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index 91fac0908ef7f..e172b53f1a83c 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -93,12 +93,12 @@ which returns something similar to: { "commit" : { "id" : "3M3zkw2GHMo2Y4h4/KFKCg==", - "generation" : 3, + "generation" : 2, "user_data" : { "translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA", "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ", "local_checkpoint" : "-1", - "translog_generation" : "3", + "translog_generation" : "2", "max_seq_no" : "-1", "sync_id" : "AVvFY-071siAOuFGEO9P", <1> "max_unsafe_auto_id_timestamp" : "-1" From 3fbce14d4d9c6f9d583a14636189f3e7234da69c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 26 Mar 2018 17:42:19 +0200 Subject: [PATCH 3/8] add logc to store --- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../org/elasticsearch/index/store/Store.java | 92 ++++++++++++++++++- .../index/engine/InternalEngineTests.java | 58 +++++++++++- 3 files changed, 145 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 60dd401f9644d..07441e9119f79 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1329,8 +1329,8 @@ private void innerOpenEngineAndTranslog() throws IOException { assertMaxUnsafeAutoIdInCommit(); - EngineDiskUtils.trimUnsafeCommits(store.directory(), translogConfig.getTranslogPath(), - config.getIndexSettings().getIndexVersionCreated()); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); + store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); createNewEngine(config); verifyNotClosed(); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 297790890c1b0..c59066fa4e143 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -24,6 +24,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFormatTooNewException; @@ -75,6 +76,7 @@ import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.CombinedDeletionPolicy; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -1463,7 +1465,7 @@ private static long estimateSize(Directory directory) throws IOException { */ public void createEmpty() throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory)) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory, null)) { final Map map = new HashMap<>(); map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); @@ -1482,7 +1484,7 @@ public void createEmpty() throws IOException { */ public void bootstrapNewHistory() throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { final Map userData = getUserData(writer); final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); final Map map = new HashMap<>(); @@ -1501,7 +1503,7 @@ public void bootstrapNewHistory() throws IOException { */ public void associateIndexWithNewTranslog(final String translogUUID) throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) { throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]"); } @@ -1520,7 +1522,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx */ public void ensureIndexHasHistoryUUID() throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { final Map userData = getUserData(writer); if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) { updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID())); @@ -1530,6 +1532,82 @@ public void ensureIndexHasHistoryUUID() throws IOException { } } + /** + * Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe + * at the recovering time but they can suddenly become safe in the future. + * The following issues can happen if unsafe commits are kept oninit. + *

+ * 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1) + * and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2) + * is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use + * the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the + * commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica. + *

+ * 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit + * c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2). + * The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new + * commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery + * translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 + * while the local checkpoint of c2 is 2. + *

+ * 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced + * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, + * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. + */ + public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long minRetainedTranslogGen, + final org.elasticsearch.Version indexVersionCreated) throws IOException { + metadataLock.writeLock().lock(); + try { + final List existingCommits = DirectoryReader.listCommits(directory); + if (existingCommits.size() == 0) { + throw new IllegalArgumentException("No index found to trim"); + } + final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); + final IndexCommit startingIndexCommit; + // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog + // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. + // To avoid this issue, we only select index commits whose translog are fully retained. + if (indexVersionCreated.before(org.elasticsearch.Version.V_6_2_0)) { + final List recoverableCommits = new ArrayList<>(); + for (IndexCommit commit : existingCommits) { + if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { + recoverableCommits.add(commit); + } + } + assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + + "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); + } else { + // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); + } + + if (translogUUID.equals(startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY)) == false) { + throw new IllegalStateException("starting commit translog uuid [" + + startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid [" + + translogUUID + "]"); + } + if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) { + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, startingIndexCommit)) { + // this achieves two things: + // - by committing a new commit based on the starting commit, it make sure the starting commit will be opened + // - deletes any other commit (by lucene standard deletion policy) + // + // note that we can't just use IndexCommit.delete() as we really want to make sure that those files won't be used + // even if a virus scanner causes the files not to be used. + + // TODO: speak to @s1monw about the fact that we can' use getUserData(writer) as that uses that last's commit user + // data rather then the starting commit. + writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet()); + writer.commit(); + } + } + } finally { + metadataLock.writeLock().unlock(); + } + } + + private void updateCommitData(IndexWriter writer, Map keysToUpdate) throws IOException { final Map userData = getUserData(writer); userData.putAll(keysToUpdate); @@ -1543,14 +1621,18 @@ private Map getUserData(IndexWriter writer) { return userData; } - private IndexWriter newIndexWriter(IndexWriterConfig.OpenMode openMode, final Directory dir) throws IOException { + private static IndexWriter newIndexWriter(final IndexWriterConfig.OpenMode openMode, final Directory dir, final IndexCommit commit) + throws IOException { + assert openMode == IndexWriterConfig.OpenMode.APPEND || commit == null : "can't specify create flag with a commit"; IndexWriterConfig iwc = new IndexWriterConfig(null) .setCommitOnClose(false) + .setIndexCommit(commit) // we don't want merges to happen here - we call maybe merge on the engine // later once we stared it up otherwise we would need to wait for it here // we also don't specify a codec here and merges should use the engines for this index .setMergePolicy(NoMergePolicy.INSTANCE) .setOpenMode(openMode); + return new IndexWriter(dir, iwc); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 18cb87621bbe1..a3e3d4d209741 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4443,8 +4443,62 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup } } + public void testTrimUnsafeCommits() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final int maxSeqNo = 40; + final List seqNos = LongStream.rangeClosed(0, maxSeqNo).boxed().collect(Collectors.toList()); + Collections.shuffle(seqNos, random()); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + final List commitMaxSeqNo = new ArrayList<>(); + final long minTranslogGen; + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < seqNos.size(); i++) { + ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(), new BytesArray("{}"), null); + Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0, + 1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false); + engine.index(index); + if (randomBoolean()) { + engine.flush(); + final Long maxSeqNoInCommit = seqNos.subList(0, i + 1).stream().max(Long::compareTo).orElse(-1L); + commitMaxSeqNo.add(maxSeqNoInCommit); + } + } + globalCheckpoint.set(randomInt(maxSeqNo)); + engine.syncTranslog(); + minTranslogGen = engine.getTranslog().getMinFileGeneration(); + } + + store.trimUnsafeCommits(globalCheckpoint.get(), minTranslogGen,config.getIndexSettings().getIndexVersionCreated()); + long safeMaxSeqNo = + commitMaxSeqNo.stream().filter(s -> s <= globalCheckpoint.get()) + .reduce((s1, s2) -> s2) // get the last one. + .orElse(SequenceNumbers.NO_OPS_PERFORMED); + final List commits = DirectoryReader.listCommits(store.directory()); + assertThat(commits, hasSize(1)); + assertThat(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(safeMaxSeqNo))); + try (IndexReader reader = DirectoryReader.open(commits.get(0))) { + for (LeafReaderContext context: reader.leaves()) { + final NumericDocValues values = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + if (values != null) { + for (int docID = 0; docID < context.reader().maxDoc(); docID++) { + if (values.advanceExact(docID) == false) { + throw new AssertionError("Document does not have a seq number: " + docID); + } + assertThat(values.longValue(), lessThanOrEqualTo(globalCheckpoint.get())); + } + } + } + } + } + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { - EngineDiskUtils.trimUnsafeCommits(config.getStore().directory(), - config.getTranslogConfig().getTranslogPath(), config.getIndexSettings().getIndexVersionCreated()); + final Store store = config.getStore(); + final TranslogConfig translogConfig = config.getTranslogConfig(); + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); + store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); } } From ca26f996ecc56af14cd3e52bdfcaa7044e62ea30 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 26 Mar 2018 23:08:13 +0200 Subject: [PATCH 4/8] back to 3 --- docs/reference/indices/flush.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index 29b4fb0a705d0..db1f7c2fe00a9 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -93,7 +93,7 @@ which returns something similar to: { "commit" : { "id" : "3M3zkw2GHMo2Y4h4/KFKCg==", - "generation" : 4, + "generation" : 3, "user_data" : { "translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA", "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ", From a17911382797613fb7bc8006290be0d2347be03e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 27 Mar 2018 08:44:58 +0200 Subject: [PATCH 5/8] remove new line --- server/src/main/java/org/elasticsearch/index/store/Store.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index c59066fa4e143..2824340b3e98d 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1632,7 +1632,6 @@ private static IndexWriter newIndexWriter(final IndexWriterConfig.OpenMode openM // we also don't specify a codec here and merges should use the engines for this index .setMergePolicy(NoMergePolicy.INSTANCE) .setOpenMode(openMode); - return new IndexWriter(dir, iwc); } From 491fb12cc6fe5abb5d0c964da2beff0759482c91 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 29 Mar 2018 10:40:02 -0400 Subject: [PATCH 6/8] add comment why manual assign user data --- server/src/main/java/org/elasticsearch/index/store/Store.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 2824340b3e98d..b20dc67474f33 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1596,8 +1596,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long // note that we can't just use IndexCommit.delete() as we really want to make sure that those files won't be used // even if a virus scanner causes the files not to be used. - // TODO: speak to @s1monw about the fact that we can' use getUserData(writer) as that uses that last's commit user - // data rather then the starting commit. + // The new commit will use segment files from the starting commit but userData from the last commit by default. + // Thus, we need to manually set the userData from the starting commit to the new commit. writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet()); writer.commit(); } From 8447be16a8b8d0ce2410f5abf154327b633f0c54 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 29 Mar 2018 11:07:29 -0400 Subject: [PATCH 7/8] Use empty method --- server/src/main/java/org/elasticsearch/index/store/Store.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index b20dc67474f33..83fded4a1f18b 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1559,7 +1559,7 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long metadataLock.writeLock().lock(); try { final List existingCommits = DirectoryReader.listCommits(directory); - if (existingCommits.size() == 0) { + if (existingCommits.isEmpty()) { throw new IllegalArgumentException("No index found to trim"); } final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); From cccd66312f76fb12129aa6a29a3fe997f4e21ca7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 29 Mar 2018 11:15:00 -0400 Subject: [PATCH 8/8] Remove unused imports --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index b79e51e0ef5c6..24d1fc16b702d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -76,8 +76,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap;