From 214b469ed008f8a9fc199d6fb07098289ecf9d63 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 27 Apr 2018 18:36:10 -0400 Subject: [PATCH 01/15] Exclude soft-deleted documents in commit stats Since #29458, we use a searcher to calculate the number of documents for commit stats. Sadly, that approach is flawed. The searcher might no longer point to the last commit if it's refreshed. This commit uses SoftDeletesDirectoryReaderWrapper to exclude the soft-deleted documents from numDocs in a SegmentInfos. I chose to modify the method Lucene#getNumDocs so that we can read a store metadata snapshot correctly without opening an engine. Relates #29458 --- .../elasticsearch/common/lucene/Lucene.java | 12 +++---- .../elasticsearch/index/engine/Engine.java | 8 +++-- .../org/elasticsearch/index/store/Store.java | 2 +- .../common/lucene/LuceneTests.java | 10 +++--- .../index/engine/InternalEngineTests.java | 32 +++++++++++++++++++ 5 files changed, 50 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index f50995048eadd..818dafe865987 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -44,6 +44,8 @@ import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; +import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FieldDoc; @@ -139,14 +141,12 @@ public static Iterable files(SegmentInfos infos) throws IOException { } /** - * Returns the number of documents in the index referenced by this {@link SegmentInfos} + * Returns the number of live documents in the index referenced by this 
{@link SegmentInfos} */ - public static int getNumDocs(SegmentInfos info) { - int numDocs = 0; - for (SegmentCommitInfo si : info) { - numDocs += si.info.maxDoc() - si.getDelCount(); + public static int getNumDocs(Directory directory, SegmentInfos sis) throws IOException { + try (DirectoryReader reader = StandardDirectoryReader.open(directory, sis, Collections.emptyList())) { + return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs(); } - return numDocs; } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 47060aa016574..4166564a73636 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -71,6 +71,7 @@ import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Base64; @@ -577,8 +578,11 @@ protected final void ensureOpen() { /** get commits stats for the last commit */ public CommitStats commitStats() { - try (Engine.Searcher searcher = acquireSearcher("commit_stats", Engine.SearcherScope.INTERNAL)) { - return new CommitStats(getLastCommittedSegmentInfos(), searcher.reader().numDocs()); + try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { + final SegmentInfos commitInfos = Lucene.readSegmentInfos(commitRef.indexCommit); + return new CommitStats(commitInfos, Lucene.getNumDocs(store.directory(), commitInfos)); + } catch (IOException ex) { + throw new UncheckedIOException(ex); } } diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 78b2119d79a36..4a807db9d2b08 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ 
b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -862,7 +862,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg Map commitUserDataBuilder = new HashMap<>(); try { final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); - numDocs = Lucene.getNumDocs(segmentCommitInfos); + numDocs = Lucene.getNumDocs(directory, segmentCommitInfos); commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version. for (SegmentCommitInfo info : segmentCommitInfos) { diff --git a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java index 753aedea01e02..152042daca744 100644 --- a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java @@ -283,7 +283,7 @@ public void testNumDocs() throws IOException { writer.addDocument(doc); writer.commit(); SegmentInfos segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(1, Lucene.getNumDocs(segmentCommitInfos)); + assertEquals(1, Lucene.getNumDocs(dir, segmentCommitInfos)); doc = new Document(); doc.add(new TextField("id", "2", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); @@ -293,14 +293,14 @@ public void testNumDocs() throws IOException { doc.add(new TextField("id", "3", random().nextBoolean() ? 
Field.Store.YES : Field.Store.NO)); writer.addDocument(doc); segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(1, Lucene.getNumDocs(segmentCommitInfos)); + assertEquals(1, Lucene.getNumDocs(dir, segmentCommitInfos)); writer.commit(); segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(3, Lucene.getNumDocs(segmentCommitInfos)); + assertEquals(3, Lucene.getNumDocs(dir, segmentCommitInfos)); writer.deleteDocuments(new Term("id", "2")); writer.commit(); segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(2, Lucene.getNumDocs(segmentCommitInfos)); + assertEquals(2, Lucene.getNumDocs(dir, segmentCommitInfos)); int numDocsToIndex = randomIntBetween(10, 50); List deleteTerms = new ArrayList<>(); @@ -318,7 +318,7 @@ public void testNumDocs() throws IOException { } writer.commit(); segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(2 + deleteTerms.size(), Lucene.getNumDocs(segmentCommitInfos)); + assertEquals(2 + deleteTerms.size(), Lucene.getNumDocs(dir, segmentCommitInfos)); writer.close(); dir.close(); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a61f2b462f616..4607fdf017ada 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -626,6 +626,38 @@ public long getCheckpoint() { } } + public void testCommitStatsNumDocs() throws Exception { + final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy( + Lucene.SOFT_DELETE_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy()); + try (Store store = createStore(); + Engine engine = createEngine(config(defaultSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) { + final Set pendingDocs = new HashSet<>(); + int flushedDocs = 0; + final int iters = scaledRandomIntBetween(10, 
100); + for (int i = 0; i < iters; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc)); + pendingDocs.add(doc.id()); + if (randomBoolean()) { + engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + pendingDocs.remove(doc.id()); + } + if (randomBoolean()) { + engine.index(indexForDoc(doc)); + pendingDocs.add(doc.id()); + } + if (randomBoolean()) { + engine.flush(); + flushedDocs = pendingDocs.size(); + } + if (randomBoolean()) { + engine.refresh("test"); + } + assertThat(engine.commitStats().getNumDocs(), equalTo(flushedDocs)); + } + } + } + public void testIndexSearcherWrapper() throws Exception { final AtomicInteger counter = new AtomicInteger(); IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { From fe542961befb7d8ba45e4f28bbcd2f865fff34de Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 28 Apr 2018 19:32:25 -0400 Subject: [PATCH 02/15] Optimize for no soft-deletes case --- .../elasticsearch/index/engine/Engine.java | 25 ++++++++++++++++--- .../index/store/CorruptedFileIT.java | 3 ++- .../test/InternalTestCluster.java | 7 +++++- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 4166564a73636..0715754279d82 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -578,10 +578,29 @@ protected final void ensureOpen() { /** get commits stats for the last commit */ public CommitStats commitStats() { - try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { - final SegmentInfos commitInfos = Lucene.readSegmentInfos(commitRef.indexCommit); - return new CommitStats(commitInfos, Lucene.getNumDocs(store.directory(), commitInfos)); + try (IndexCommitRef commitRef = 
acquireLastIndexCommit(false)) { // Need to retain the commit as we might open it. + final SegmentInfos sis = Lucene.readSegmentInfos(commitRef.getIndexCommit()); + boolean hasSoftDeletes = false; + try (Searcher searcher = acquireSearcher("commit_stats", SearcherScope.INTERNAL)) { + for (LeafReaderContext leaf : searcher.reader().leaves()) { + if (leaf.reader().getNumericDocValues(Lucene.SOFT_DELETE_FIELD) != null) { + hasSoftDeletes = true; + break; + } + } + } + if (hasSoftDeletes) { + return new CommitStats(sis, Lucene.getNumDocs(store.directory(), sis)); + } else { + // We can use static counters if there is no soft-deletes. + int numDocs = 0; + for (SegmentCommitInfo si : sis) { + numDocs += (si.info.maxDoc() - si.getDelCount()); + } + return new CommitStats(sis, numDocs); + } } catch (IOException ex) { + maybeFailEngine("commit_stats", ex); throw new UncheckedIOException(ex); } } diff --git a/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index bbfa56a0e55fe..99db081eceda1 100644 --- a/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -685,7 +685,8 @@ private void pruneOldDeleteGenerations(Set files) { } public List listShardFiles(ShardRouting routing) throws IOException { - NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(routing.currentNodeId()).setFs(true).get(); + NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(routing.currentNodeId()) + .clear().setIndices(false).setFs(true).get(); ClusterState state = client().admin().cluster().prepareState().get().getState(); final Index test = state.metaData().index("test").getIndex(); assertThat(routing.toString(), nodeStatses.getNodes().size(), equalTo(1)); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java 
b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 12acd21903ec4..cd18489746f43 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1114,7 +1114,12 @@ private void assertSameSyncIdSameDocs() { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - CommitStats commitStats = indexShard.commitStats(); + CommitStats commitStats = null; + try { + commitStats = indexShard.commitStats(); + } catch (Exception ex) { + logger.warn("Failed to read commit stats", ex); + } if (commitStats != null) { // null if the engine is closed or if the shard is recovering String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); if (syncId != null) { From f479d3c5ab0f80f1ac57debbb1aadd958c6fde74 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 28 Apr 2018 22:55:12 -0400 Subject: [PATCH 03/15] Remove optimization It might be incorrect if forceMerge happens --- .../elasticsearch/index/engine/Engine.java | 23 +++---------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0715754279d82..c30b736a52e20 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -578,27 +578,10 @@ protected final void ensureOpen() { /** get commits stats for the last commit */ public CommitStats commitStats() { - try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { // Need to retain the commit as we might open it. + // Need to retain the commit as we will open it. 
+ try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { final SegmentInfos sis = Lucene.readSegmentInfos(commitRef.getIndexCommit()); - boolean hasSoftDeletes = false; - try (Searcher searcher = acquireSearcher("commit_stats", SearcherScope.INTERNAL)) { - for (LeafReaderContext leaf : searcher.reader().leaves()) { - if (leaf.reader().getNumericDocValues(Lucene.SOFT_DELETE_FIELD) != null) { - hasSoftDeletes = true; - break; - } - } - } - if (hasSoftDeletes) { - return new CommitStats(sis, Lucene.getNumDocs(store.directory(), sis)); - } else { - // We can use static counters if there is no soft-deletes. - int numDocs = 0; - for (SegmentCommitInfo si : sis) { - numDocs += (si.info.maxDoc() - si.getDelCount()); - } - return new CommitStats(sis, numDocs); - } + return new CommitStats(sis, Lucene.getNumDocs(store.directory(), sis)); } catch (IOException ex) { maybeFailEngine("commit_stats", ex); throw new UncheckedIOException(ex); From 3480024e6d10e013dcdbf6d8e0ccc41a1a672b08 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 30 Apr 2018 22:11:46 -0400 Subject: [PATCH 04/15] =?UTF-8?q?store=20soft-delete=20option=20in=20commi?= =?UTF-8?q?t=E2=80=99s=20userdata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../elasticsearch/common/lucene/Lucene.java | 25 +++++-- .../elasticsearch/index/engine/Engine.java | 13 +--- .../index/engine/InternalEngine.java | 30 ++++++++ .../org/elasticsearch/index/store/Store.java | 2 +- .../common/lucene/LuceneTests.java | 10 +-- .../index/engine/InternalEngineTests.java | 69 ++++++++++++------- 6 files changed, 104 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 818dafe865987..1eb2bd268f0e4 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -78,6 
+78,7 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.fielddata.IndexFieldData; import java.io.IOException; @@ -141,11 +142,27 @@ public static Iterable files(SegmentInfos infos) throws IOException { } /** - * Returns the number of live documents in the index referenced by this {@link SegmentInfos} + * Returns the number of documents in the index referenced by this {@link SegmentInfos} */ - public static int getNumDocs(Directory directory, SegmentInfos sis) throws IOException { - try (DirectoryReader reader = StandardDirectoryReader.open(directory, sis, Collections.emptyList())) { - return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs(); + public static int getNumDocs(SegmentInfos info) { + int numDocs = 0; + for (SegmentCommitInfo si : info) { + numDocs += si.info.maxDoc() - si.getDelCount(); + } + return numDocs; + } + + /** + * Returns the number of documents in the given index commit. 
+ */ + public static int getNumDocs(IndexCommit indexCommit) throws IOException { + final String softDeleteEnabled = indexCommit.getUserData().get(Engine.SOFT_DELETES_COMMIT_KEY); + if (Boolean.parseBoolean(softDeleteEnabled)) { + try (DirectoryReader reader = StandardDirectoryReader.open(indexCommit)) { + return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs(); + } + } else { + return getNumDocs(readSegmentInfos(indexCommit)); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index c30b736a52e20..d00ded15cf862 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -71,7 +71,6 @@ import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Base64; @@ -97,6 +96,7 @@ public abstract class Engine implements Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; public static final String HISTORY_UUID_KEY = "history_uuid"; + public static final String SOFT_DELETES_COMMIT_KEY = "soft_deletes"; protected final ShardId shardId; protected final String allocationId; @@ -577,16 +577,7 @@ protected final void ensureOpen() { } /** get commits stats for the last commit */ - public CommitStats commitStats() { - // Need to retain the commit as we will open it. - try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { - final SegmentInfos sis = Lucene.readSegmentInfos(commitRef.getIndexCommit()); - return new CommitStats(sis, Lucene.getNumDocs(store.directory(), sis)); - } catch (IOException ex) { - maybeFailEngine("commit_stats", ex); - throw new UncheckedIOException(ex); - } - } + public abstract CommitStats commitStats(); /** * The sequence number service for this engine. 
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 88f271914239a..40c85f4f28726 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -80,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -121,6 +122,7 @@ public class InternalEngine extends Engine { private final LiveVersionMap versionMap = new LiveVersionMap(); private volatile SegmentInfos lastCommittedSegmentInfos; + private volatile CommitStats lastComputedCommitStats; private final IndexThrottle throttle; @@ -1823,6 +1825,33 @@ protected SegmentInfos getLastCommittedSegmentInfos() { return lastCommittedSegmentInfos; } + @Override + public CommitStats commitStats() { + if (softDeleteEnabled) { + // Need to retain the commit as we might open it. 
+ try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { + final IndexCommit indexCommit = commitRef.getIndexCommit(); + CommitStats lastCommitStats = this.lastComputedCommitStats; + if (lastCommitStats != null && lastCommitStats.getGeneration() == indexCommit.getGeneration()) { + return lastCommitStats; + } + lastCommitStats = new CommitStats(Lucene.readSegmentInfos(indexCommit), Lucene.getNumDocs(indexCommit)); + this.lastComputedCommitStats = lastCommitStats; + return lastCommitStats; + } catch (IOException e) { + try { + maybeFailEngine("commit_stats", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new UncheckedIOException(e); + } + } else { + final SegmentInfos sis = this.lastCommittedSegmentInfos; + return new CommitStats(this.lastCommittedSegmentInfos, Lucene.getNumDocs(sis)); + } + } + @Override protected final void writerSegmentStats(SegmentsStats stats) { stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed()); @@ -2179,6 +2208,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); + commitData.put(SOFT_DELETES_COMMIT_KEY, Boolean.toString(softDeleteEnabled)); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 4a807db9d2b08..dbfcf111dea3d 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -862,7 +862,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg Map commitUserDataBuilder = new HashMap<>(); try { final 
SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); - numDocs = Lucene.getNumDocs(directory, segmentCommitInfos); + numDocs = commit != null ? Lucene.getNumDocs(commit) : 0; commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version. for (SegmentCommitInfo info : segmentCommitInfos) { diff --git a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java index 152042daca744..753aedea01e02 100644 --- a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java @@ -283,7 +283,7 @@ public void testNumDocs() throws IOException { writer.addDocument(doc); writer.commit(); SegmentInfos segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(1, Lucene.getNumDocs(dir, segmentCommitInfos)); + assertEquals(1, Lucene.getNumDocs(segmentCommitInfos)); doc = new Document(); doc.add(new TextField("id", "2", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); @@ -293,14 +293,14 @@ public void testNumDocs() throws IOException { doc.add(new TextField("id", "3", random().nextBoolean() ? 
Field.Store.YES : Field.Store.NO)); writer.addDocument(doc); segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(1, Lucene.getNumDocs(dir, segmentCommitInfos)); + assertEquals(1, Lucene.getNumDocs(segmentCommitInfos)); writer.commit(); segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(3, Lucene.getNumDocs(dir, segmentCommitInfos)); + assertEquals(3, Lucene.getNumDocs(segmentCommitInfos)); writer.deleteDocuments(new Term("id", "2")); writer.commit(); segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(2, Lucene.getNumDocs(dir, segmentCommitInfos)); + assertEquals(2, Lucene.getNumDocs(segmentCommitInfos)); int numDocsToIndex = randomIntBetween(10, 50); List deleteTerms = new ArrayList<>(); @@ -318,7 +318,7 @@ public void testNumDocs() throws IOException { } writer.commit(); segmentCommitInfos = Lucene.readSegmentInfos(dir); - assertEquals(2 + deleteTerms.size(), Lucene.getNumDocs(dir, segmentCommitInfos)); + assertEquals(2 + deleteTerms.size(), Lucene.getNumDocs(segmentCommitInfos)); writer.close(); dir.close(); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 4607fdf017ada..5c32a834e3ad7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -122,6 +122,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.IndexSettingsModule; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -627,33 +628,53 @@ public long getCheckpoint() { } public void testCommitStatsNumDocs() throws Exception { - final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy( - Lucene.SOFT_DELETE_FIELD, () -> new 
MatchAllDocsQuery(), engine.config().getMergePolicy()); + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, + () -> new MatchAllDocsQuery(), newMergePolicy()); try (Store store = createStore(); - Engine engine = createEngine(config(defaultSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) { - final Set pendingDocs = new HashSet<>(); - int flushedDocs = 0; - final int iters = scaledRandomIntBetween(10, 100); - for (int i = 0; i < iters; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); + Engine softDeletesEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), + keepSoftDeleteDocsMP, null))) { + assertNumDocsInCommitStats(softDeletesEngine); + } + // Without soft-deletes + settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false); + indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + try (Store store = createStore(); + Engine hardDeleteEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), + newMergePolicy(), null))) { + assertNumDocsInCommitStats(hardDeleteEngine); + } + } + + private void assertNumDocsInCommitStats(Engine engine) throws IOException { + final Set pendingDocs = new HashSet<>(); + int flushedDocs = 0; + final int iters = scaledRandomIntBetween(5, 20); + for (int i = 0; i < iters; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc)); + pendingDocs.add(doc.id()); + if (randomBoolean()) { + 
engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + pendingDocs.remove(doc.id()); + } + if (randomBoolean()) { engine.index(indexForDoc(doc)); pendingDocs.add(doc.id()); - if (randomBoolean()) { - engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); - pendingDocs.remove(doc.id()); - } - if (randomBoolean()) { - engine.index(indexForDoc(doc)); - pendingDocs.add(doc.id()); - } - if (randomBoolean()) { - engine.flush(); - flushedDocs = pendingDocs.size(); - } - if (randomBoolean()) { - engine.refresh("test"); - } - assertThat(engine.commitStats().getNumDocs(), equalTo(flushedDocs)); + } + if (randomBoolean()) { + engine.flush(); + flushedDocs = pendingDocs.size(); + } + if (randomBoolean()) { + engine.refresh("test"); + } + assertThat(engine.commitStats().getNumDocs(), equalTo(flushedDocs)); + try (Engine.IndexCommitRef commitRef = engine.acquireLastIndexCommit(false)) { + assertThat(Lucene.getNumDocs(commitRef.getIndexCommit()), equalTo(flushedDocs)); } } } From 27119cdd6e08b50babec2c5a2871cabe99e1ff9e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 30 Apr 2018 22:28:02 -0400 Subject: [PATCH 05/15] Read local var --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- .../org/elasticsearch/index/engine/InternalEngineTests.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 40c85f4f28726..6d00ec7937ac9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1848,7 +1848,7 @@ public CommitStats commitStats() { } } else { final SegmentInfos sis = this.lastCommittedSegmentInfos; - return new CommitStats(this.lastCommittedSegmentInfos, Lucene.getNumDocs(sis)); + return new 
CommitStats(sis, Lucene.getNumDocs(sis)); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5c32a834e3ad7..b4240006eb04e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -634,6 +634,7 @@ public void testCommitStatsNumDocs() throws Exception { IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, () -> new MatchAllDocsQuery(), newMergePolicy()); + // With soft-deletes try (Store store = createStore(); Engine softDeletesEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), keepSoftDeleteDocsMP, null))) { @@ -643,9 +644,9 @@ public void testCommitStatsNumDocs() throws Exception { settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false); indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); try (Store store = createStore(); - Engine hardDeleteEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), + Engine hardDeletesEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), newMergePolicy(), null))) { - assertNumDocsInCommitStats(hardDeleteEngine); + assertNumDocsInCommitStats(hardDeletesEngine); } } From 67cdffa593f0eb73ae7545d853a215d83cea1697 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 30 Apr 2018 22:32:29 -0400 Subject: [PATCH 06/15] test once is enough --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 3 --- 1 file changed, 3 deletions(-) diff --git 
a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b4240006eb04e..9bc5181cbbc28 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -674,9 +674,6 @@ private void assertNumDocsInCommitStats(Engine engine) throws IOException { engine.refresh("test"); } assertThat(engine.commitStats().getNumDocs(), equalTo(flushedDocs)); - try (Engine.IndexCommitRef commitRef = engine.acquireLastIndexCommit(false)) { - assertThat(Lucene.getNumDocs(commitRef.getIndexCommit()), equalTo(flushedDocs)); - } } } From 9e4a98d58caf99c6d533b9163c30b16ec00dfb2e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 30 Apr 2018 23:25:53 -0400 Subject: [PATCH 07/15] Use the last commit if commit is null --- .../java/org/elasticsearch/index/store/Store.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index dbfcf111dea3d..ed9f26df5cba7 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -857,12 +857,19 @@ static class LoadedMetadata { } static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException { - long numDocs; + final long numDocs; Map builder = new HashMap<>(); Map commitUserDataBuilder = new HashMap<>(); try { + final IndexCommit snapshottingCommit; + if (commit == null) { + final List existingCommits = DirectoryReader.listCommits(directory); + snapshottingCommit = existingCommits.get(existingCommits.size() - 1); // take the latest commit in store. 
+ } else { + snapshottingCommit = commit; + } final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); - numDocs = commit != null ? Lucene.getNumDocs(commit) : 0; + numDocs = Lucene.getNumDocs(snapshottingCommit); commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version. for (SegmentCommitInfo info : segmentCommitInfos) { From 7827f4176b33c8cc5b1bdbc111efce00e387df4d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 30 Apr 2018 23:30:27 -0400 Subject: [PATCH 08/15] Use snapshotting commit for both --- .../main/java/org/elasticsearch/index/store/Store.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index ed9f26df5cba7..788efade23a51 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -858,8 +858,8 @@ static class LoadedMetadata { static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException { final long numDocs; - Map builder = new HashMap<>(); - Map commitUserDataBuilder = new HashMap<>(); + final Map builder = new HashMap<>(); + final Map commitUserData; try { final IndexCommit snapshottingCommit; if (commit == null) { @@ -868,9 +868,9 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg } else { snapshottingCommit = commit; } - final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); + final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(snapshottingCommit, directory); numDocs = Lucene.getNumDocs(snapshottingCommit); - commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); + commitUserData = 
Collections.unmodifiableMap(segmentCommitInfos.getUserData()); Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version. for (SegmentCommitInfo info : segmentCommitInfos) { final Version version = info.info.getVersion(); @@ -909,7 +909,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg } throw ex; } - return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs); + return new LoadedMetadata(unmodifiableMap(builder), commitUserData, numDocs); } private static void checksumFromLuceneFile(Directory directory, String file, Map builder, From 577291dcad7ab4f273459dfa48c5d6a0a5e5c3bd Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 1 May 2018 16:02:11 -0400 Subject: [PATCH 09/15] Calculate numDocs in engine only --- .../elasticsearch/common/lucene/Lucene.java | 17 ------- .../elasticsearch/index/engine/Engine.java | 1 - .../index/engine/InternalEngine.java | 44 ++++++++++--------- .../org/elasticsearch/index/store/Store.java | 21 +++------ .../index/store/CorruptedFileIT.java | 3 +- .../test/InternalTestCluster.java | 7 +-- 6 files changed, 32 insertions(+), 61 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 1eb2bd268f0e4..f50995048eadd 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -44,8 +44,6 @@ import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; -import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; -import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import 
org.apache.lucene.search.FieldDoc; @@ -78,7 +76,6 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.fielddata.IndexFieldData; import java.io.IOException; @@ -152,20 +149,6 @@ public static int getNumDocs(SegmentInfos info) { return numDocs; } - /** - * Returns the number of documents in the given index commit. - */ - public static int getNumDocs(IndexCommit indexCommit) throws IOException { - final String softDeleteEnabled = indexCommit.getUserData().get(Engine.SOFT_DELETES_COMMIT_KEY); - if (Boolean.parseBoolean(softDeleteEnabled)) { - try (DirectoryReader reader = StandardDirectoryReader.open(indexCommit)) { - return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs(); - } - } else { - return getNumDocs(readSegmentInfos(indexCommit)); - } - } - /** * Reads the segments infos from the given commit, failing if it fails to load */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index d00ded15cf862..0252ec26af136 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -96,7 +96,6 @@ public abstract class Engine implements Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; public static final String HISTORY_UUID_KEY = "history_uuid"; - public static final String SOFT_DELETES_COMMIT_KEY = "soft_deletes"; protected final ShardId shardId; protected final String allocationId; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6d00ec7937ac9..7523ff3e3bdfb 100644 --- 
a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -34,6 +34,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ReferenceManager; @@ -1827,29 +1828,31 @@ protected SegmentInfos getLastCommittedSegmentInfos() { @Override public CommitStats commitStats() { - if (softDeleteEnabled) { - // Need to retain the commit as we might open it. - try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { - final IndexCommit indexCommit = commitRef.getIndexCommit(); - CommitStats lastCommitStats = this.lastComputedCommitStats; - if (lastCommitStats != null && lastCommitStats.getGeneration() == indexCommit.getGeneration()) { - return lastCommitStats; - } - lastCommitStats = new CommitStats(Lucene.readSegmentInfos(indexCommit), Lucene.getNumDocs(indexCommit)); - this.lastComputedCommitStats = lastCommitStats; - return lastCommitStats; - } catch (IOException e) { - try { - maybeFailEngine("commit_stats", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new UncheckedIOException(e); - } - } else { + if (softDeleteEnabled == false) { final SegmentInfos sis = this.lastCommittedSegmentInfos; return new CommitStats(sis, Lucene.getNumDocs(sis)); } + // Need to retain the commit as we might open it. 
+ try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { + final IndexCommit indexCommit = commitRef.getIndexCommit(); + CommitStats commitStats = this.lastComputedCommitStats; + if (commitStats != null && commitStats.getGeneration() == indexCommit.getGeneration()) { + return commitStats; + } + try (DirectoryReader reader = DirectoryReader.open(indexCommit)) { + final int numDocs = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs(); + commitStats = new CommitStats(Lucene.readSegmentInfos(indexCommit), numDocs); + this.lastComputedCommitStats = commitStats; + return commitStats; + } + } catch (IOException e) { + try { + maybeFailEngine("commit_stats", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new UncheckedIOException(e); + } } @Override @@ -2208,7 +2211,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); - commitData.put(SOFT_DELETES_COMMIT_KEY, Boolean.toString(softDeleteEnabled)); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 788efade23a51..78b2119d79a36 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -857,20 +857,13 @@ static class LoadedMetadata { } static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException { - final long numDocs; - final Map builder = new HashMap<>(); - final Map commitUserData; + long numDocs; + Map builder = new HashMap<>(); + Map 
commitUserDataBuilder = new HashMap<>(); try { - final IndexCommit snapshottingCommit; - if (commit == null) { - final List existingCommits = DirectoryReader.listCommits(directory); - snapshottingCommit = existingCommits.get(existingCommits.size() - 1); // take the latest commit in store. - } else { - snapshottingCommit = commit; - } - final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(snapshottingCommit, directory); - numDocs = Lucene.getNumDocs(snapshottingCommit); - commitUserData = Collections.unmodifiableMap(segmentCommitInfos.getUserData()); + final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); + numDocs = Lucene.getNumDocs(segmentCommitInfos); + commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version. for (SegmentCommitInfo info : segmentCommitInfos) { final Version version = info.info.getVersion(); @@ -909,7 +902,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg } throw ex; } - return new LoadedMetadata(unmodifiableMap(builder), commitUserData, numDocs); + return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs); } private static void checksumFromLuceneFile(Directory directory, String file, Map builder, diff --git a/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index 99db081eceda1..bbfa56a0e55fe 100644 --- a/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -685,8 +685,7 @@ private void pruneOldDeleteGenerations(Set files) { } public List listShardFiles(ShardRouting routing) throws IOException { - NodesStatsResponse nodeStatses = 
client().admin().cluster().prepareNodesStats(routing.currentNodeId()) - .clear().setIndices(false).setFs(true).get(); + NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(routing.currentNodeId()).setFs(true).get(); ClusterState state = client().admin().cluster().prepareState().get().getState(); final Index test = state.metaData().index("test").getIndex(); assertThat(routing.toString(), nodeStatses.getNodes().size(), equalTo(1)); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index cd18489746f43..12acd21903ec4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1114,12 +1114,7 @@ private void assertSameSyncIdSameDocs() { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - CommitStats commitStats = null; - try { - commitStats = indexShard.commitStats(); - } catch (Exception ex) { - logger.warn("Failed to read commit stats", ex); - } + CommitStats commitStats = indexShard.commitStats(); if (commitStats != null) { // null if the engine is closed or if the shard is recovering String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); if (syncId != null) { From e35c71c03ac312bb68501ec2b0bbf0482b06152b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 1 May 2018 16:52:21 -0400 Subject: [PATCH 10/15] Make exact numDocs optional --- .../stats/TransportClusterStatsAction.java | 2 +- .../stats/TransportIndicesStatsAction.java | 3 ++- .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 4 ++-- .../elasticsearch/index/shard/IndexShard.java | 9 ++++--- .../index/shard/LocalShardSnapshot.java | 2 +- 
.../elasticsearch/indices/IndicesService.java | 2 +- .../indices/flush/SyncedFlushService.java | 2 +- .../index/engine/InternalEngineTests.java | 24 +++++++++---------- .../index/shard/IndexShardTests.java | 2 +- .../elasticsearch/indices/flush/FlushIT.java | 6 ++--- .../indices/recovery/RecoveryTests.java | 2 +- .../test/InternalTestCluster.java | 2 +- 13 files changed, 33 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index c87b55b0bbd7d..d213f5e2a64f2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -104,7 +104,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), - indexShard.commitStats(), + indexShard.commitStats(false), indexShard.seqNoStats())); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index eeefe793db701..21b6410618952 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -158,6 +158,7 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh return new ShardStats( indexShard.routingEntry(), indexShard.shardPath(), - new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats()); + new 
CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), + indexShard.commitStats(false), indexShard.seqNoStats()); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0252ec26af136..31822fad134fa 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -576,7 +576,7 @@ protected final void ensureOpen() { } /** get commits stats for the last commit */ - public abstract CommitStats commitStats(); + public abstract CommitStats commitStats(boolean requireExactNumDocs); /** * The sequence number service for this engine. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7523ff3e3bdfb..f7567d67202e8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1827,8 +1827,8 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public CommitStats commitStats() { - if (softDeleteEnabled == false) { + public CommitStats commitStats(boolean requireExactNumDocs) { + if (softDeleteEnabled == false || requireExactNumDocs == false) { final SegmentInfos sis = this.lastCommittedSegmentInfos; return new CommitStats(sis, Lucene.getNumDocs(sis)); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 63db5912c37b4..9e1d3343fa7c1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -861,12 +861,15 @@ public DocsStats docStats() { } /** - * @return {@link CommitStats} if engine is open, otherwise null + * Returns {@link CommitStats} if engine is 
open, otherwise null + * + * @param requireExactNumDocs if true the exact number of documents in commit is returned; otherwise an estimate is returned. + * An estimate numDocs value is good enough for any general purpose. */ @Nullable - public CommitStats commitStats() { + public CommitStats commitStats(boolean requireExactNumDocs) { Engine engine = getEngineOrNull(); - return engine == null ? null : engine.commitStats(); + return engine == null ? null : engine.commitStats(requireExactNumDocs); } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index d7105c0c14d38..8f4707ae94163 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -66,7 +66,7 @@ long maxSeqNo() { } long maxUnsafeAutoIdTimestamp() { - return Long.parseLong(shard.getEngine().commitStats().getUserData().get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)); + return Long.parseLong(shard.commitStats(false).getUserData().get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)); } Directory getSnapshotDirectory() { diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 4954f048b57fb..37950cc0a77ec 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -340,7 +340,7 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), - indexShard.commitStats(), + indexShard.commitStats(false), indexShard.seqNoStats()) }); } diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java 
b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 553744e66ef04..1db59106e278b 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -471,7 +471,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true); logger.trace("{} performing pre sync flush", request.shardId()); indexShard.flush(flushRequest); - final CommitStats commitStats = indexShard.commitStats(); + final CommitStats commitStats = indexShard.commitStats(true); final Engine.CommitId commitId = commitStats.getRawCommitId(); logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs()); return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9bc5181cbbc28..c1ece1fc8d973 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -587,7 +587,7 @@ public long getCheckpoint() { } } )) { - CommitStats stats1 = engine.commitStats(); + CommitStats stats1 = engine.commitStats(randomBoolean()); assertThat(stats1.getGeneration(), greaterThan(0L)); assertThat(stats1.getId(), notNullValue()); assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); @@ -610,7 +610,7 @@ public long getCheckpoint() { final Engine.CommitId commitId = engine.flush(true, true); - CommitStats stats2 = engine.commitStats(); + CommitStats stats2 = engine.commitStats(randomBoolean()); assertThat(stats2.getRawCommitId(), equalTo(commitId)); assertThat(stats2.getGeneration(), 
greaterThan(stats1.getGeneration())); assertThat(stats2.getId(), notNullValue()); @@ -627,7 +627,7 @@ public long getCheckpoint() { } } - public void testCommitStatsNumDocs() throws Exception { + public void testCommitStatsExactNumDocs() throws Exception { Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); @@ -638,7 +638,7 @@ public void testCommitStatsNumDocs() throws Exception { try (Store store = createStore(); Engine softDeletesEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), keepSoftDeleteDocsMP, null))) { - assertNumDocsInCommitStats(softDeletesEngine); + assertExactNumDocsInCommitStats(softDeletesEngine); } // Without soft-deletes settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false); @@ -646,11 +646,11 @@ public void testCommitStatsNumDocs() throws Exception { try (Store store = createStore(); Engine hardDeletesEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), newMergePolicy(), null))) { - assertNumDocsInCommitStats(hardDeletesEngine); + assertExactNumDocsInCommitStats(hardDeletesEngine); } } - private void assertNumDocsInCommitStats(Engine engine) throws IOException { + private void assertExactNumDocsInCommitStats(Engine engine) throws IOException { final Set pendingDocs = new HashSet<>(); int flushedDocs = 0; final int iters = scaledRandomIntBetween(5, 20); @@ -673,7 +673,7 @@ private void assertNumDocsInCommitStats(Engine engine) throws IOException { if (randomBoolean()) { engine.refresh("test"); } - assertThat(engine.commitStats().getNumDocs(), equalTo(flushedDocs)); + assertThat(engine.commitStats(true).getNumDocs(), equalTo(flushedDocs)); } } @@ -1990,14 +1990,14 @@ public void testSeqNoAndCheckpoints() throws IOException { assertThat(globalCheckpoint, equalTo(replicaLocalCheckpoint)); assertThat( - 
Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(initialEngine.commitStats(randomBoolean()).getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint)); initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint assertThat( initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( - Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), + Long.parseLong(initialEngine.commitStats(randomBoolean()).getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo)); } finally { @@ -2010,13 +2010,13 @@ public void testSeqNoAndCheckpoints() throws IOException { assertEquals(primarySeqNo, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertThat( - Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(recoveringEngine.commitStats(randomBoolean()).getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo)); assertThat( recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( - Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), + Long.parseLong(recoveringEngine.commitStats(randomBoolean()).getUserData().get(SequenceNumbers.MAX_SEQ_NO)), // after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert // that the committed max seq no is equivalent to what the current primary seq no is, as all data // we have assigned sequence numbers to should be in the commit @@ -3743,7 +3743,7 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti int i = 0; for (final Map.Entry entry : threads.entrySet()) { - final Map userData = finalActualEngine.commitStats().getUserData(); + final Map userData = 
finalActualEngine.commitStats(randomBoolean()).getUserData(); assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i))); assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation))); entry.getValue().countDown(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0c394945bcbf3..6da9d9d5c901d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1162,7 +1162,7 @@ public void testShardStats() throws IOException { IndexShard shard = newStartedShard(); ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), - new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), shard.commitStats(), shard.seqNoStats()); + new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), shard.commitStats(false), shard.seqNoStats()); assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath()); assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath()); assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath()); diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index a2149b9d28a0b..343440b937479 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -330,12 +330,12 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception { .getShardOrNull(shardId); if (randomBoolean()) { // Change the existing sync-id of a single shard. 
- shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats().getRawCommitId()); - assertThat(shard.commitStats().syncId(), not(equalTo(thirdSeal.syncId()))); + shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats(randomBoolean()).getRawCommitId()); + assertThat(shard.commitStats(randomBoolean()).syncId(), not(equalTo(thirdSeal.syncId()))); } else { // Flush will create a new commit without sync-id shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true)); - assertThat(shard.commitStats().syncId(), nullValue()); + assertThat(shard.commitStats(randomBoolean()).syncId(), nullValue()); } final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId); logger.info("Forth seal: {}", syncedFlushDescription(forthSeal)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 4e9d0ccb22e11..bfa2f38b3e208 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -223,7 +223,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); - assertThat(newReplica.commitStats().getUserData().get(Engine.HISTORY_UUID_KEY), equalTo(historyUUID)); + assertThat(newReplica.commitStats(false).getUserData().get(Engine.HISTORY_UUID_KEY), equalTo(historyUUID)); shards.assertAllEqual(numDocs); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 12acd21903ec4..062e5f79ec6a7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ 
b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1114,7 +1114,7 @@ private void assertSameSyncIdSameDocs() { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - CommitStats commitStats = indexShard.commitStats(); + CommitStats commitStats = indexShard.commitStats(true); if (commitStats != null) { // null if the engine is closed or if the shard is recovering String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); if (syncId != null) { From 15d0695f4edbe76e135c2e9e85f08c39b205f45b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 1 May 2018 17:07:00 -0400 Subject: [PATCH 11/15] Fix assertSameSyncIdSameDocs --- .../java/org/elasticsearch/test/InternalTestCluster.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 062e5f79ec6a7..6bc2a7b96a353 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1114,7 +1114,14 @@ private void assertSameSyncIdSameDocs() { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - CommitStats commitStats = indexShard.commitStats(true); + final CommitStats commitStats; + try { + // In some corrupted tests, we leave a corrupted index with which we may not be able to read commit stats. 
+ commitStats = indexShard.commitStats(true); + } catch (Exception ex) { + logger.warn("Failed to get commit_stats", ex); + continue; + } if (commitStats != null) { // null if the engine is closed or if the shard is recovering String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); if (syncId != null) { From bbfc2239b7126f989247eba5e6a8a28d87c43356 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 9 May 2018 15:31:35 -0400 Subject: [PATCH 12/15] Back out previous changes --- .../stats/TransportClusterStatsAction.java | 2 +- .../stats/TransportIndicesStatsAction.java | 3 +- .../elasticsearch/index/engine/Engine.java | 6 +- .../index/engine/InternalEngine.java | 32 ---------- .../elasticsearch/index/shard/IndexShard.java | 9 +-- .../index/shard/LocalShardSnapshot.java | 2 +- .../elasticsearch/indices/IndicesService.java | 2 +- .../indices/flush/SyncedFlushService.java | 2 +- .../index/engine/InternalEngineTests.java | 64 ++----------------- .../index/shard/IndexShardTests.java | 2 +- .../elasticsearch/indices/flush/FlushIT.java | 6 +- .../indices/recovery/RecoveryTests.java | 2 +- .../test/InternalTestCluster.java | 9 +-- 13 files changed, 26 insertions(+), 115 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index d213f5e2a64f2..c87b55b0bbd7d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -104,7 +104,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), - indexShard.commitStats(false), + indexShard.commitStats(), 
indexShard.seqNoStats())); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 21b6410618952..eeefe793db701 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -158,7 +158,6 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh return new ShardStats( indexShard.routingEntry(), indexShard.shardPath(), - new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), - indexShard.commitStats(false), indexShard.seqNoStats()); + new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats()); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 665236758fe8f..179134699925c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -624,7 +624,11 @@ protected final void ensureOpen() { } /** get commits stats for the last commit */ - public abstract CommitStats commitStats(boolean requireExactNumDocs); + public CommitStats commitStats() { + try (Engine.Searcher searcher = acquireSearcher("commit_stats", Engine.SearcherScope.INTERNAL)) { + return new CommitStats(getLastCommittedSegmentInfos(), searcher.reader().numDocs()); + } + } /** * The sequence number service for this engine. 
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d2cf14fb615f8..fc0e23bfdab43 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -35,7 +35,6 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; @@ -85,7 +84,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -127,7 +125,6 @@ public class InternalEngine extends Engine { private final LiveVersionMap versionMap = new LiveVersionMap(); private volatile SegmentInfos lastCommittedSegmentInfos; - private volatile CommitStats lastComputedCommitStats; private final IndexThrottle throttle; @@ -1855,35 +1852,6 @@ protected SegmentInfos getLastCommittedSegmentInfos() { return lastCommittedSegmentInfos; } - @Override - public CommitStats commitStats(boolean requireExactNumDocs) { - if (softDeleteEnabled == false || requireExactNumDocs == false) { - final SegmentInfos sis = this.lastCommittedSegmentInfos; - return new CommitStats(sis, Lucene.getNumDocs(sis)); - } - // Need to retain the commit as we might open it. 
- try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) { - final IndexCommit indexCommit = commitRef.getIndexCommit(); - CommitStats commitStats = this.lastComputedCommitStats; - if (commitStats != null && commitStats.getGeneration() == indexCommit.getGeneration()) { - return commitStats; - } - try (DirectoryReader reader = DirectoryReader.open(indexCommit)) { - final int numDocs = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs(); - commitStats = new CommitStats(Lucene.readSegmentInfos(indexCommit), numDocs); - this.lastComputedCommitStats = commitStats; - return commitStats; - } - } catch (IOException e) { - try { - maybeFailEngine("commit_stats", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new UncheckedIOException(e); - } - } - @Override protected final void writerSegmentStats(SegmentsStats stats) { stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 543b456f94806..6f0e1fc7ce54d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -861,15 +861,12 @@ public DocsStats docStats() { } /** - * Returns {@link CommitStats} if engine is open, otherwise null - * - * @param requireExactNumDocs if true the exact number of documents in commit is returned; otherwise an estimate is returned. - * An estimate numDocs value is good enough for any general purpose. + * @return {@link CommitStats} if engine is open, otherwise null */ @Nullable - public CommitStats commitStats(boolean requireExactNumDocs) { + public CommitStats commitStats() { Engine engine = getEngineOrNull(); - return engine == null ? null : engine.commitStats(requireExactNumDocs); + return engine == null ? 
null : engine.commitStats(); } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index 8f4707ae94163..d7105c0c14d38 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -66,7 +66,7 @@ long maxSeqNo() { } long maxUnsafeAutoIdTimestamp() { - return Long.parseLong(shard.commitStats(false).getUserData().get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)); + return Long.parseLong(shard.getEngine().commitStats().getUserData().get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)); } Directory getSnapshotDirectory() { diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 057bda7991423..e787f574fe97d 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -340,7 +340,7 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), - indexShard.commitStats(false), + indexShard.commitStats(), indexShard.seqNoStats()) }); } diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 1db59106e278b..553744e66ef04 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -471,7 +471,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true); 
logger.trace("{} performing pre sync flush", request.shardId()); indexShard.flush(flushRequest); - final CommitStats commitStats = indexShard.commitStats(true); + final CommitStats commitStats = indexShard.commitStats(); final Engine.CommitId commitId = commitStats.getRawCommitId(); logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs()); return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 6436f7857a3d4..b623b61f19692 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -594,7 +594,7 @@ public long getCheckpoint() { } } )) { - CommitStats stats1 = engine.commitStats(randomBoolean()); + CommitStats stats1 = engine.commitStats(); assertThat(stats1.getGeneration(), greaterThan(0L)); assertThat(stats1.getId(), notNullValue()); assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); @@ -617,7 +617,7 @@ public long getCheckpoint() { final Engine.CommitId commitId = engine.flush(true, true); - CommitStats stats2 = engine.commitStats(randomBoolean()); + CommitStats stats2 = engine.commitStats(); assertThat(stats2.getRawCommitId(), equalTo(commitId)); assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); assertThat(stats2.getId(), notNullValue()); @@ -634,56 +634,6 @@ public long getCheckpoint() { } } - public void testCommitStatsExactNumDocs() throws Exception { - Settings.Builder settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); - IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); - final 
MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, - () -> new MatchAllDocsQuery(), newMergePolicy()); - // With soft-deletes - try (Store store = createStore(); - Engine softDeletesEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), - keepSoftDeleteDocsMP, null))) { - assertExactNumDocsInCommitStats(softDeletesEngine); - } - // Without soft-deletes - settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false); - indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); - try (Store store = createStore(); - Engine hardDeletesEngine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), - newMergePolicy(), null))) { - assertExactNumDocsInCommitStats(hardDeletesEngine); - } - } - - private void assertExactNumDocsInCommitStats(Engine engine) throws IOException { - final Set pendingDocs = new HashSet<>(); - int flushedDocs = 0; - final int iters = scaledRandomIntBetween(5, 20); - for (int i = 0; i < iters; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); - engine.index(indexForDoc(doc)); - pendingDocs.add(doc.id()); - if (randomBoolean()) { - engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); - pendingDocs.remove(doc.id()); - } - if (randomBoolean()) { - engine.index(indexForDoc(doc)); - pendingDocs.add(doc.id()); - } - if (randomBoolean()) { - engine.flush(); - flushedDocs = pendingDocs.size(); - } - if (randomBoolean()) { - engine.refresh("test"); - } - assertThat(engine.commitStats(true).getNumDocs(), equalTo(flushedDocs)); - } - } - public void testIndexSearcherWrapper() throws Exception { final AtomicInteger counter = new AtomicInteger(); IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { @@ -2117,14 +2067,14 @@ public void testSeqNoAndCheckpoints() 
throws IOException { assertThat(globalCheckpoint, equalTo(replicaLocalCheckpoint)); assertThat( - Long.parseLong(initialEngine.commitStats(randomBoolean()).getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint)); initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint assertThat( initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( - Long.parseLong(initialEngine.commitStats(randomBoolean()).getUserData().get(SequenceNumbers.MAX_SEQ_NO)), + Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo)); } finally { @@ -2137,13 +2087,13 @@ public void testSeqNoAndCheckpoints() throws IOException { assertEquals(primarySeqNo, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertThat( - Long.parseLong(recoveringEngine.commitStats(randomBoolean()).getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo)); assertThat( recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( - Long.parseLong(recoveringEngine.commitStats(randomBoolean()).getUserData().get(SequenceNumbers.MAX_SEQ_NO)), + Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), // after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert // that the committed max seq no is equivalent to what the current primary seq no is, as all data // we have assigned sequence numbers to should be in the commit @@ -3937,7 +3887,7 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti int i = 0; for (final Map.Entry entry : threads.entrySet()) { - final 
Map userData = finalActualEngine.commitStats(randomBoolean()).getUserData(); + final Map userData = finalActualEngine.commitStats().getUserData(); assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i))); assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation))); entry.getValue().countDown(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 624c0319d300a..7bc4096741073 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1164,7 +1164,7 @@ public void testShardStats() throws IOException { IndexShard shard = newStartedShard(); ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), - new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), shard.commitStats(false), shard.seqNoStats()); + new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), shard.commitStats(), shard.seqNoStats()); assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath()); assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath()); assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath()); diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index 77413283bf235..27e1c1af2bb83 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -334,12 +334,12 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception { .getShardOrNull(shardId); if (randomBoolean()) { // Change the existing sync-id of a single shard. 
- shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats(randomBoolean()).getRawCommitId()); - assertThat(shard.commitStats(randomBoolean()).syncId(), not(equalTo(thirdSeal.syncId()))); + shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats().getRawCommitId()); + assertThat(shard.commitStats().syncId(), not(equalTo(thirdSeal.syncId()))); } else { // Flush will create a new commit without sync-id shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true)); - assertThat(shard.commitStats(randomBoolean()).syncId(), nullValue()); + assertThat(shard.commitStats().syncId(), nullValue()); } final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId); logger.info("Forth seal: {}", syncedFlushDescription(forthSeal)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 1695666235efd..537409f35d175 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -223,7 +223,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); - assertThat(newReplica.commitStats(false).getUserData().get(Engine.HISTORY_UUID_KEY), equalTo(historyUUID)); + assertThat(newReplica.commitStats().getUserData().get(Engine.HISTORY_UUID_KEY), equalTo(historyUUID)); shards.assertAllEqual(numDocs); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index a9b34236fca02..8e9fdeac76a73 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ 
b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1116,14 +1116,7 @@ private void assertSameSyncIdSameDocs() { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - final CommitStats commitStats; - try { - // In some corrupted tests, we leave a corrupted index with which we may not be able to read commit stats. - commitStats = indexShard.commitStats(true); - } catch (Exception ex) { - logger.warn("Failed to get commit_stats", ex); - continue; - } + CommitStats commitStats = indexShard.commitStats(); if (commitStats != null) { // null if the engine is closed or if the shard is recovering String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); if (syncId != null) { From 5ac26d4e700a759f5067ac8e8fe4376b65cbbde5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 9 May 2018 21:18:25 -0400 Subject: [PATCH 13/15] Use exact numDocs in synced-flush --- .../elasticsearch/common/lucene/Lucene.java | 11 +++++ .../indices/flush/SyncedFlushService.java | 21 ++++++--- .../recovery/PeerRecoveryTargetService.java | 22 ++++++++-- .../recovery/RecoverySourceHandler.java | 4 +- .../PeerRecoveryTargetServiceTests.java | 35 +++++++++++++++ .../test/InternalTestCluster.java | 43 +++++++++++++------ 6 files changed, 115 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 0fbb0c0e2b23f..25138a2909606 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -44,6 +44,7 @@ import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import 
org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FieldDoc; @@ -149,6 +150,16 @@ public static int getNumDocs(SegmentInfos info) { return numDocs; } + /** + * Unlike {@link #getNumDocs(SegmentInfos)} this method returns a numDocs that always excludes soft-deleted docs. + * This method is expensive thus prefer using {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required. + */ + public static int getExactNumDocs(IndexCommit commit) throws IOException { + try (DirectoryReader reader = DirectoryReader.open(commit)) { + return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs(); + } + } + /** * Reads the segments infos from the given commit, failing if it fails to load */ diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 553744e66ef04..53e92a8efba72 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.flush; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -40,6 +41,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; @@ -466,15 +468,24 @@ public String executor() { } } - private PreSyncedFlushResponse 
performPreSyncedFlush(PreShardSyncedFlushRequest request) { + private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException { IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true); logger.trace("{} performing pre sync flush", request.shardId()); indexShard.flush(flushRequest); - final CommitStats commitStats = indexShard.commitStats(); - final Engine.CommitId commitId = commitStats.getRawCommitId(); - logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs()); - return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId()); + try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) { + final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit()); + final int numDocs; + if (indexShard.indexSettings().isSoftDeleteEnabled()) { + numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit()); + } else { + numDocs = Lucene.getNumDocs(segmentInfos); + } + final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId()); + final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID); + logger.trace("{} pre sync flush done. 
commit id {}, num docs {}", request.shardId(), commitId, numDocs); + return new PreSyncedFlushResponse(commitId, numDocs, syncId); + } } private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index cb49eed25f8fe..f9c6e05ec6e24 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -34,7 +34,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -42,6 +44,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.CombinedDeletionPolicy; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -65,6 +68,7 @@ import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -289,9 +293,21 @@ public RecoveryResponse newInstance() { * @param recoveryTarget the target of the recovery * @return a snapshot of the store metadata */ - private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget 
recoveryTarget) { + static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) { try { - return recoveryTarget.indexShard().snapshotStoreMetadata(); + final Store.MetadataSnapshot snapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); + // The primary shard may need the "exact" numDocs to verify if the commit has syncId. + final boolean softDeleteEnabled = recoveryTarget.indexShard().indexSettings().isSoftDeleteEnabled(); + if (softDeleteEnabled && Strings.hasText(snapshot.getSyncId())) { + final List commits = DirectoryReader.listCommits(recoveryTarget.store().directory()); + final IndexCommit recoveringCommit = commits.get(commits.size() - 1); + if (Objects.equals(recoveringCommit.getUserData().get(Engine.SYNC_COMMIT_ID), snapshot.getSyncId()) == false) { + throw new IllegalStateException("Target index was changed during recovery [" + recoveryTarget + "]"); + } + final int exactNumDocs = Lucene.getExactNumDocs(recoveringCommit); + return new Store.MetadataSnapshot(snapshot.asMap(), snapshot.getCommitUserData(), exactNumDocs); + } + return snapshot; } catch (final org.apache.lucene.index.IndexNotFoundException e) { // happens on an empty folder. 
no need to log logger.trace("{} shard folder empty, recovering all files", recoveryTarget); @@ -312,7 +328,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove final StartRecoveryRequest request; logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); - final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); + final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget); logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); final long startingSeqNo; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 4c543aeeb22d4..f72a4b9132894 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -341,7 +342,8 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO recoverySourceSyncId.equals(recoveryTargetSyncId); if (recoverWithSyncId) { final long numDocsTarget = request.metadataSnapshot().getNumDocs(); - final long numDocsSource = recoverySourceMetadata.getNumDocs(); + final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled(); + final long numDocsSource = softDeletesEnabled ? 
Lucene.getExactNumDocs(snapshot) : recoverySourceMetadata.getNumDocs(); if (numDocsTarget != numDocsSource) { throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " + "of docs differ: " + numDocsSource + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsTarget diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 3b50fa649150c..a7fffcde80d9b 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -24,15 +24,19 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.hamcrest.Matchers.equalTo; @@ -108,4 +112,35 @@ public void testGetStartingSeqNo() throws Exception { closeShards(replica); } } + + public void testExactNumDocs() throws Exception { + final IndexShard replica = newShard(false); + recoveryEmptyReplica(replica); + long flushedDocs = 0; + final int numDocs = scaledRandomIntBetween(1, 20); + final Set docIds = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + docIds.add(id); + indexDoc(replica, "_doc", id); + if (randomBoolean()) { + Engine.CommitId commitId = 
replica.flush(new FlushRequest()); + replica.syncFlush(UUIDs.randomBase64UUID(), commitId); + flushedDocs = docIds.size(); + } + } + for (String id : randomSubsetOf(docIds)) { + deleteDoc(replica, "_doc", id); + docIds.remove(id); + if (randomBoolean()) { + Engine.CommitId commitId = replica.flush(new FlushRequest()); + replica.syncFlush(UUIDs.randomBase64UUID(), commitId); + flushedDocs = docIds.size(); + } + } + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs)); + recoveryTarget.decRef(); + closeShards(replica); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 8e9fdeac76a73..0c4ffd62cdde9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -26,6 +26,8 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; @@ -76,7 +78,9 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import 
org.elasticsearch.indices.IndicesService; @@ -1104,8 +1108,7 @@ public void beforeIndexDeletion() throws Exception { // ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures. assertNoPendingIndexOperations(); //check that shards that have same sync id also contain same number of documents - // norelease - AwaitsFix: https://github.com/elastic/elasticsearch/pull/30228 - // assertSameSyncIdSameDocs(); + assertSameSyncIdSameDocs(); assertOpenTranslogReferences(); } @@ -1116,16 +1119,16 @@ private void assertSameSyncIdSameDocs() { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - CommitStats commitStats = indexShard.commitStats(); - if (commitStats != null) { // null if the engine is closed or if the shard is recovering - String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); - if (syncId != null) { - long liveDocsOnShard = commitStats.getNumDocs(); - if (docsOnShards.get(syncId) != null) { - assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard)); - } else { - docsOnShards.put(syncId, liveDocsOnShard); - } + Tuple commitStats = commitStats(indexShard); + if (commitStats != null) { + String syncId = commitStats.v1(); + long liveDocsOnShard = commitStats.v2(); + if (docsOnShards.get(syncId) != null) { + assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + + ". 
expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), + equalTo(liveDocsOnShard)); + } else { + docsOnShards.put(syncId, liveDocsOnShard); } } } @@ -1133,6 +1136,22 @@ private void assertSameSyncIdSameDocs() { } } + private Tuple commitStats(IndexShard indexShard) { + try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) { + final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID); + // Only read if sync_id exists + if (Strings.hasText(syncId)) { + return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit())); + } else { + return null; + } + } catch (IllegalIndexShardStateException ex) { + return null; // Shard is closed or not started yet. + } catch (IOException ex) { + throw new AssertionError(ex); + } + } + private void assertNoPendingIndexOperations() throws Exception { assertBusy(() -> { final Collection nodesAndClients = nodes.values(); From 34b19d051be203c28ed3c4614e304d13aef8af55 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 14 May 2018 11:02:35 -0400 Subject: [PATCH 14/15] do not need to check soft-deletes on source --- .../elasticsearch/indices/flush/SyncedFlushService.java | 7 +------ .../indices/recovery/RecoverySourceHandler.java | 3 +-- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 53e92a8efba72..45024e67c8bf7 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -475,12 +475,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest indexShard.flush(flushRequest); try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) { final SegmentInfos segmentInfos = 
Lucene.readSegmentInfos(commitRef.getIndexCommit()); - final int numDocs; - if (indexShard.indexSettings().isSoftDeleteEnabled()) { - numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit()); - } else { - numDocs = Lucene.getNumDocs(segmentInfos); - } + final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit()); final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId()); final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID); logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index f72a4b9132894..159e54bbc3344 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -342,8 +342,7 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO recoverySourceSyncId.equals(recoveryTargetSyncId); if (recoverWithSyncId) { final long numDocsTarget = request.metadataSnapshot().getNumDocs(); - final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled(); - final long numDocsSource = softDeletesEnabled ? 
Lucene.getExactNumDocs(snapshot) : recoverySourceMetadata.getNumDocs(); + final long numDocsSource = Lucene.getExactNumDocs(snapshot); if (numDocsTarget != numDocsSource) { throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " + "of docs differ: " + numDocsSource + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsTarget From c42ca4b47fc8482b8af7178134f19c3e1486acee Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 15 May 2018 12:21:43 -0400 Subject: [PATCH 15/15] always get exact numdocs for snapshot --- .../org/elasticsearch/index/store/Store.java | 12 +++++++++++- .../recovery/PeerRecoveryTargetService.java | 18 +----------------- .../recovery/RecoverySourceHandler.java | 3 +-- .../PeerRecoveryTargetServiceTests.java | 9 +++------ 4 files changed, 16 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index f97553895f3ee..e0dd31c03b2e9 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -862,7 +862,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg Map commitUserDataBuilder = new HashMap<>(); try { final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); - numDocs = Lucene.getNumDocs(segmentCommitInfos); + numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos)); commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version. 
for (SegmentCommitInfo info : segmentCommitInfos) { @@ -945,6 +945,16 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size) assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len); } + private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException { + List commits = DirectoryReader.listCommits(directory); + for (IndexCommit commit : commits) { + if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) { + return commit; + } + } + throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found"); + } + @Override public Iterator iterator() { return metadata.values().iterator(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index f9c6e05ec6e24..3e09312bec86f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -34,9 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -44,7 +42,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.CombinedDeletionPolicy; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.mapper.MapperException; import 
org.elasticsearch.index.seqno.SequenceNumbers; @@ -68,7 +65,6 @@ import java.io.IOException; import java.util.List; -import java.util.Objects; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -295,19 +291,7 @@ public RecoveryResponse newInstance() { */ static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) { try { - final Store.MetadataSnapshot snapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - // The primary shard may need the "exact" numDocs to verify if the commit has syncId. - final boolean softDeleteEnabled = recoveryTarget.indexShard().indexSettings().isSoftDeleteEnabled(); - if (softDeleteEnabled && Strings.hasText(snapshot.getSyncId())) { - final List commits = DirectoryReader.listCommits(recoveryTarget.store().directory()); - final IndexCommit recoveringCommit = commits.get(commits.size() - 1); - if (Objects.equals(recoveringCommit.getUserData().get(Engine.SYNC_COMMIT_ID), snapshot.getSyncId()) == false) { - throw new IllegalStateException("Target index was changed during recovery [" + recoveryTarget + "]"); - } - final int exactNumDocs = Lucene.getExactNumDocs(recoveringCommit); - return new Store.MetadataSnapshot(snapshot.asMap(), snapshot.getCommitUserData(), exactNumDocs); - } - return snapshot; + return recoveryTarget.indexShard().snapshotStoreMetadata(); } catch (final org.apache.lucene.index.IndexNotFoundException e) { // happens on an empty folder. 
no need to log logger.trace("{} shard folder empty, recovering all files", recoveryTarget); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 159e54bbc3344..4c543aeeb22d4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -39,7 +39,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -342,7 +341,7 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO recoverySourceSyncId.equals(recoveryTargetSyncId); if (recoverWithSyncId) { final long numDocsTarget = request.metadataSnapshot().getNumDocs(); - final long numDocsSource = Lucene.getExactNumDocs(snapshot); + final long numDocsSource = recoverySourceMetadata.getNumDocs(); if (numDocsTarget != numDocsSource) { throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " + "of docs differ: " + numDocsSource + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsTarget diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index a7fffcde80d9b..9c4c1c1e736fd 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -26,7 +26,6 @@ import 
org.apache.lucene.index.NoMergePolicy; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -113,7 +112,7 @@ public void testGetStartingSeqNo() throws Exception { } } - public void testExactNumDocs() throws Exception { + public void testExactNumDocsInStoreMetadataSnapshot() throws Exception { final IndexShard replica = newShard(false); recoveryEmptyReplica(replica); long flushedDocs = 0; @@ -124,8 +123,7 @@ public void testExactNumDocs() throws Exception { docIds.add(id); indexDoc(replica, "_doc", id); if (randomBoolean()) { - Engine.CommitId commitId = replica.flush(new FlushRequest()); - replica.syncFlush(UUIDs.randomBase64UUID(), commitId); + replica.flush(new FlushRequest()); flushedDocs = docIds.size(); } } @@ -133,8 +131,7 @@ public void testExactNumDocs() throws Exception { deleteDoc(replica, "_doc", id); docIds.remove(id); if (randomBoolean()) { - Engine.CommitId commitId = replica.flush(new FlushRequest()); - replica.syncFlush(UUIDs.randomBase64UUID(), commitId); + replica.flush(new FlushRequest()); flushedDocs = docIds.size(); } }