From e301196fa2abc1e2ff5b8af65435a5b01d5d3025 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 27 Mar 2019 16:33:23 +0100 Subject: [PATCH 01/14] Add a merge policy that prunes soft-deleted postings This change adds a merge policy that drops all postings for documents that are marked as deleted. This is usually unnecessary unless soft-deletes are used with a retention policy, since otherwise a merge would remove deleted documents anyway. Yet, this merge policy prevents extreme cases where a very large number of soft-deleted documents are retained and impact search and update performance. Note, using this merge policy will remove all search capabilities for soft-deleted documents. --- .../index/engine/InternalEngine.java | 3 +- .../engine/PrunePostingsMergePolicy.java | 297 ++++++++++++++++++ .../engine/PrunePostingsMergePolicyTests.java | 111 +++++++ 3 files changed, 410 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java create mode 100644 server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f8dc637ca742c..65644cbdb3b18 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2165,7 +2165,8 @@ private IndexWriterConfig getIndexWriterConfig() { iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD); if (softDeleteEnabled) { mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery, - new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy)); + new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, + new PrunePostingsMergePolicy(mergePolicy))); } iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java new file mode 100644 index 0000000000000..e01cb8ef87fb4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -0,0 +1,297 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.codecs.NormsProducer; +import org.apache.lucene.codecs.TermVectorsReader; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.FilterCodecReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.FilterNumericDocValues; +import org.apache.lucene.index.FilteredTermsEnum; +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OneMergeWrappingMergePolicy; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; +import org.apache.lucene.index.TermState; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Iterator; + +/** + * This merge policy drops postings for all deleted documents which is useful to guarantee + * consistent search and update performance even if a large number of deleted / updated documents + * are retained. Merging postings away is efficient since Lucene visits postings term by term and + * with the original live-docs being available we are adding a negligible overhead such that we can + * prune soft-deletes by default. Yet, using this merge policy will cause losing all search capabilities on top of + * soft deleted documents independent of the retention policy. Note, in order for this merge policy to be effective it needs to be added + * before the {@link org.apache.lucene.index.SoftDeletesRetentionMergePolicy} because otherwise only documents that are deleted / removed + * anyway will be pruned. + */ +final class PrunePostingsMergePolicy extends OneMergeWrappingMergePolicy { + + PrunePostingsMergePolicy(MergePolicy in) { + super(in, toWrap -> new OneMerge(toWrap.segments) { + @Override + public CodecReader wrapForMerge(CodecReader reader) throws IOException { + CodecReader wrapped = toWrap.wrapForMerge(reader); + return wrapReader(wrapped); + } + }); + assert in instanceof SoftDeletesRetentionMergePolicy == false : + "wrapped merge policy should not be a SoftDeletesRetentionMergePolicy"; + } + + private static int skipDeletedDocs(DocIdSetIterator iterator, Bits liveDocs) throws IOException { + int docId; + do { + docId = iterator.nextDoc(); + } while (docId != DocIdSetIterator.NO_MORE_DOCS && liveDocs.get(docId) == false); + return docId; + } + + private static CodecReader wrapReader(CodecReader reader) { + Bits liveDocs = reader.getLiveDocs(); + if (liveDocs == null) { + return reader; // no deleted docs - we are good! 
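// A minimal, self-contained sketch of how skipDeletedDocs above behaves, assuming a Lucene 8.x
// classpath; the hard-coded liveDocs and document ids are illustrative only, not part of the patch.
static void skipDeletedDocsSketch() throws IOException {
    Bits liveDocs = new Bits() { // docs 1 and 2 are (soft-)deleted
        @Override
        public boolean get(int index) {
            return index == 0 || index == 3;
        }

        @Override
        public int length() {
            return 4;
        }
    };
    DocIdSetIterator iterator = DocIdSetIterator.all(4); // candidate docs 0..3
    assert skipDeletedDocs(iterator, liveDocs) == 0;     // lands on the first live doc
    assert skipDeletedDocs(iterator, liveDocs) == 3;     // deleted docs 1 and 2 are skipped
    assert skipDeletedDocs(iterator, liveDocs) == DocIdSetIterator.NO_MORE_DOCS;
}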
+ } + return new FilterCodecReader(reader) { + + @Override + public NormsProducer getNormsReader() { + NormsProducer normsReader = reader.getNormsReader(); + if (normsReader == null) { + return null; + } + return new NormsProducer() { + @Override + public NumericDocValues getNorms(FieldInfo field) throws IOException { + return new FilterNumericDocValues(normsReader.getNorms(field)) { + @Override + public int nextDoc() throws IOException { + return skipDeletedDocs(in, liveDocs); + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedEncodingException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedEncodingException(); + } + }; + } + + @Override + public void checkIntegrity() throws IOException { + normsReader.checkIntegrity();; + } + + @Override + public void close() throws IOException { + normsReader.close(); + } + + @Override + public long ramBytesUsed() { + return normsReader.ramBytesUsed(); + } + }; + } + + @Override + public FieldsProducer getPostingsReader() { + FieldsProducer postingsReader = super.getPostingsReader(); + if (postingsReader == null) { + return null; + } + return new FieldsProducer() { + @Override + public void close() throws IOException { + postingsReader.close(); + } + + @Override + public void checkIntegrity() throws IOException { + postingsReader.checkIntegrity(); + } + + @Override + public Iterator iterator() { + return postingsReader.iterator(); + } + + @Override + public Terms terms(String field) throws IOException { + Terms terms = postingsReader.terms(field); + if (terms == null) { + return null; + } + return new FilterLeafReader.FilterTerms(terms) { + @Override + public TermsEnum iterator() throws IOException { + TermsEnum iterator = super.iterator(); + return new FilteredTermsEnum(iterator) { + @Override + protected AcceptStatus accept(BytesRef term) { + return AcceptStatus.YES; + } + + @Override + public BytesRef next() throws IOException { + return iterator.next(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + return iterator.seekExact(text); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + return iterator.seekCeil(text); + } + + @Override + public void seekExact(long ord) throws IOException { + iterator.seekExact(ord); + } + + @Override + public void seekExact(BytesRef term, TermState state) throws IOException { + iterator.seekExact(term, state); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + return new FilterLeafReader.FilterPostingsEnum(super.postings(reuse, flags)) { + @Override + public int nextDoc() throws IOException { + return skipDeletedDocs(in, liveDocs); + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedEncodingException(); + } + }; + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + throw new UnsupportedEncodingException(); + } + + + }; + } + }; + } + + @Override + public int size() { + return postingsReader.size(); + } + + @Override + public long ramBytesUsed() { + return postingsReader.ramBytesUsed(); + } + }; + } + + @Override + public CacheHelper getCoreCacheHelper() { + return null; + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } + + @Override + public TermVectorsReader getTermVectorsReader() { + TermVectorsReader termVectorsReader = super.getTermVectorsReader(); + if (termVectorsReader == null) { + return 
null; + } + return new FilteredTermVectorsReader(liveDocs, termVectorsReader); + } + }; + } + + private static class FilteredTermVectorsReader extends TermVectorsReader { + private final Bits liveDocs; + private final TermVectorsReader termVectorsReader; + + public FilteredTermVectorsReader(Bits liveDocs, TermVectorsReader termVectorsReader) { + this.liveDocs = liveDocs; + this.termVectorsReader = termVectorsReader; + } + + @Override + public Fields get(int doc) throws IOException { + if (liveDocs.get(doc)) { // we can drop the entire TV for a deleted document. + return termVectorsReader.get(doc); + } else { + return null; + } + } + + @Override + public void checkIntegrity() throws IOException { + termVectorsReader.checkIntegrity(); + } + + @Override + public TermVectorsReader clone() { + return new FilteredTermVectorsReader(liveDocs, termVectorsReader.clone()); + } + + @Override + public void close() throws IOException { + termVectorsReader.close(); + } + + @Override + public long ramBytesUsed() { + return termVectorsReader.ramBytesUsed(); + } + + @Override + public TermVectorsReader getMergeInstance() throws IOException { + return new FilteredTermVectorsReader(liveDocs, termVectorsReader.getMergeInstance()); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java new file mode 100644 index 0000000000000..a574d1444bf82 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class PrunePostingsMergePolicyTests extends ESTestCase { + + public void testPrune() throws IOException { + try (Directory dir = newDirectory()) { + IndexWriterConfig iwc = newIndexWriterConfig(); + iwc.setSoftDeletesField("_soft_deletes"); + MergePolicy mp = new SoftDeletesRetentionMergePolicy("_soft_deletes", MatchAllDocsQuery::new, + new PrunePostingsMergePolicy(newLogMergePolicy())); + iwc.setMergePolicy(mp); + boolean sorted = randomBoolean(); + if (sorted) { + iwc.setIndexSort(new Sort(new SortField("sort", SortField.Type.INT))); + } + int numUniqueDocs = randomIntBetween(1, 100); + int numDocs = randomIntBetween(numUniqueDocs, numUniqueDocs * 5); + + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + for (int i = 0; i < numDocs ; i++) { + if (rarely()) { + writer.flush(); + } + if (rarely()) { + writer.forceMerge(1, false); + } + int id = i % numUniqueDocs; + Document doc = new Document(); + doc.add(newStringField("id", "" + id, Field.Store.NO)); + doc.add(newTextField("text", "the quick brown fox", Field.Store.YES)); + doc.add(new NumericDocValuesField("sort", i)); + writer.softUpdateDocument(new Term("id", "" + id), doc, new NumericDocValuesField("_soft_deletes", 1)); + if (i == 0) { + // make sure we have at least 2 segments to ensure we do an actual merge to kick out all postings for + // soft deletes + writer.flush(); + } + } + + writer.forceMerge(1); + try (DirectoryReader reader = DirectoryReader.open(writer)) { + LeafReader leafReader = reader.leaves().get(0).reader(); + assertEquals(numDocs, leafReader.maxDoc()); + Terms id = leafReader.terms("id"); + TermsEnum iterator = id.iterator(); + for (int i = 0; i < numUniqueDocs; i++) { + assertTrue(iterator.seekExact(new BytesRef("" + i))); + assertEquals(1, iterator.docFreq()); + } + iterator = leafReader.terms("text").iterator(); + assertTrue(iterator.seekExact(new BytesRef("quick"))); + assertEquals(numUniqueDocs, iterator.docFreq()); + int numValues = 0; + NumericDocValues sort = leafReader.getNumericDocValues("sort"); + while (sort.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (sorted) { + assertEquals(sort.docID(), sort.longValue()); + } else { + assertTrue(sort.longValue() >= 0); + assertTrue(sort.longValue() < numDocs); + } + numValues++; + } + assertEquals(numValues, numDocs); + } + } + } + } +} From aeddcfce8c6e409c7c5f852de2e24c6044c72f16 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 2 Apr 2019 21:30:27 +0200 Subject: [PATCH 02/14] fix checkstyle --- 
.../elasticsearch/index/engine/PrunePostingsMergePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index e01cb8ef87fb4..aa0b02e80828c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -255,7 +255,7 @@ private static class FilteredTermVectorsReader extends TermVectorsReader { private final Bits liveDocs; private final TermVectorsReader termVectorsReader; - public FilteredTermVectorsReader(Bits liveDocs, TermVectorsReader termVectorsReader) { + FilteredTermVectorsReader(Bits liveDocs, TermVectorsReader termVectorsReader) { this.liveDocs = liveDocs; this.termVectorsReader = termVectorsReader; } From 96616a9ddc2ed4c63875a7e7623590657cbe7efb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 3 Apr 2019 13:34:36 +0200 Subject: [PATCH 03/14] fix assertion --- .../elasticsearch/index/engine/PrunePostingsMergePolicy.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index aa0b02e80828c..9c96944686d1a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -66,8 +66,6 @@ public CodecReader wrapForMerge(CodecReader reader) throws IOException { return wrapReader(wrapped); } }); - assert in instanceof SoftDeletesRetentionMergePolicy == false : - "wrapped merge policy should not be a SoftDeletesRetentionMergePolicy"; } private static int skipDeletedDocs(DocIdSetIterator iterator, Bits liveDocs) throws IOException { From ce698f9e12a7969a1425def326942b8d81b92d09 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 3 Apr 2019 13:35:00 +0200 Subject: [PATCH 04/14] fix imports --- .../org/elasticsearch/index/engine/PrunePostingsMergePolicy.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index 9c96944686d1a..bf6a6aa11a58f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -34,7 +34,6 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.OneMergeWrappingMergePolicy; import org.apache.lucene.index.PostingsEnum; -import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.TermState; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; From fb1c59fb1a6bbaa36d002435160270793f6a1ea2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 29 May 2019 10:06:17 +0200 Subject: [PATCH 05/14] fix compilation --- .../elasticsearch/index/engine/PrunePostingsMergePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index bf6a6aa11a58f..dd4ee9b3cce6f 100644 --- 
a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -287,7 +287,7 @@ public long ramBytesUsed() { } @Override - public TermVectorsReader getMergeInstance() throws IOException { + public TermVectorsReader getMergeInstance() { return new FilteredTermVectorsReader(liveDocs, termVectorsReader.getMergeInstance()); } } From d9634d9fab686f53458b1a70535a268fc2fd78d9 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 29 May 2019 10:12:35 +0200 Subject: [PATCH 06/14] add predicate to select fields to prune --- .../index/engine/InternalEngine.java | 2 +- .../engine/PrunePostingsMergePolicy.java | 22 +++++++++++++------ .../engine/PrunePostingsMergePolicyTests.java | 3 ++- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1f09c3395cc0b..cbee3368ecd58 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2161,7 +2161,7 @@ private IndexWriterConfig getIndexWriterConfig() { if (softDeleteEnabled) { mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery, new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, - new PrunePostingsMergePolicy(mergePolicy))); + new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME::equals))); } iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index dd4ee9b3cce6f..62b2a702c8c6e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Iterator; +import java.util.function.Predicate; /** * This merge policy drops postings for all deleted documents which is useful to guarantee @@ -57,12 +58,12 @@ */ final class PrunePostingsMergePolicy extends OneMergeWrappingMergePolicy { - PrunePostingsMergePolicy(MergePolicy in) { + PrunePostingsMergePolicy(MergePolicy in, Predicate pruneFieldPredicate) { super(in, toWrap -> new OneMerge(toWrap.segments) { @Override public CodecReader wrapForMerge(CodecReader reader) throws IOException { CodecReader wrapped = toWrap.wrapForMerge(reader); - return wrapReader(wrapped); + return wrapReader(wrapped, pruneFieldPredicate); } }); } @@ -75,7 +76,7 @@ private static int skipDeletedDocs(DocIdSetIterator iterator, Bits liveDocs) thr return docId; } - private static CodecReader wrapReader(CodecReader reader) { + private static CodecReader wrapReader(CodecReader reader, Predicate pruneFieldPredicate) { Bits liveDocs = reader.getLiveDocs(); if (liveDocs == null) { return reader; // no deleted docs - we are good! 
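// A hedged usage sketch for the predicate-based constructor introduced in this patch. The wrapped
// LogByteSizeMergePolicy and the "_id" field name are assumptions for illustration, and the sketch
// presumes same-package access since PrunePostingsMergePolicy is package-private; it is not the
// engine's real wiring.
MergePolicy mergePolicy = new SoftDeletesRetentionMergePolicy(
        "_soft_deletes",            // soft-deletes field, as in the tests
        MatchAllDocsQuery::new,     // retention query: keep every soft-deleted doc around
        new PrunePostingsMergePolicy(new LogByteSizeMergePolicy(), "_id"::equals));
IndexWriterConfig config = new IndexWriterConfig()
        .setSoftDeletesField("_soft_deletes")
        .setMergePolicy(mergePolicy);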
@@ -91,7 +92,11 @@ public NormsProducer getNormsReader() { return new NormsProducer() { @Override public NumericDocValues getNorms(FieldInfo field) throws IOException { - return new FilterNumericDocValues(normsReader.getNorms(field)) { + NumericDocValues in = normsReader.getNorms(field); + if (pruneFieldPredicate.test(field.name) == false) { + return in; + } + return new FilterNumericDocValues(in) { @Override public int nextDoc() throws IOException { return skipDeletedDocs(in, liveDocs); @@ -150,11 +155,14 @@ public Iterator iterator() { @Override public Terms terms(String field) throws IOException { - Terms terms = postingsReader.terms(field); - if (terms == null) { + Terms in = postingsReader.terms(field); + if (in == null) { return null; } - return new FilterLeafReader.FilterTerms(terms) { + if (pruneFieldPredicate.test(field) == false) { + return in; + } + return new FilterLeafReader.FilterTerms(in) { @Override public TermsEnum iterator() throws IOException { TermsEnum iterator = super.iterator(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java index a574d1444bf82..a5fdc564454d4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.function.Predicate; public class PrunePostingsMergePolicyTests extends ESTestCase { @@ -49,7 +50,7 @@ public void testPrune() throws IOException { IndexWriterConfig iwc = newIndexWriterConfig(); iwc.setSoftDeletesField("_soft_deletes"); MergePolicy mp = new SoftDeletesRetentionMergePolicy("_soft_deletes", MatchAllDocsQuery::new, - new PrunePostingsMergePolicy(newLogMergePolicy())); + new PrunePostingsMergePolicy(newLogMergePolicy(), t -> true)); iwc.setMergePolicy(mp); boolean sorted = randomBoolean(); if (sorted) { From d9634d9fab686f53458b1a70535a268fc2fd78d9 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 29 May 2019 12:10:23 +0200 Subject: [PATCH 07/14] only prune ID field --- .../uid/PerThreadIDVersionAndSeqNoLookup.java | 13 +- .../index/engine/InternalEngine.java | 2 +- .../engine/PrunePostingsMergePolicy.java | 207 +++++++----------- .../engine/PrunePostingsMergePolicyTests.java | 80 +++++-- 4 files changed, 154 insertions(+), 148 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index 889721a49e07a..cb5df1d28bb61 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -72,12 +72,13 @@ final class PerThreadIDVersionAndSeqNoLookup { final Terms terms = reader.terms(uidField); if (terms == null) { // If a segment contains only no-ops, it does not have _uid but has both _soft_deletes and _tombstone fields. 
- final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); - final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); - if (softDeletesDV == null || tombstoneDV == null) { - throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " + - "_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]"); - } + // NOCOMMIT +// final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); +// final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); +// if (softDeletesDV == null || tombstoneDV == null) { +// throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " + +// "_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]"); +// } termsEnum = null; } else { termsEnum = terms.iterator(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index cbee3368ecd58..3c23909755121 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2161,7 +2161,7 @@ private IndexWriterConfig getIndexWriterConfig() { if (softDeleteEnabled) { mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery, new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, - new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME::equals))); + new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME, softDeletesPolicy::getRetentionQuery))); } iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index 62b2a702c8c6e..0707d328727c3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -20,35 +20,37 @@ package org.elasticsearch.index.engine; import org.apache.lucene.codecs.FieldsProducer; -import org.apache.lucene.codecs.NormsProducer; -import org.apache.lucene.codecs.TermVectorsReader; import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.Fields; import org.apache.lucene.index.FilterCodecReader; import org.apache.lucene.index.FilterLeafReader; -import org.apache.lucene.index.FilterNumericDocValues; import org.apache.lucene.index.FilteredTermsEnum; import org.apache.lucene.index.ImpactsEnum; import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.OneMergeWrappingMergePolicy; import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.TermState; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import 
org.apache.lucene.util.BitSet; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Iterator; -import java.util.function.Predicate; +import java.util.function.Supplier; /** - * This merge policy drops postings for all deleted documents which is useful to guarantee - * consistent search and update performance even if a large number of deleted / updated documents + * This merge policy drops id field postings for all deleted documents as well as all docs within the provided retention policy, which can be + * useful to guarantee consistent search and update performance even if a large number of deleted / updated documents * are retained. Merging postings away is efficient since Lucene visits postings term by term and * with the original live-docs being available we are adding a negligible overhead such that we can * prune soft-deletes by default. Yet, using this merge policy will cause losing all search capabilities on top of @@ -58,12 +60,18 @@ */ final class PrunePostingsMergePolicy extends OneMergeWrappingMergePolicy { - PrunePostingsMergePolicy(MergePolicy in, Predicate pruneFieldPredicate) { + PrunePostingsMergePolicy(MergePolicy in, String idField, Supplier retentionQuery) { super(in, toWrap -> new OneMerge(toWrap.segments) { @Override public CodecReader wrapForMerge(CodecReader reader) throws IOException { + FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(idField); + if (fieldInfo != null + && (fieldInfo.hasNorms() || fieldInfo.hasVectors() || fieldInfo.getDocValuesType() != DocValuesType.NONE)) { + // TODO can we guarantee this? + throw new IllegalStateException(idField + " must not have norms, vectors or doc-values"); + } CodecReader wrapped = toWrap.wrapForMerge(reader); - return wrapReader(wrapped, pruneFieldPredicate); + return wrapReader(wrapped, idField, retentionQuery); } }); } @@ -76,60 +84,43 @@ private static int skipDeletedDocs(DocIdSetIterator iterator, Bits liveDocs) thr - private static CodecReader wrapReader(CodecReader reader, Predicate pruneFieldPredicate) { - Bits liveDocs = reader.getLiveDocs(); - if (liveDocs == null) { - return reader; // no deleted docs - we are good! 
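// For context, a sketch of the retention query the new Supplier parameter provides in the engine:
// in essence a range over sequence numbers that keeps every operation at or above the minimum
// retained seq_no. Assumes "_seq_no" is indexed as a LongPoint (org.apache.lucene.document.LongPoint);
// the checkpoint value is illustrative.
long minRetainedSeqNo = 42L; // illustrative
Supplier<Query> retentionQuery =
        () -> LongPoint.newRangeQuery("_seq_no", minRetainedSeqNo, Long.MAX_VALUE);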
- } - return new FilterCodecReader(reader) { - - @Override - public NormsProducer getNormsReader() { - NormsProducer normsReader = reader.getNormsReader(); - if (normsReader == null) { - return null; + private static Bits processLiveDocs(Bits liveDocs, Supplier retentionQuery, CodecReader reader) throws IOException { + Scorer scorer = getScorer(retentionQuery.get(), reader); + if (scorer != null) { + BitSet retentionDocs = BitSet.of(scorer.iterator(), reader.maxDoc()); + if (liveDocs == null) { + return retentionDocs; + } + return new Bits() { + @Override + public boolean get(int index) { + return liveDocs.get(index) && retentionDocs.get(index); } - return new NormsProducer() { - @Override - public NumericDocValues getNorms(FieldInfo field) throws IOException { - NumericDocValues in = normsReader.getNorms(field); - if (pruneFieldPredicate.test(field.name) == false) { - return in; - } - return new FilterNumericDocValues(in) { - @Override - public int nextDoc() throws IOException { - return skipDeletedDocs(in, liveDocs); - } - - @Override - public int advance(int target) throws IOException { - throw new UnsupportedEncodingException(); - } - @Override - public boolean advanceExact(int target) throws IOException { - throw new UnsupportedEncodingException(); - } - }; - } + @Override + public int length() { + return reader.maxDoc(); + } + }; + } else { + return new Bits.MatchNoBits(reader.maxDoc()); + } + } - @Override - public void checkIntegrity() throws IOException { - normsReader.checkIntegrity();; - } + private static Scorer getScorer(Query query, CodecReader reader) throws IOException { + IndexSearcher s = new IndexSearcher(reader); + s.setQueryCache(null); + Weight weight = s.createWeight(s.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f); + return weight.scorer(reader.getContext()); + } - @Override - public void close() throws IOException { - normsReader.close(); - } - @Override - public long ramBytesUsed() { - return normsReader.ramBytesUsed(); - } - }; - } + private static CodecReader wrapReader(CodecReader reader, String idField, Supplier retentionQuery) throws IOException { + Bits liveDocs = processLiveDocs(reader.getLiveDocs(), retentionQuery, reader); + if (liveDocs == null) { + return reader; // no deleted docs - we are good! 
+ } + return new FilterCodecReader(reader) { @Override public FieldsProducer getPostingsReader() { @@ -156,10 +147,7 @@ public Iterator iterator() { @Override public Terms terms(String field) throws IOException { Terms in = postingsReader.terms(field); - if (in == null) { - return null; - } - if (pruneFieldPredicate.test(field) == false) { + if (in == null || idField.equals(field) == false) { return in; } return new FilterLeafReader.FilterTerms(in) { @@ -167,6 +155,7 @@ public Terms terms(String field) throws IOException { public TermsEnum iterator() throws IOException { TermsEnum iterator = super.iterator(); return new FilteredTermsEnum(iterator) { + private PostingsEnum internal; @Override protected AcceptStatus accept(BytesRef term) { return AcceptStatus.YES; @@ -174,35 +163,51 @@ protected AcceptStatus accept(BytesRef term) { @Override public BytesRef next() throws IOException { - return iterator.next(); + if (liveDocs instanceof Bits.MatchNoBits) { + return null; // short-cut this if we don't match anything + } + BytesRef ref; + while ((ref = iterator.next()) != null) { + internal = super.postings(internal, PostingsEnum.NONE); + if (skipDeletedDocs(internal, liveDocs) != DocIdSetIterator.NO_MORE_DOCS) { + break; + } + } + return ref; } @Override - public boolean seekExact(BytesRef text) throws IOException { - return iterator.seekExact(text); + public boolean seekExact(BytesRef text) { + throw new UnsupportedOperationException("This TermsEnum can not seek"); } @Override - public SeekStatus seekCeil(BytesRef text) throws IOException { - return iterator.seekCeil(text); + public SeekStatus seekCeil(BytesRef text) { + throw new UnsupportedOperationException("This TermsEnum can not seek"); } @Override - public void seekExact(long ord) throws IOException { - iterator.seekExact(ord); + public void seekExact(long ord) { + throw new UnsupportedOperationException("This TermsEnum can not seek"); } @Override - public void seekExact(BytesRef term, TermState state) throws IOException { - iterator.seekExact(term, state); + public void seekExact(BytesRef term, TermState state) { + throw new UnsupportedOperationException("This TermsEnum can not seek"); } @Override - public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { - return new FilterLeafReader.FilterPostingsEnum(super.postings(reuse, flags)) { + public PostingsEnum postings(PostingsEnum reuse, int flags) { + assert internal != null; + return new FilterLeafReader.FilterPostingsEnum(internal) { + @Override public int nextDoc() throws IOException { - return skipDeletedDocs(in, liveDocs); + int currentDocId = in.docID(); + if (currentDocId != NO_MORE_DOCS) { + skipDeletedDocs(in, liveDocs); + } + return currentDocId; } @Override @@ -244,59 +249,7 @@ public CacheHelper getCoreCacheHelper() { public CacheHelper getReaderCacheHelper() { return null; } - - @Override - public TermVectorsReader getTermVectorsReader() { - TermVectorsReader termVectorsReader = super.getTermVectorsReader(); - if (termVectorsReader == null) { - return null; - } - return new FilteredTermVectorsReader(liveDocs, termVectorsReader); - } }; } - private static class FilteredTermVectorsReader extends TermVectorsReader { - private final Bits liveDocs; - private final TermVectorsReader termVectorsReader; - - FilteredTermVectorsReader(Bits liveDocs, TermVectorsReader termVectorsReader) { - this.liveDocs = liveDocs; - this.termVectorsReader = termVectorsReader; - } - - @Override - public Fields get(int doc) throws IOException { - if (liveDocs.get(doc)) { // we 
can drop the entire TV for a deleted document. - return termVectorsReader.get(doc); - } else { - return null; - } - } - - @Override - public void checkIntegrity() throws IOException { - termVectorsReader.checkIntegrity(); - } - - @Override - public TermVectorsReader clone() { - return new FilteredTermVectorsReader(liveDocs, termVectorsReader.clone()); - } - - @Override - public void close() throws IOException { - termVectorsReader.close(); - } - - @Override - public long ramBytesUsed() { - return termVectorsReader.ramBytesUsed(); - } - - @Override - public TermVectorsReader getMergeInstance() { - return new FilteredTermVectorsReader(liveDocs, termVectorsReader.getMergeInstance()); - } - } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java index a5fdc564454d4..939608c39c718 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -34,6 +35,8 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MatchNoDocsQuery; +import org.apache.lucene.search.Query; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.store.Directory; @@ -41,16 +44,18 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; public class PrunePostingsMergePolicyTests extends ESTestCase { public void testPrune() throws IOException { + AtomicReference retention = new AtomicReference<>(new MatchAllDocsQuery()); try (Directory dir = newDirectory()) { IndexWriterConfig iwc = newIndexWriterConfig(); iwc.setSoftDeletesField("_soft_deletes"); MergePolicy mp = new SoftDeletesRetentionMergePolicy("_soft_deletes", MatchAllDocsQuery::new, - new PrunePostingsMergePolicy(newLogMergePolicy(), t -> true)); + new PrunePostingsMergePolicy(newLogMergePolicy(), "id", retention::get)); iwc.setMergePolicy(mp); boolean sorted = randomBoolean(); if (sorted) { @@ -69,7 +74,7 @@ public void testPrune() throws IOException { } int id = i % numUniqueDocs; Document doc = new Document(); - doc.add(newStringField("id", "" + id, Field.Store.NO)); + doc.add(new StringField("id", "" + id, Field.Store.NO)); doc.add(newTextField("text", "the quick brown fox", Field.Store.YES)); doc.add(new NumericDocValuesField("sort", i)); writer.softUpdateDocument(new Term("id", "" + id), doc, new NumericDocValuesField("_soft_deletes", 1)); @@ -92,21 +97,68 @@ public void testPrune() throws IOException { } iterator = leafReader.terms("text").iterator(); assertTrue(iterator.seekExact(new BytesRef("quick"))); - assertEquals(numUniqueDocs, iterator.docFreq()); - int numValues = 0; - NumericDocValues sort = leafReader.getNumericDocValues("sort"); - while (sort.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - if (sorted) { - assertEquals(sort.docID(), sort.longValue()); - } else { - 
assertTrue(sort.longValue() >= 0); - assertTrue(sort.longValue() < numDocs); - } - numValues++; + assertEquals(leafReader.maxDoc(), iterator.docFreq()); + int numValues = 0; + NumericDocValues sort = leafReader.getNumericDocValues("sort"); + while (sort.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (sorted) { + assertEquals(sort.docID(), sort.longValue()); + } else { + assertTrue(sort.longValue() >= 0); + assertTrue(sort.longValue() < numDocs); } - assertEquals(numValues, numDocs); + numValues++; + } + assertEquals(numValues, numDocs); + } + { + // prune away a single ID + Document doc = new Document(); + doc.add(new StringField("id", "test", Field.Store.NO)); + writer.deleteDocuments(new Term("id", "test")); + writer.flush(); + writer.forceMerge(1); + writer.updateNumericDocValue(new Term("id", "test"), "_soft_deletes", 1);// delete it + writer.flush(); + writer.forceMerge(1); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + LeafReader leafReader = reader.leaves().get(0).reader(); + assertEquals(numDocs, leafReader.maxDoc()); + Terms id = leafReader.terms("id"); + TermsEnum iterator = id.iterator(); + for (int i = 0; i < numUniqueDocs; i++) { + assertTrue(iterator.seekExact(new BytesRef("" + i))); + assertEquals(1, iterator.docFreq()); + } + assertFalse(iterator.seekExact(new BytesRef("test"))); + iterator = leafReader.terms("text").iterator(); + assertTrue(iterator.seekExact(new BytesRef("quick"))); + assertEquals(leafReader.maxDoc(), iterator.docFreq()); + } + } + + { // drop all ids + retention.set(new MatchNoDocsQuery()); + Document doc = new Document(); + doc.add(new StringField("id", "0", Field.Store.NO)); + doc.add(newTextField("text", "the quick brown fox", Field.Store.YES)); + doc.add(new NumericDocValuesField("sort", 0)); + writer.softUpdateDocument(new Term("id", "0"), doc, new NumericDocValuesField("_soft_deletes", 1)); + writer.flush(); + writer.forceMerge(1); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + LeafReader leafReader = reader.leaves().get(0).reader(); + assertEquals(numDocs + 1, leafReader.maxDoc()); + assertNull(leafReader.terms("id")); + TermsEnum iterator = leafReader.terms("text").iterator(); + assertTrue(iterator.seekExact(new BytesRef("quick"))); + assertEquals(leafReader.maxDoc(), iterator.docFreq()); + } } } + } } } From 73e97c06047d79bd8ea006019bde080dc682f15d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 4 Jun 2019 09:44:05 +0200 Subject: [PATCH 08/14] beef up test --- .../index/engine/PrunePostingsMergePolicyTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java index 939608c39c718..3026bf971ef7e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java @@ -127,6 +127,7 @@ public void testPrune() throws IOException { assertEquals(numDocs, leafReader.maxDoc()); Terms id = leafReader.terms("id"); TermsEnum iterator = id.iterator(); + assertEquals(numUniqueDocs, id.size()); for (int i = 0; i < numUniqueDocs; i++) { assertTrue(iterator.seekExact(new BytesRef("" + i))); assertEquals(1, iterator.docFreq()); From 1cd7a6b43c5e226ffa6e4115a3bd73ea628e387d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 4 Jun 2019 14:28:04 +0200 Subject: [PATCH 09/14] roll back retention query --- 
.../uid/PerThreadIDVersionAndSeqNoLookup.java | 13 +++-- .../index/engine/InternalEngine.java | 2 +- .../engine/PrunePostingsMergePolicy.java | 54 ++++--------------- .../engine/PrunePostingsMergePolicyTests.java | 16 +++--- 4 files changed, 28 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index cb5df1d28bb61..889721a49e07a 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -72,13 +72,12 @@ final class PerThreadIDVersionAndSeqNoLookup { final Terms terms = reader.terms(uidField); if (terms == null) { // If a segment contains only no-ops, it does not have _uid but has both _soft_deletes and _tombstone fields. - // NOCOMMIT -// final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); -// final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); -// if (softDeletesDV == null || tombstoneDV == null) { -// throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " + -// "_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]"); -// } + final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); + final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); + if (softDeletesDV == null || tombstoneDV == null) { + throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " + + "_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]"); + } termsEnum = null; } else { termsEnum = terms.iterator(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3c23909755121..2b9691f9d8079 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2161,7 +2161,7 @@ private IndexWriterConfig getIndexWriterConfig() { if (softDeleteEnabled) { mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery, new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, - new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME, softDeletesPolicy::getRetentionQuery))); + new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME))); } iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index 0707d328727c3..0e289c15f8a0f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -60,7 +60,7 @@ */ final class PrunePostingsMergePolicy extends OneMergeWrappingMergePolicy { - PrunePostingsMergePolicy(MergePolicy in, String idField, Supplier retentionQuery) { + PrunePostingsMergePolicy(MergePolicy in, String idField) { super(in, toWrap -> new 
OneMerge(toWrap.segments) { @Override public CodecReader wrapForMerge(CodecReader reader) throws IOException { @@ -71,52 +71,13 @@ public CodecReader wrapForMerge(CodecReader reader) throws IOException { throw new IllegalStateException(idField + " must not have norms, vectors or doc-values"); } CodecReader wrapped = toWrap.wrapForMerge(reader); - return wrapReader(wrapped, idField, retentionQuery); + return wrapReader(wrapped, idField); } }); } - private static int skipDeletedDocs(DocIdSetIterator iterator, Bits liveDocs) throws IOException { - int docId; - do { - docId = iterator.nextDoc(); - } while (docId != DocIdSetIterator.NO_MORE_DOCS && liveDocs.get(docId) == false); - return docId; - } - - private static Bits processLiveDocs(Bits liveDocs, Supplier retentionQuery, CodecReader reader) throws IOException { - Scorer scorer = getScorer(retentionQuery.get(), reader); - if (scorer != null) { - BitSet retentionDocs = BitSet.of(scorer.iterator(), reader.maxDoc()); - if (liveDocs == null) { - return retentionDocs; - } - return new Bits() { - @Override - public boolean get(int index) { - return liveDocs.get(index) && retentionDocs.get(index); - } - - @Override - public int length() { - return reader.maxDoc(); - } - }; - } else { - return new Bits.MatchNoBits(reader.maxDoc()); - } - } - - private static Scorer getScorer(Query query, CodecReader reader) throws IOException { - IndexSearcher s = new IndexSearcher(reader); - s.setQueryCache(null); - Weight weight = s.createWeight(s.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f); - return weight.scorer(reader.getContext()); - } - - - private static CodecReader wrapReader(CodecReader reader, String idField, Supplier retentionQuery) throws IOException { - Bits liveDocs = processLiveDocs(reader.getLiveDocs(), retentionQuery, reader); + private static CodecReader wrapReader(CodecReader reader, String idField) { + Bits liveDocs = reader.getLiveDocs(); if (liveDocs == null) { return reader; // no deleted docs - we are good! 
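// How the engine nests the rolled-back two-argument constructor, restated as a sketch;
// retentionQuery stands in for softDeletesPolicy::getRetentionQuery and innerMergePolicy for the
// engine's configured merge policy, so this mirrors the InternalEngine diff rather than adding
// behavior of its own.
static MergePolicy mergePolicyChainSketch(MergePolicy innerMergePolicy, Supplier<Query> retentionQuery) {
    return new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, retentionQuery,
            new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, retentionQuery,
                    new PrunePostingsMergePolicy(innerMergePolicy, IdFieldMapper.NAME)));
}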
} @@ -252,4 +213,11 @@ public CacheHelper getReaderCacheHelper() { }; } + private static int skipDeletedDocs(DocIdSetIterator iterator, Bits liveDocs) throws IOException { + int docId; + do { + docId = iterator.nextDoc(); + } while (docId != DocIdSetIterator.NO_MORE_DOCS && liveDocs.get(docId) == false); + return docId; + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java index 3026bf971ef7e..227629ac8b13a 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java @@ -50,12 +50,11 @@ public class PrunePostingsMergePolicyTests extends ESTestCase { public void testPrune() throws IOException { - AtomicReference retention = new AtomicReference<>(new MatchAllDocsQuery()); try (Directory dir = newDirectory()) { IndexWriterConfig iwc = newIndexWriterConfig(); iwc.setSoftDeletesField("_soft_deletes"); MergePolicy mp = new SoftDeletesRetentionMergePolicy("_soft_deletes", MatchAllDocsQuery::new, - new PrunePostingsMergePolicy(newLogMergePolicy(), "id", retention::get)); + new PrunePostingsMergePolicy(newLogMergePolicy(), "id")); iwc.setMergePolicy(mp); boolean sorted = randomBoolean(); if (sorted) { @@ -140,18 +139,23 @@ public void testPrune() throws IOException { } { // drop all ids - retention.set(new MatchNoDocsQuery()); + // first add a doc such that we can force merge Document doc = new Document(); - doc.add(new StringField("id", "0", Field.Store.NO)); + doc.add(new StringField("id", "" + 0, Field.Store.NO)); doc.add(newTextField("text", "the quick brown fox", Field.Store.YES)); doc.add(new NumericDocValuesField("sort", 0)); - writer.softUpdateDocument(new Term("id", "0"), doc, new NumericDocValuesField("_soft_deletes", 1)); + writer.softUpdateDocument(new Term("id", "" + 0), doc, new NumericDocValuesField("_soft_deletes", 1)); + for (int i = 0; i < numUniqueDocs; i++) { + writer.updateNumericDocValue(new Term("id", "" + i), "_soft_deletes", 1); + } writer.flush(); writer.forceMerge(1); + try (DirectoryReader reader = DirectoryReader.open(writer)) { LeafReader leafReader = reader.leaves().get(0).reader(); - assertEquals(numDocs + 1, leafReader.maxDoc()); + assertEquals(numDocs+1, leafReader.maxDoc()); + assertEquals(0, leafReader.numDocs()); assertNull(leafReader.terms("id")); TermsEnum iterator = leafReader.terms("text").iterator(); assertTrue(iterator.seekExact(new BytesRef("quick"))); From d357728caca22283bbe10a58088c1c518e031b5b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 4 Jun 2019 15:08:17 +0200 Subject: [PATCH 10/14] foo --- .../engine/PrunePostingsMergePolicy.java | 198 +++++++++--------- 1 file changed, 100 insertions(+), 98 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index 0e289c15f8a0f..8368d63da0d28 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -21,8 +21,6 @@ import org.apache.lucene.codecs.FieldsProducer; import org.apache.lucene.index.CodecReader; -import org.apache.lucene.index.DocValuesType; -import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FilterCodecReader; import 
org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.FilteredTermsEnum; @@ -30,27 +28,19 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.OneMergeWrappingMergePolicy; import org.apache.lucene.index.PostingsEnum; -import org.apache.lucene.index.TermState; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.Weight; -import org.apache.lucene.util.BitSet; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Iterator; -import java.util.function.Supplier; /** - * This merge policy drops id field postings for all deleted documents as well as all docs within the provided retention policy, which can be - * useful to guarantee consistent search and update performance even if a large number of deleted / updated documents + * This merge policy drops id field postings for all deleted documents, which can be + * useful to guarantee consistent update performance even if a large number of deleted / updated documents * are retained. Merging postings away is efficient since Lucene visits postings term by term and * with the original live-docs being available we are adding a negligible overhead such that we can * prune soft-deletes by default. Yet, using this merge policy will cause losing all search capabilities on top of @@ -64,12 +54,6 @@ final class PrunePostingsMergePolicy extends OneMergeWrappingMergePolicy { super(in, toWrap -> new OneMerge(toWrap.segments) { @Override public CodecReader wrapForMerge(CodecReader reader) throws IOException { - FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(idField); - if (fieldInfo != null - && (fieldInfo.hasNorms() || fieldInfo.hasVectors() || fieldInfo.getDocValuesType() != DocValuesType.NONE)) { - // TODO can we guarantee this? - throw new IllegalStateException(idField + " must not have norms, vectors or doc-values"); - } CodecReader wrapped = toWrap.wrapForMerge(reader); return wrapReader(wrapped, idField); } @@ -81,6 +65,7 @@ private static CodecReader wrapReader(CodecReader reader, String idField) { Bits liveDocs = reader.getLiveDocs(); if (liveDocs == null) { return reader; // no deleted docs - we are good! 
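// The rewrite below splits pruning into two parts: FilteredTermsEnum#accept decides per term
// whether any live document still uses it, and a postings wrapper hides deleted documents within
// a term. A minimal sketch of such a wrapper's nextDoc contract (assuming Lucene 8.x; the actual
// class below additionally supports reuse):
PostingsEnum liveOnly = new FilterLeafReader.FilterPostingsEnum(postings) { // postings: the delegate enum
    @Override
    public int nextDoc() throws IOException {
        int doc;
        do {
            doc = in.nextDoc(); // "in" is the wrapped postings enum
        } while (doc != DocIdSetIterator.NO_MORE_DOCS && liveDocs.get(doc) == false);
        return doc;
    }
};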
} + final boolean fullyDeletedSegment = reader.numDocs() == 0; return new FilterCodecReader(reader) { @Override @@ -108,85 +93,46 @@ public Iterator iterator() { @Override public Terms terms(String field) throws IOException { Terms in = postingsReader.terms(field); - if (in == null || idField.equals(field) == false) { - return in; - } - return new FilterLeafReader.FilterTerms(in) { - @Override - public TermsEnum iterator() throws IOException { - TermsEnum iterator = super.iterator(); - return new FilteredTermsEnum(iterator) { - private PostingsEnum internal; - @Override - protected AcceptStatus accept(BytesRef term) { - return AcceptStatus.YES; - } - - @Override - public BytesRef next() throws IOException { - if (liveDocs instanceof Bits.MatchNoBits) { - return null; // short-cut this if we don't match anything - } - BytesRef ref; - while ((ref = iterator.next()) != null) { - internal = super.postings(internal, PostingsEnum.NONE); - if (skipDeletedDocs(internal, liveDocs) != DocIdSetIterator.NO_MORE_DOCS) { - break; + if (idField.equals(field) && in != null) { + return new FilterLeafReader.FilterTerms(in) { + @Override + public TermsEnum iterator() throws IOException { + TermsEnum iterator = super.iterator(); + return new FilteredTermsEnum(iterator, false) { + private PostingsEnum internal; + + @Override + protected AcceptStatus accept(BytesRef term) throws IOException { + if (fullyDeletedSegment) { + return AcceptStatus.END; // short-cut this if we don't match anything } - } - return ref; - } - - @Override - public boolean seekExact(BytesRef text) { - throw new UnsupportedOperationException("This TermsEnum can not seek"); - } - - @Override - public SeekStatus seekCeil(BytesRef text) { - throw new UnsupportedOperationException("This TermsEnum can not seek"); - } - - @Override - public void seekExact(long ord) { - throw new UnsupportedOperationException("This TermsEnum can not seek"); - } - - @Override - public void seekExact(BytesRef term, TermState state) { - throw new UnsupportedOperationException("This TermsEnum can not seek"); - } - - @Override - public PostingsEnum postings(PostingsEnum reuse, int flags) { - assert internal != null; - return new FilterLeafReader.FilterPostingsEnum(internal) { - - @Override - public int nextDoc() throws IOException { - int currentDocId = in.docID(); - if (currentDocId != NO_MORE_DOCS) { - skipDeletedDocs(in, liveDocs); - } - return currentDocId; + internal = postings(internal, PostingsEnum.NONE); + if (internal.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + return AcceptStatus.YES; } + return AcceptStatus.NO; + } - @Override - public int advance(int target) throws IOException { - throw new UnsupportedEncodingException(); + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + if (reuse != null && reuse instanceof OnlyLiveDocsPostingsEnum) { + OnlyLiveDocsPostingsEnum reuseInstance = (OnlyLiveDocsPostingsEnum) reuse; + reuseInstance.reset(super.postings(reuseInstance.in, flags)); + return reuseInstance; } - }; - } - - @Override - public ImpactsEnum impacts(int flags) throws IOException { - throw new UnsupportedEncodingException(); - } - + return new OnlyLiveDocsPostingsEnum(super.postings(null, flags), liveDocs); + } - }; - } - }; + @Override + public ImpactsEnum impacts(int flags) throws IOException { + throw new UnsupportedEncodingException(); + } + }; + } + }; + } else { + return in; + } } @Override @@ -213,11 +159,67 @@ public CacheHelper getReaderCacheHelper() { }; } - private static int 
skipDeletedDocs(DocIdSetIterator iterator, Bits liveDocs) throws IOException { - int docId; - do { - docId = iterator.nextDoc(); - } while (docId != DocIdSetIterator.NO_MORE_DOCS && liveDocs.get(docId) == false); - return docId; + private static final class OnlyLiveDocsPostingsEnum extends PostingsEnum { + + private final Bits liveDocs; + private PostingsEnum in; + + public OnlyLiveDocsPostingsEnum(PostingsEnum in, Bits liveDocs) { + this.liveDocs = liveDocs; + reset(in); + } + + void reset(PostingsEnum in) { + this.in = in; + } + + @Override + public int docID() { + return in.docID(); + } + + @Override + public int nextDoc() throws IOException { + int docId; + do { + docId = in.nextDoc(); + } while (docId != DocIdSetIterator.NO_MORE_DOCS && liveDocs.get(docId) == false); + return docId; + } + + @Override + public int advance(int target) { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return in.cost(); + } + + @Override + public int freq() throws IOException { + return in.freq(); + } + + @Override + public int nextPosition() throws IOException { + return in.nextPosition(); + } + + @Override + public int startOffset() throws IOException { + return in.startOffset(); + } + + @Override + public int endOffset() throws IOException { + return in.endOffset(); + } + + @Override + public BytesRef getPayload() throws IOException { + return in.getPayload(); + } } } From e4645414371689df267246d3985b0fb9fa9037ad Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 4 Jun 2019 15:17:40 +0200 Subject: [PATCH 11/14] remove redundant modifier --- .../elasticsearch/index/engine/PrunePostingsMergePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index 8368d63da0d28..77fe7d73b211d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -164,7 +164,7 @@ private static final class OnlyLiveDocsPostingsEnum extends PostingsEnum { private final Bits liveDocs; private PostingsEnum in; - public OnlyLiveDocsPostingsEnum(PostingsEnum in, Bits liveDocs) { + OnlyLiveDocsPostingsEnum(PostingsEnum in, Bits liveDocs) { this.liveDocs = liveDocs; reset(in); } From bedb784050b8f9891dfb38ad7568db1f061dfe41 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 4 Jun 2019 17:23:31 +0200 Subject: [PATCH 12/14] fix assumption about empty Terms --- .../uid/PerThreadIDVersionAndSeqNoLookup.java | 4 +- .../engine/PrunePostingsMergePolicy.java | 3 +- .../index/engine/InternalEngineTests.java | 56 +++++++++++++++++++ .../engine/PrunePostingsMergePolicyTests.java | 4 -- 4 files changed, 60 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index 889721a49e07a..99e725b153c1f 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -74,7 +74,9 @@ final class PerThreadIDVersionAndSeqNoLookup { // If a segment contains only no-ops, it does not have _uid but has both _soft_deletes and _tombstone fields. 
final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); - if (softDeletesDV == null || tombstoneDV == null) { + // this is a special case where we pruned away all IDs in a segment since all docs are deleted. + final boolean allDocsDeleted = (softDeletesDV != null && reader.numDocs() == 0); + if ((softDeletesDV == null || tombstoneDV == null) && allDocsDeleted == false) { throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " + "_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]"); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index 77fe7d73b211d..f59b65003e2a3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -35,7 +35,6 @@ import org.apache.lucene.util.BytesRef; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.Iterator; /** @@ -125,7 +124,7 @@ public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { @Override public ImpactsEnum impacts(int flags) throws IOException { - throw new UnsupportedEncodingException(); + throw new UnsupportedOperationException(); } }; } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b213da097ce5e..eaff8491f1da0 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -115,6 +115,7 @@ import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; @@ -1452,6 +1453,61 @@ public void testForceMergeWithoutSoftDeletes() throws IOException { } } + /* + * we are testing an edge case here where we have a fully deleted segment that is retained but has all its IDs pruned away. 
+ */ + public void testLookupVersionWithPrunedAwayIds() throws IOException { + try (Directory dir = newDirectory()) { + IndexWriterConfig indexWriterConfig = new IndexWriterConfig(Lucene.STANDARD_ANALYZER); + indexWriterConfig.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD); + try (IndexWriter writer = new IndexWriter(dir, + indexWriterConfig.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, + MatchAllDocsQuery::new, new PrunePostingsMergePolicy(indexWriterConfig.getMergePolicy(), "_id"))))) { + org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document(); + doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE)); + doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, -1)); + doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, 1)); + doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, 1)); + writer.addDocument(doc); + writer.flush(); + writer.softUpdateDocument(new Term(IdFieldMapper.NAME, "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1)); + writer.updateNumericDocValue(new Term(IdFieldMapper.NAME, "1"), Lucene.SOFT_DELETES_FIELD, 1); + writer.forceMerge(1); + try (DirectoryReader reader = DirectoryReader.open(writer)) { + assertEquals(1, reader.leaves().size()); + assertNull(VersionsAndSeqNoResolver.loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "1"), false)); + } + } + } + } + + public void testUpdateWithFullyDeletedSegments() throws IOException { + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), Integer.MAX_VALUE); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final Set<String> liveDocs = new HashSet<>(); + try (Store store = createStore(); + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, + null, globalCheckpoint::get))) { + int numDocs = scaledRandomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null); + engine.index(indexForDoc(doc)); + liveDocs.add(doc.id()); + } + + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null); + engine.index(indexForDoc(doc)); + liveDocs.add(doc.id()); + } + } + } + public void testForceMergeWithSoftDeletesRetention() throws Exception { final long retainedExtraOps = randomLongBetween(0, 10); Settings.Builder settings = Settings.builder() diff --git a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java index 227629ac8b13a..9fb9445181327 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java @@ -35,8 +35,6 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.MatchNoDocsQuery; -import 
org.apache.lucene.search.Query; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.store.Directory; @@ -44,8 +42,6 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; public class PrunePostingsMergePolicyTests extends ESTestCase { From db7ef7e3ca3a79fcbe1552068c7013c0ae084aee Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 4 Jun 2019 17:27:22 +0200 Subject: [PATCH 13/14] remove null check --- .../elasticsearch/index/engine/PrunePostingsMergePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java index f59b65003e2a3..0c973ba93f65d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/PrunePostingsMergePolicy.java @@ -114,7 +114,7 @@ protected AcceptStatus accept(BytesRef term) throws IOException { @Override public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { - if (reuse != null && reuse instanceof OnlyLiveDocsPostingsEnum) { + if (reuse instanceof OnlyLiveDocsPostingsEnum) { OnlyLiveDocsPostingsEnum reuseInstance = (OnlyLiveDocsPostingsEnum) reuse; reuseInstance.reset(super.postings(reuseInstance.in, flags)); return reuseInstance; From 2e133027191599f2e4389e47a4d6ae813e965ec2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 5 Jun 2019 10:43:49 +0200 Subject: [PATCH 14/14] Add test for the engine to check if we prune the IDs of retained docs away --- .../index/engine/InternalEngineTests.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index eaff8491f1da0..cafc69d2869f4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -51,6 +51,8 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; @@ -115,6 +117,7 @@ import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; @@ -5734,4 +5737,56 @@ public void testRefreshAndFailEngineConcurrently() throws Exception { } assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); } + + public void testPruneAwayDeletedButRetainedIds() throws Exception { + IOUtils.close(engine, store); + Settings settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + IndexSettings indexSettings = 
IndexSettingsModule.newIndexSettings( + IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); + store = createStore(indexSettings, newDirectory()); + LogDocMergePolicy policy = new LogDocMergePolicy(); + policy.setMinMergeDocs(10000); + try (InternalEngine engine = createEngine(indexSettings, store, createTempDir(), policy)) { + int numDocs = between(1, 20); + for (int i = 0; i < numDocs; i++) { + index(engine, i); + } + engine.forceMerge(true); + engine.delete(new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get())); + engine.refresh("test"); + // now we have 2 segments since we added a tombstone plus the old segment with the delete + try (Searcher searcher = engine.acquireSearcher("test")) { + IndexReader reader = searcher.reader(); + assertEquals(2, reader.leaves().size()); + LeafReaderContext leafReaderContext = reader.leaves().get(0); + LeafReader leafReader = leafReaderContext.reader(); + assertEquals("the delete and the tombstone", 1, leafReader.numDeletedDocs()); + assertEquals(numDocs, leafReader.maxDoc()); + Terms id = leafReader.terms("_id"); + assertNotNull(id); + assertEquals("deleted IDs are NOT YET pruned away", reader.numDocs() + 1, id.size()); + TermsEnum iterator = id.iterator(); + assertTrue(iterator.seekExact(Uid.encodeId("0"))); + } + + // let's force-merge the tombstone and the original segment and make sure the doc is still there but the ID term is gone + engine.forceMerge(true); + engine.refresh("test"); + try (Searcher searcher = engine.acquireSearcher("test")) { + IndexReader reader = searcher.reader(); + assertEquals(1, reader.leaves().size()); + LeafReaderContext leafReaderContext = reader.leaves().get(0); + LeafReader leafReader = leafReaderContext.reader(); + assertEquals("the delete and the tombstone", 2, leafReader.numDeletedDocs()); + assertEquals(numDocs + 1, leafReader.maxDoc()); + Terms id = leafReader.terms("_id"); + assertNotNull(id); + assertEquals("deleted IDs are pruned away", reader.numDocs(), id.size()); + TermsEnum iterator = id.iterator(); + assertFalse(iterator.seekExact(Uid.encodeId("0"))); + } + } + } }
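
For readers who want to see the policy composition outside the engine wiring, here is a minimal, self-contained sketch of how the pruning and retention policies are meant to nest. This sketch is not part of the patch: it assumes Lucene 8.x, assumes the file sits where the package-private PrunePostingsMergePolicy is visible (e.g. org.elasticsearch.index.engine), and hard-codes the field names "_id" and "__soft_deletes" that the tests above reach via IdFieldMapper.NAME and Lucene.SOFT_DELETES_FIELD.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;

// hedged sketch, not from the patch: assumes visibility of the package-private
// PrunePostingsMergePolicy and the literal "_id" / "__soft_deletes" field names
public class PrunePostingsSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
            iwc.setSoftDeletesField("__soft_deletes");
            // order matters: the pruning policy must sit inside the retention policy,
            // otherwise only postings of docs that the merge drops anyway get pruned
            iwc.setMergePolicy(new SoftDeletesRetentionMergePolicy("__soft_deletes",
                MatchAllDocsQuery::new, // retain every soft-deleted doc, for demonstration
                new PrunePostingsMergePolicy(iwc.getMergePolicy(), "_id")));
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                for (int i = 0; i < 3; i++) {
                    Document doc = new Document();
                    doc.add(new StringField("_id", Integer.toString(i), Field.Store.NO));
                    writer.addDocument(doc);
                    if (i == 1) {
                        writer.flush(); // cut a segment so forceMerge(1) below really merges
                    }
                }
                // soft-delete doc "0": the retention query keeps it alive through merges
                writer.updateNumericDocValue(new Term("_id", "0"), "__soft_deletes", 1);
                writer.forceMerge(1);
                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    // doc "0" is still physically present: maxDoc=3 but numDocs=2 ...
                    System.out.println("maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs());
                    // ... while its _id posting has been pruned away by the merge
                    Terms ids = reader.leaves().get(0).reader().terms("_id");
                    System.out.println("_id terms=" + ids.size() + " seekExact(0)="
                        + ids.iterator().seekExact(new BytesRef("0")));
                }
            }
        }
    }
}

Under these assumptions the sketch should print maxDoc=3 numDocs=2 and _id terms=2 seekExact(0)=false, mirroring what testPruneAwayDeletedButRetainedIds asserts: the soft-deleted document survives the merge, but its ID posting does not.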