From 98c865a6e754f47d7b1ebfc2ddf082e5842a4228 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 4 Sep 2019 13:00:42 -0400 Subject: [PATCH 1/2] Always rebuild checkpoint tracker for old indices --- .../elasticsearch/common/lucene/Lucene.java | 12 ++-- .../index/engine/InternalEngine.java | 8 ++- .../index/engine/InternalEngineTests.java | 56 +++++++++++++++++++ .../index/engine/EngineTestCase.java | 2 +- 4 files changed, 67 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index d7e20abae2c6a..79a17f22ca640 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -1017,20 +1017,18 @@ public CacheHelper getReaderCacheHelper() { } /** - * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive) - * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. + * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) from {@code fromSeqNo}(inclusive) in the provided directory reader. + * This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. 
* * @param directoryReader the directory reader to scan * @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive) - * @param toSeqNo the upper bound of a range of seq_no to scan (inclusive) * @param onNewSeqNo the callback to be called whenever a new valid sequence number is found */ - public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo, - LongConsumer onNewSeqNo) throws IOException { + public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, LongConsumer onNewSeqNo) throws IOException { final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader); final IndexSearcher searcher = new IndexSearcher(reader); searcher.setQueryCache(null); - final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); + final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, Long.MAX_VALUE); final Weight weight = searcher.createWeight(query, false, 1.0f); for (LeafReaderContext leaf : reader.leaves()) { final Scorer scorer = weight.scorer(leaf); @@ -1045,7 +1043,7 @@ public static void scanSeqNosInReader(DirectoryReader directoryReader, long from throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId); } final long seqNo = seqNoDocValues.longValue(); - assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo; + assert seqNo >= fromSeqNo : seqNo + " < " + fromSeqNo; onNewSeqNo.accept(seqNo); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1ed0f5d6f7994..dede898d361be 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -261,10 +261,12 @@ private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig 
// Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and // Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to // disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker. - if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + // The max_seq_no of Lucene commit in the old indices might be smaller than seq_no of some documents in the commit. + // We have to rebuild the LocalCheckpointTracker for those indices. See https://github.com/elastic/elasticsearch/pull/38879. + final boolean mustRebuild = engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_6_2); + if (engineConfig.getIndexSettings().isSoftDeleteEnabled() && (localCheckpoint < maxSeqNo || mustRebuild)) { try (Searcher searcher = searcherSupplier.get()) { - Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, - tracker::markSeqNoAsCompleted); + Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, tracker::markSeqNoAsCompleted); } } return tracker; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b5e34b7218417..2c0370b20654d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -132,6 +132,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.VersionUtils; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -5632,6 +5633,61 @@ public void testRebuildLocalCheckpointTracker() throws 
Exception { } } + /** + * Simulate a bug in https://github.com/elastic/elasticsearch/pull/38879 where the max_seq_no + * of the index commit can be smaller than seq_no of some documents in the commit. + */ + public void testAlwaysRebuildLocalCheckpointForOldIndex() throws Exception { + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_5_0, Version.V_6_6_2)) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + Path translogPath = createTempDir(); + List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + final List docs; + try (InternalEngine engine = createEngine(config)) { + for (Engine.Operation op : operations) { + applyOperation(engine, op); + if (randomInt(100) < 10) { + engine.flush(); + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + } + } + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + engine.syncTranslog(); + docs = getDocIds(engine, true); + } + trimUnsafeCommits(config); + // Simulate a bug in https://github.com/elastic/elasticsearch/pull/38879 where max_seq_no is smaller than seq_no of some docs. 
+ if (randomBoolean()) { + IndexWriterConfig iwc = new IndexWriterConfig(null) + .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) + .setCommitOnClose(false) + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND); + try (IndexWriter writer = new IndexWriter(config.getStore().directory(), iwc)) { + Map userData = new HashMap<>(); + writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); + SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet()); + long maxSeqNo = randomLongBetween(commitInfo.localCheckpoint, commitInfo.maxSeqNo); + userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); + writer.setLiveCommitData(userData.entrySet()); + writer.commit(); + } + } + try (InternalEngine engine = new InternalEngine(config)) { + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertThat(getDocIds(engine, true), equalTo(docs)); + } + } + } + public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception { try (Store store = createStore()) { Path translogPath = createTempDir(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 7ae6f6ce8f873..6fcd5c63fd52b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1095,7 +1095,7 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio for (IndexCommit commit : commits) { try (DirectoryReader reader = DirectoryReader.open(commit)) { AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get()))); + 
Lucene.scanSeqNosInReader(reader, 0, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get()))); assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), greaterThanOrEqualTo(maxSeqNoFromDocs.get())); } From 797862b3e2a5e4d277027138afe2cfeb4ec811a4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 5 Sep 2019 12:28:35 -0400 Subject: [PATCH 2/2] =?UTF-8?q?yannick=E2=80=99s=20feedback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/elasticsearch/common/lucene/Lucene.java | 12 +++++++----- .../elasticsearch/index/engine/InternalEngine.java | 4 +++- .../index/engine/InternalEngineTests.java | 2 +- .../elasticsearch/index/engine/EngineTestCase.java | 2 +- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 79a17f22ca640..d7e20abae2c6a 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -1017,18 +1017,20 @@ public CacheHelper getReaderCacheHelper() { } /** - * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) from {@code fromSeqNo}(inclusive) in the provided directory reader. - * This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. + * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive) + * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. 
* * @param directoryReader the directory reader to scan * @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive) + * @param toSeqNo the upper bound of a range of seq_no to scan (inclusive) * @param onNewSeqNo the callback to be called whenever a new valid sequence number is found */ - public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, LongConsumer onNewSeqNo) throws IOException { + public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo, + LongConsumer onNewSeqNo) throws IOException { final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader); final IndexSearcher searcher = new IndexSearcher(reader); searcher.setQueryCache(null); - final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, Long.MAX_VALUE); + final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); final Weight weight = searcher.createWeight(query, false, 1.0f); for (LeafReaderContext leaf : reader.leaves()) { final Scorer scorer = weight.scorer(leaf); @@ -1043,7 +1045,7 @@ public static void scanSeqNosInReader(DirectoryReader directoryReader, long from throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId); } final long seqNo = seqNoDocValues.longValue(); - assert seqNo >= fromSeqNo : seqNo + " < " + fromSeqNo; + assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo; onNewSeqNo.accept(seqNo); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dede898d361be..0495290c29fd9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -263,10 +263,12 @@ private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig 
// disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker. // The max_seq_no of Lucene commit in the old indices might be smaller than seq_no of some documents in the commit. // We have to rebuild the LocalCheckpointTracker for those indices. See https://github.com/elastic/elasticsearch/pull/38879. + // Note that this bug affects only indices created between 6.5.0 and 6.6.1 with soft-deletes explicitly enabled. final boolean mustRebuild = engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_6_2); if (engineConfig.getIndexSettings().isSoftDeleteEnabled() && (localCheckpoint < maxSeqNo || mustRebuild)) { try (Searcher searcher = searcherSupplier.get()) { - Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, tracker::markSeqNoAsCompleted); + final long toSeqNo = mustRebuild ? Long.MAX_VALUE : maxSeqNo; + Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, toSeqNo, tracker::markSeqNoAsCompleted); } } return tracker; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2c0370b20654d..e201e9fb78f89 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5640,7 +5640,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception { public void testAlwaysRebuildLocalCheckpointForOldIndex() throws Exception { Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) - .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_5_0, Version.V_6_6_2)) + .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_5_0, Version.V_6_6_1)) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), 
true); final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 6fcd5c63fd52b..7ae6f6ce8f873 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1095,7 +1095,7 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio for (IndexCommit commit : commits) { try (DirectoryReader reader = DirectoryReader.open(commit)) { AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - Lucene.scanSeqNosInReader(reader, 0, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get()))); + Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get()))); assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), greaterThanOrEqualTo(maxSeqNoFromDocs.get())); }