From 61849a1150ae298fe38667341bc640fecb952a50 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Mon, 18 Sep 2017 07:41:18 +0200
Subject: [PATCH] aggs: Allow aggregation sorting via nested aggregation.

The nested aggregator now buffers all bucket ords per parent document and
emits all bucket ords for a parent document's nested documents at once. This
way the DocIdSetIterator over the nested documents is used once per segment
instead of once per bucket, which is what wrapping the nested aggregator
inside a multi-bucket aggregator (the solution up to now) amounted to. This
allows sorting buckets by aggregations under a nested aggregation.

Closes #16838
---
 .../bucket/nested/NestedAggregator.java       | 115 +++++++--
 .../nested/NestedAggregatorFactory.java       |   6 +-
 .../metrics/tophits/TopHitsAggregator.java    |   9 +-
 .../bucket/nested/NestedAggregatorTests.java  | 218 ++++++++++++++++++
 4 files changed, 321 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java
index 99932bdc2fab7..b39bf864ad2b7 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.search.aggregations.bucket.nested;
 
+import com.carrotsearch.hppc.LongArrayList;
 import org.apache.lucene.index.IndexReaderContext;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.ReaderUtil;
@@ -51,14 +52,19 @@ class NestedAggregator extends BucketsAggregator implements SingleBucketAggregator {
 
     private final BitSetProducer parentFilter;
     private final Query childFilter;
+    private final boolean collectsFromSingleBucket;
+
+    private BufferingNestedLeafBucketCollector bufferingNestedLeafBucketCollector;
 
     NestedAggregator(String name, AggregatorFactories factories, ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper,
-                     SearchContext context, Aggregator parentAggregator,
-                     List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
+                     SearchContext context, Aggregator parentAggregator,
+                     List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
+                     boolean collectsFromSingleBucket) throws IOException {
         super(name, factories, context, parentAggregator, pipelineAggregators, metaData);
         Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
         this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
         this.childFilter = childObjectMapper.nestedTypeFilter();
+        this.collectsFromSingleBucket = collectsFromSingleBucket;
     }
 
     @Override
@@ -71,26 +77,38 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
         final BitSet parentDocs = parentFilter.getBitSet(ctx);
         final DocIdSetIterator childDocs = childDocsScorer != null ?
             childDocsScorer.iterator() : null;
-        return new LeafBucketCollectorBase(sub, null) {
-            @Override
-            public void collect(int parentDoc, long bucket) throws IOException {
-                // if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
-                // doc), so we can skip:
-                if (parentDoc == 0 || parentDocs == null || childDocs == null) {
-                    return;
-                }
+        if (collectsFromSingleBucket) {
+            return new LeafBucketCollectorBase(sub, null) {
+                @Override
+                public void collect(int parentDoc, long bucket) throws IOException {
+                    // if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these always
+                    // appear before the parent doc), so we can skip:
+                    if (parentDoc == 0 || parentDocs == null || childDocs == null) {
+                        return;
+                    }
 
-                final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
-                int childDocId = childDocs.docID();
-                if (childDocId <= prevParentDoc) {
-                    childDocId = childDocs.advance(prevParentDoc + 1);
-                }
+                    final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
+                    int childDocId = childDocs.docID();
+                    if (childDocId <= prevParentDoc) {
+                        childDocId = childDocs.advance(prevParentDoc + 1);
+                    }
 
-                for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
-                    collectBucket(sub, childDocId, bucket);
+                    for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
+                        collectBucket(sub, childDocId, bucket);
+                    }
                 }
-            }
-        };
+            };
+        } else {
+            doPostCollection();
+            return bufferingNestedLeafBucketCollector = new BufferingNestedLeafBucketCollector(sub, parentDocs, childDocs);
+        }
+    }
+
+    @Override
+    protected void doPostCollection() throws IOException {
+        if (bufferingNestedLeafBucketCollector != null) {
+            bufferingNestedLeafBucketCollector.postCollect();
+        }
     }
 
     @Override
@@ -104,4 +122,63 @@ public InternalAggregation buildEmptyAggregation() {
         return new InternalNested(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
     }
 
+    class BufferingNestedLeafBucketCollector extends LeafBucketCollectorBase {
+
+        final BitSet parentDocs;
+        final LeafBucketCollector sub;
+        final DocIdSetIterator childDocs;
+        final LongArrayList bucketBuffer = new LongArrayList();
+
+        int currentParentDoc = -1;
+
+        BufferingNestedLeafBucketCollector(LeafBucketCollector sub, BitSet parentDocs, DocIdSetIterator childDocs) {
+            super(sub, null);
+            this.sub = sub;
+            this.parentDocs = parentDocs;
+            this.childDocs = childDocs;
+        }
+
+        @Override
+        public void collect(int parentDoc, long bucket) throws IOException {
+            // if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these always
+            // appear before the parent doc), so we can skip:
+            if (parentDoc == 0 || parentDocs == null || childDocs == null) {
+                return;
+            }
+
+            if (currentParentDoc != parentDoc) {
+                processChildBuckets(currentParentDoc, bucketBuffer);
+                currentParentDoc = parentDoc;
+            }
+            bucketBuffer.add(bucket);
+        }
+
+        void processChildBuckets(int parentDoc, LongArrayList buckets) throws IOException {
+            if (bucketBuffer.isEmpty()) {
+                return;
+            }
+
+
+            final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
+            int childDocId = childDocs.docID();
+            if (childDocId <= prevParentDoc) {
+                childDocId = childDocs.advance(prevParentDoc + 1);
+            }
+
+            for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
+                final long[] buffer = buckets.buffer;
+                final int size = buckets.size();
+                for (int i = 0; i < size; i++) {
+                    collectBucket(sub, childDocId, buffer[i]);
+                }
+            }
+            bucketBuffer.clear();
+        }
+
+        void postCollect() throws IOException {
+            processChildBuckets(currentParentDoc, bucketBuffer);
+        }
+
+    }
+
 }
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java
index b491bf8ff0dc4..dfbe18ba87b4f 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java
@@ -48,13 +48,11 @@ class NestedAggregatorFactory extends AggregatorFactory<NestedAggregatorFactory> {
     @Override
     public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
                                      Map<String, Object> metaData) throws IOException {
-        if (collectsFromSingleBucket == false) {
-            return asMultiBucketAggregator(this, context, parent);
-        }
         if (childObjectMapper == null) {
             return new Unmapped(name, context, parent, pipelineAggregators, metaData);
         }
-        return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, context, parent, pipelineAggregators, metaData);
+        return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, context, parent,
+            pipelineAggregators, metaData, collectsFromSingleBucket);
     }
 
     private static final class Unmapped extends NonCollectingAggregator {
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java
index aeed0ef250ea4..84dd870e3f06d 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java
@@ -91,10 +91,6 @@ public boolean needsScores() {
 
     public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
-        for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
-            cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
-        }
-
         return new LeafBucketCollectorBase(sub, null) {
 
             Scorer scorer;
 
@@ -103,6 +99,11 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
             public void setScorer(Scorer scorer) throws IOException {
                 this.scorer = scorer;
                 for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
+                    // Instantiate the leaf collector here rather than in getLeafCollector(...) or in the constructor of
+                    // this anonymous class. Otherwise, if this leaf bucket collector is invoked during post collection,
+                    // we would already have moved on to the next reader and could then run into assertion errors or
+                    // compute incorrect results.
+                    cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
                     cursor.value.leafCollector.setScorer(scorer);
                 }
                 super.setScorer(scorer);
diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java
index 7000924001f20..f6e18f828045f 100644
--- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java
+++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java
@@ -22,6 +22,7 @@
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -34,21 +35,33 @@
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lucene.search.Queries;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.mapper.TypeFieldMapper;
 import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
 import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.min.Min;
+import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
 import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.ValueType;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.stream.DoubleStream;
 
 public class NestedAggregatorTests extends AggregatorTestCase {
@@ -314,6 +327,189 @@ public void testResetRootDocId() throws Exception {
         }
     }
 
+    public void testNestedOrdering() throws IOException {
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
+                iw.addDocuments(generateBook("1", new String[]{"a"}, new int[]{12, 13, 14}));
+                iw.addDocuments(generateBook("2", new String[]{"b"}, new int[]{5, 50}));
+                iw.addDocuments(generateBook("3", new String[]{"c"}, new int[]{39, 19}));
+                iw.addDocuments(generateBook("4", new String[]{"d"}, new int[]{2, 1, 3}));
+                iw.addDocuments(generateBook("5", new String[]{"a"}, new int[]{70, 10}));
+                iw.addDocuments(generateBook("6", new String[]{"e"}, new int[]{23, 21}));
+                iw.addDocuments(generateBook("7", new String[]{"e", "a"}, new int[]{8, 8}));
+                iw.addDocuments(generateBook("8", new String[]{"f"}, new int[]{12, 14}));
String[]{"f"}, new int[]{12, 14})); + iw.addDocuments(generateBook("9", new String[]{"g", "c", "e"}, new int[]{18, 8})); + } + try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) { + MappedFieldType fieldType1 = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + fieldType1.setName("num_pages"); + MappedFieldType fieldType2 = new KeywordFieldMapper.KeywordFieldType(); + fieldType2.setHasDocValues(true); + fieldType2.setName("author"); + + TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("authors", ValueType.STRING) + .field("author").order(BucketOrder.aggregation("chapters>num_pages.value", true)); + NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder("chapters", "nested_chapters"); + MaxAggregationBuilder maxAgg = new MaxAggregationBuilder("num_pages").field("num_pages"); + nestedBuilder.subAggregation(maxAgg); + termsBuilder.subAggregation(nestedBuilder); + + Terms terms = search(newSearcher(indexReader, false, true), + new MatchAllDocsQuery(), termsBuilder, fieldType1, fieldType2); + + assertEquals(7, terms.getBuckets().size()); + assertEquals("authors", terms.getName()); + + Terms.Bucket bucket = terms.getBuckets().get(0); + assertEquals("d", bucket.getKeyAsString()); + Max numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages"); + assertEquals(3, (int) numPages.getValue()); + + bucket = terms.getBuckets().get(1); + assertEquals("f", bucket.getKeyAsString()); + numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages"); + assertEquals(14, (int) numPages.getValue()); + + bucket = terms.getBuckets().get(2); + assertEquals("g", bucket.getKeyAsString()); + numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages"); + assertEquals(18, (int) numPages.getValue()); + + bucket = terms.getBuckets().get(3); + assertEquals("e", bucket.getKeyAsString()); + numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages"); + assertEquals(23, (int) numPages.getValue()); + + bucket = terms.getBuckets().get(4); + assertEquals("c", bucket.getKeyAsString()); + numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages"); + assertEquals(39, (int) numPages.getValue()); + + bucket = terms.getBuckets().get(5); + assertEquals("b", bucket.getKeyAsString()); + numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages"); + assertEquals(50, (int) numPages.getValue()); + + bucket = terms.getBuckets().get(6); + assertEquals("a", bucket.getKeyAsString()); + numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages"); + assertEquals(70, (int) numPages.getValue()); + + // reverse order: + termsBuilder = new TermsAggregationBuilder("authors", ValueType.STRING) + .field("author").order(BucketOrder.aggregation("chapters>num_pages.value", false)); + nestedBuilder = new NestedAggregationBuilder("chapters", "nested_chapters"); + maxAgg = new MaxAggregationBuilder("num_pages").field("num_pages"); + nestedBuilder.subAggregation(maxAgg); + termsBuilder.subAggregation(nestedBuilder); + + terms = search(newSearcher(indexReader, false, true), new MatchAllDocsQuery(), termsBuilder, fieldType1, fieldType2); + + assertEquals(7, terms.getBuckets().size()); + assertEquals("authors", terms.getName()); + + bucket = terms.getBuckets().get(0); + assertEquals("a", bucket.getKeyAsString()); + numPages = 
+                assertEquals(70, (int) numPages.getValue());
+
+                bucket = terms.getBuckets().get(1);
+                assertEquals("b", bucket.getKeyAsString());
+                numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
+                assertEquals(50, (int) numPages.getValue());
+
+                bucket = terms.getBuckets().get(2);
+                assertEquals("c", bucket.getKeyAsString());
+                numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
+                assertEquals(39, (int) numPages.getValue());
+
+                bucket = terms.getBuckets().get(3);
+                assertEquals("e", bucket.getKeyAsString());
+                numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
+                assertEquals(23, (int) numPages.getValue());
+
+                bucket = terms.getBuckets().get(4);
+                assertEquals("g", bucket.getKeyAsString());
+                numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
+                assertEquals(18, (int) numPages.getValue());
+
+                bucket = terms.getBuckets().get(5);
+                assertEquals("f", bucket.getKeyAsString());
+                numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
+                assertEquals(14, (int) numPages.getValue());
+
+                bucket = terms.getBuckets().get(6);
+                assertEquals("d", bucket.getKeyAsString());
+                numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
+                assertEquals(3, (int) numPages.getValue());
+            }
+        }
+    }
+
+    public void testNestedOrdering_random() throws IOException {
+        int numBooks = randomIntBetween(32, 512);
+        List<Tuple<String, int[]>> books = new ArrayList<>();
+        for (int i = 0; i < numBooks; i++) {
+            int numChapters = randomIntBetween(1, 8);
+            int[] chapters = new int[numChapters];
+            for (int j = 0; j < numChapters; j++) {
+                chapters[j] = randomIntBetween(2, 64);
+            }
+            books.add(Tuple.tuple(String.format(Locale.ROOT, "%03d", i), chapters));
+        }
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
+                int id = 0;
+                for (Tuple<String, int[]> book : books) {
+                    iw.addDocuments(generateBook(
+                        String.format(Locale.ROOT, "%03d", id), new String[]{book.v1()}, book.v2())
+                    );
+                    id++;
+                }
+            }
+            for (Tuple<String, int[]> book : books) {
+                Arrays.sort(book.v2());
+            }
+            books.sort((o1, o2) -> {
+                int cmp = Integer.compare(o1.v2()[0], o2.v2()[0]);
+                if (cmp == 0) {
+                    return o1.v1().compareTo(o2.v1());
+                } else {
+                    return cmp;
+                }
+            });
+            try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
+                MappedFieldType fieldType1 = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
+                fieldType1.setName("num_pages");
+                MappedFieldType fieldType2 = new KeywordFieldMapper.KeywordFieldType();
+                fieldType2.setHasDocValues(true);
+                fieldType2.setName("author");
+
+                TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("authors", ValueType.STRING)
+                    .size(books.size()).field("author")
+                    .order(BucketOrder.compound(BucketOrder.aggregation("chapters>num_pages.value", true), BucketOrder.key(true)));
+                NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder("chapters", "nested_chapters");
+                MinAggregationBuilder minAgg = new MinAggregationBuilder("num_pages").field("num_pages");
+                nestedBuilder.subAggregation(minAgg);
+                termsBuilder.subAggregation(nestedBuilder);
+
+                Terms terms = search(newSearcher(indexReader, false, true),
+                    new MatchAllDocsQuery(), termsBuilder, fieldType1, fieldType2);
+
+                assertEquals(books.size(), terms.getBuckets().size());
+                assertEquals("authors", terms.getName());
+
+                for (int i = 0; i < books.size(); i++) {
+                    Tuple<String, int[]> book = books.get(i);
+                    Terms.Bucket bucket = terms.getBuckets().get(i);
+                    assertEquals(book.v1(), bucket.getKeyAsString());
+                    Min numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
+                    assertEquals(book.v2()[0], (int) numPages.getValue());
+                }
+            }
+        }
+    }
+
     private double generateMaxDocs(List<Document> documents, int numNestedDocs, int id, String path, String fieldName) {
         return DoubleStream.of(generateDocuments(documents, numNestedDocs, id, path, fieldName))
             .max().orElse(Double.NEGATIVE_INFINITY);
@@ -340,4 +536,26 @@ private double[] generateDocuments(List<Document> documents, int numNestedDocs, int id, String path, String fieldName) {
 
         return values;
     }
 
+    private List<Document> generateBook(String id, String[] authors, int[] numPages) {
+        List<Document> documents = new ArrayList<>();
+
+        for (int numPage : numPages) {
+            Document document = new Document();
+            document.add(new Field(UidFieldMapper.NAME, "book#" + id, UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
+            document.add(new Field(TypeFieldMapper.NAME, "__nested_chapters", TypeFieldMapper.Defaults.FIELD_TYPE));
+            document.add(new SortedNumericDocValuesField("num_pages", numPage));
+            documents.add(document);
+        }
+
+        Document document = new Document();
+        document.add(new Field(UidFieldMapper.NAME, "book#" + id, UidFieldMapper.Defaults.FIELD_TYPE));
+        document.add(new Field(TypeFieldMapper.NAME, "book", TypeFieldMapper.Defaults.FIELD_TYPE));
+        for (String author : authors) {
+            document.add(new SortedSetDocValuesField("author", new BytesRef(author)));
+        }
+        documents.add(document);
+
+        return documents;
+    }
+
 }
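
Reviewer note (not part of the patch): a minimal sketch of the query-side usage this change enables, mirroring the builders used in NestedAggregatorTests above. The field names ("author", "num_pages") and the nested path ("nested_chapters") come from the test fixtures; a real mapping would substitute its own.

    import org.elasticsearch.search.aggregations.BucketOrder;
    import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
    import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
    import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
    import org.elasticsearch.search.aggregations.support.ValueType;

    // One terms bucket per author, ordered by the highest "num_pages" value among
    // that author's nested chapter documents, addressed via "chapters>num_pages.value".
    TermsAggregationBuilder authors = new TermsAggregationBuilder("authors", ValueType.STRING)
        .field("author")
        .order(BucketOrder.aggregation("chapters>num_pages.value", true));
    NestedAggregationBuilder chapters = new NestedAggregationBuilder("chapters", "nested_chapters");
    chapters.subAggregation(new MaxAggregationBuilder("num_pages").field("num_pages"));
    authors.subAggregation(chapters);

Ordering a multi-bucket aggregation through a nested path like this is the case that previously went through asMultiBucketAggregator(...) in NestedAggregatorFactory and is now handled by NestedAggregator itself.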
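Also for reviewers, a self-contained sketch of the buffering idea the commit message describes, with hypothetical names and plain collections standing in for Lucene's DocIdSetIterator and BucketsAggregator#collectBucket. Bucket ords arriving for the same parent document are buffered, and the parent's child docs are walked once while every buffered ord is replayed, rather than re-walking the child docs once per bucket:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.IntFunction;

    // Simplified model of BufferingNestedLeafBucketCollector above (hypothetical names).
    class BufferingCollectorSketch {

        private final List<Long> bucketBuffer = new ArrayList<>();
        private final IntFunction<int[]> childDocsOf; // stands in for the child DocIdSetIterator
        private int currentParentDoc = -1;

        BufferingCollectorSketch(IntFunction<int[]> childDocsOf) {
            this.childDocsOf = childDocsOf;
        }

        // Called once per (parentDoc, bucketOrd) pair, in parentDoc order within a segment.
        void collect(int parentDoc, long bucketOrd) {
            if (parentDoc != currentParentDoc) {
                // A new parent begins: flush everything buffered for the previous parent.
                processChildBuckets(currentParentDoc);
                currentParentDoc = parentDoc;
            }
            bucketBuffer.add(bucketOrd);
        }

        // Walk the parent's child docs once, collecting each into every buffered bucket.
        private void processChildBuckets(int parentDoc) {
            if (bucketBuffer.isEmpty()) {
                return;
            }
            for (int childDoc : childDocsOf.apply(parentDoc)) {
                for (long bucketOrd : bucketBuffer) {
                    collectBucket(childDoc, bucketOrd); // stands in for BucketsAggregator#collectBucket
                }
            }
            bucketBuffer.clear();
        }

        // Like postCollect() in the patch: flush the buffer for the final parent document.
        void postCollect() {
            processChildBuckets(currentParentDoc);
        }

        private void collectBucket(int childDoc, long bucketOrd) {
            System.out.println("child doc " + childDoc + " -> bucket ord " + bucketOrd);
        }
    }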