From 4c706833645f769e8a432708c37d24a96717b5a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Fri, 10 Jul 2020 17:15:45 -0400 Subject: [PATCH 1/9] Convert merge map to a UnaryOperator in VWH --- .../bucket/BucketsAggregator.java | 23 +++++++++++-- .../MergingBucketsDeferringCollector.java | 16 ++++++++-- .../VariableWidthHistogramAggregator.java | 32 ++++++++++--------- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index ad364d0367d50..82f1270e05db7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -46,6 +46,7 @@ import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.ToLongFunction; +import java.util.function.UnaryOperator; public abstract class BucketsAggregator extends AggregatorBase { @@ -107,15 +108,33 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do * Refer to that method for documentation about the merge map. */ public final void mergeBuckets(long[] mergeMap, long newNumBuckets) { + UnaryOperator mergeMapOperator = new UnaryOperator() { + @Override + public Long apply(Long bucket) { + return mergeMap[Math.toIntExact(bucket)]; + } + }; + + mergeBuckets(mergeMapOperator, newNumBuckets); + } + + /** + * This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(UnaryOperator)} to + * merge the actual ordinals and doc ID deltas. + */ + public final void mergeBuckets(UnaryOperator mergeMap, long newNumBuckets){ try (IntArray oldDocCounts = docCounts) { docCounts = bigArrays.newIntArray(newNumBuckets, true); docCounts.fill(0, newNumBuckets, 0); for (int i = 0; i < oldDocCounts.size(); i++) { int docCount = oldDocCounts.get(i); + if(docCount == 0) continue; + // Skip any in the map which have been "removed", signified with -1 - if (docCount != 0 && mergeMap[i] != -1) { - docCounts.increment(mergeMap[i], docCount); + long destinationOrdinal = mergeMap.apply((long)i); + if (destinationOrdinal != -1) { + docCounts.increment(destinationOrdinal, docCount); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java index 2fd2be6613351..3a3fbe0ac5095 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.UnaryOperator; /** * A specialization of {@link BestBucketsDeferringCollector} that collects all @@ -53,6 +54,17 @@ public MergingBucketsDeferringCollector(SearchContext context, boolean isGlobal) * not be called unless there are actually changes to be made, to avoid unnecessary work. */ public void mergeBuckets(long[] mergeMap) { + UnaryOperator mergeMapOperator = new UnaryOperator() { + @Override + public Long apply(Long bucket) { + return mergeMap[Math.toIntExact(bucket)]; + } + }; + + mergeBuckets(mergeMapOperator); + } + + public void mergeBuckets(UnaryOperator mergeMap){ List newEntries = new ArrayList<>(entries.size()); for (Entry sourceEntry : entries) { PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); @@ -66,7 +78,7 @@ public void mergeBuckets(long[] mergeMap) { long delta = docDeltasItr.next(); // Only merge in the ordinal if it hasn't been "removed", signified with -1 - long ordinal = mergeMap[Math.toIntExact(bucket)]; + long ordinal = mergeMap.apply(bucket); if (ordinal != -1) { newBuckets.add(ordinal); @@ -102,7 +114,7 @@ public void mergeBuckets(long[] mergeMap) { long bucket = itr.next(); assert docDeltasItr.hasNext(); long delta = docDeltasItr.next(); - long ordinal = mergeMap[Math.toIntExact(bucket)]; + long ordinal = mergeMap.apply(bucket); // Only merge in the ordinal if it hasn't been "removed", signified with -1 if (ordinal != -1) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java index 63247d70b65f4..9369f4392de45 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.UnaryOperator; public class VariableWidthHistogramAggregator extends DeferableBucketAggregator { @@ -265,7 +266,7 @@ private void bucketBufferedDocs(final DoubleArray buffer, final int bufferSize, } } - mergeBuckets(mergeMap, numBuckets); + mergeBuckets(mergeMap, shardSize); if (deferringCollector != null) { deferringCollector.mergeBuckets(mergeMap); } @@ -354,21 +355,22 @@ private void moveLastCluster(int index){ clusterSizes.set(index, holdSize); // Move the underlying buckets - long[] mergeMap = new long[numClusters]; - for (int i = 0; i < index; i++) { - // The clusters in range {0 ... idx - 1} don't move - mergeMap[i] = i; - } - for (int i = index; i < numClusters - 1; i++) { - // The clusters in range {index ... numClusters - 1} shift up - mergeMap[i] = i + 1; - } - // Finally, the new cluster moves to index - mergeMap[numClusters - 1] = index; + UnaryOperator mergeMap = new UnaryOperator() { + @Override + public Long apply(Long i) { + if(i < index) { + // The clusters in range {0 ... idx - 1} don't move + return i; + } else if(i == numClusters - 1) { + // The new cluster moves to index + return (long)index; + } else { + // The clusters in range {index ... numClusters - 1} shift forward + return i + 1; + } + } + }; - // TODO: Create a moveLastCluster() method in BucketsAggregator which is like BucketsAggregator::mergeBuckets, - // except it doesn't require a merge map. This would be more efficient as there would be no need to create a - // merge map on every call. mergeBuckets(mergeMap, numClusters); if (deferringCollector != null) { deferringCollector.mergeBuckets(mergeMap); From 5e90bd4f08e0dd836cd3238b7776a99c62b5c03e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Wed, 15 Jul 2020 02:30:17 -0400 Subject: [PATCH 2/9] Change UnaryOperator to LongUnaryOperator (and other similar style fixes) in the mergeBuckets methods --- .../bucket/BucketsAggregator.java | 25 +++++++++-------- .../MergingBucketsDeferringCollector.java | 28 +++++++++++-------- .../VariableWidthHistogramAggregator.java | 27 +++++++++--------- 3 files changed, 44 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 82f1270e05db7..24cf76e7aea25 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -45,6 +45,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.IntConsumer; +import java.util.function.LongUnaryOperator; import java.util.function.ToLongFunction; import java.util.function.UnaryOperator; @@ -106,33 +107,33 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do * ordinals and doc ID deltas. * * Refer to that method for documentation about the merge map. + * + * @deprecated use {@link mergeBuckets(long, LongUnaryOperator)} */ + @Deprecated public final void mergeBuckets(long[] mergeMap, long newNumBuckets) { - UnaryOperator mergeMapOperator = new UnaryOperator() { - @Override - public Long apply(Long bucket) { - return mergeMap[Math.toIntExact(bucket)]; - } - }; - - mergeBuckets(mergeMapOperator, newNumBuckets); + mergeBuckets(newNumBuckets, bucket -> mergeMap[Math.toIntExact(bucket)]); } /** - * This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(UnaryOperator)} to + * + * @param mergeMap a unary operatorwhich maps a bucket's ordinal to the ordinal it should be merged with. + * If the value is set to -1 then the bucket is removed entirely. + * + * This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(LongUnaryOperator)} to * merge the actual ordinals and doc ID deltas. */ - public final void mergeBuckets(UnaryOperator mergeMap, long newNumBuckets){ + public final void mergeBuckets(long newNumBuckets, LongUnaryOperator mergeMap){ try (IntArray oldDocCounts = docCounts) { docCounts = bigArrays.newIntArray(newNumBuckets, true); docCounts.fill(0, newNumBuckets, 0); - for (int i = 0; i < oldDocCounts.size(); i++) { + for (long i = 0; i < oldDocCounts.size(); i++) { int docCount = oldDocCounts.get(i); if(docCount == 0) continue; // Skip any in the map which have been "removed", signified with -1 - long destinationOrdinal = mergeMap.apply((long)i); + long destinationOrdinal = mergeMap.applyAsLong(i); if (destinationOrdinal != -1) { docCounts.increment(destinationOrdinal, docCount); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java index 3a3fbe0ac5095..d27f2b717998d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.LongUnaryOperator; import java.util.function.UnaryOperator; /** @@ -52,19 +53,24 @@ public MergingBucketsDeferringCollector(SearchContext context, boolean isGlobal) * * This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should * not be called unless there are actually changes to be made, to avoid unnecessary work. + * + * @deprecated use {@link mergeBuckets(LongUnaryOperator)} */ + @Deprecated public void mergeBuckets(long[] mergeMap) { - UnaryOperator mergeMapOperator = new UnaryOperator() { - @Override - public Long apply(Long bucket) { - return mergeMap[Math.toIntExact(bucket)]; - } - }; - - mergeBuckets(mergeMapOperator); + mergeBuckets(bucket -> mergeMap[Math.toIntExact(bucket)]); } - public void mergeBuckets(UnaryOperator mergeMap){ + /** + * Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap. + * + * @param mergeMap a unary operatorwhich maps a bucket's ordinal to the ordinal it should be merged with. + * If the value is set to -1 then the bucket is removed entirely. + * + * This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should + * not be called unless there are actually changes to be made, to avoid unnecessary work. + */ + public void mergeBuckets(LongUnaryOperator mergeMap){ List newEntries = new ArrayList<>(entries.size()); for (Entry sourceEntry : entries) { PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); @@ -78,7 +84,7 @@ public void mergeBuckets(UnaryOperator mergeMap){ long delta = docDeltasItr.next(); // Only merge in the ordinal if it hasn't been "removed", signified with -1 - long ordinal = mergeMap.apply(bucket); + long ordinal = mergeMap.applyAsLong(bucket); if (ordinal != -1) { newBuckets.add(ordinal); @@ -114,7 +120,7 @@ public void mergeBuckets(UnaryOperator mergeMap){ long bucket = itr.next(); assert docDeltasItr.hasNext(); long delta = docDeltasItr.next(); - long ordinal = mergeMap.apply(bucket); + long ordinal = mergeMap.applyAsLong(bucket); // Only merge in the ordinal if it hasn't been "removed", signified with -1 if (ordinal != -1) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java index 9369f4392de45..4273c165e3a22 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.LongUnaryOperator; import java.util.function.UnaryOperator; public class VariableWidthHistogramAggregator extends DeferableBucketAggregator { @@ -355,23 +356,23 @@ private void moveLastCluster(int index){ clusterSizes.set(index, holdSize); // Move the underlying buckets - UnaryOperator mergeMap = new UnaryOperator() { + LongUnaryOperator mergeMap = new LongUnaryOperator() { @Override - public Long apply(Long i) { - if(i < index) { - // The clusters in range {0 ... idx - 1} don't move - return i; - } else if(i == numClusters - 1) { - // The new cluster moves to index - return (long)index; - } else { - // The clusters in range {index ... numClusters - 1} shift forward - return i + 1; - } + public long applyAsLong(long i) { + if(i < index) { + // The clusters in range {0 ... idx - 1} don't move + return i; + } + if(i == numClusters - 1) { + // The new cluster moves to index + return (long)index; + } + // The clusters in range {index ... numClusters - 1} shift forward + return i + 1; } }; - mergeBuckets(mergeMap, numClusters); + mergeBuckets(numClusters, mergeMap); if (deferringCollector != null) { deferringCollector.mergeBuckets(mergeMap); } From 77ebb6381c72374d3bfed2bc2ab44b18bfb5f122 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Wed, 15 Jul 2020 02:31:33 -0400 Subject: [PATCH 3/9] Add tests for BucketsAggregator::mergeBuckets --- .../bucket/BucketsAggregatorTests.java | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java new file mode 100644 index 0000000000000..2cc7da4d43a0d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.internal.SearchContext; +import org.junit.Before; + +import java.io.IOException; + +import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS; +import static org.hamcrest.Matchers.equalTo; + +public class BucketsAggregatorTests extends AggregatorTestCase{ + + public BucketsAggregator buildMergeAggregator() throws IOException{ + + try(Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + final Document document = new Document(); + document.add(new SortedNumericDocValuesField("numeric", 0)); + indexWriter.addDocument(document); + indexWriter.commit(); + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + final IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + SearchContext searchContext = createSearchContext( + indexSearcher, + createIndexSettings(), + null, + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), + new NumberFieldMapper.NumberFieldType("test", NumberFieldMapper.NumberType.INTEGER) + ); + + searchContext.bigArrays().breakerService(); + + return new BucketsAggregator("test", AggregatorFactories.EMPTY, searchContext, null, null, null) { + @Override + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + return null; + } + + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + return new InternalAggregation[0]; + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return null; + } + }; + } + } + } + + public void testBucketMergeNoDelete() throws IOException{ + BucketsAggregator mergeAggregator = buildMergeAggregator(); + + mergeAggregator.grow(10); + for(int i = 0; i < 10; i++){ + mergeAggregator.incrementBucketDocCount(i, i); + } + + mergeAggregator.mergeBuckets(10, bucket -> bucket % 5); + + for(int i=0; i<5; i++) { + // The i'th bucket should now have all docs whose index % 5 == i + // This is buckets i and i + 5 + // i + (i+5) = 2*i + 5 + assertEquals(mergeAggregator.getDocCounts().get(i), (2 * i) + 5); + } + for(int i=5; i<10; i++){ + assertEquals(mergeAggregator.getDocCounts().get(i), 0); + } + } + + public void testBucketMergeAndDelete() throws IOException{ + BucketsAggregator mergeAggregator = buildMergeAggregator(); + + mergeAggregator.grow(10); + int sum = 0; + for(int i = 0; i < 20; i++){ + mergeAggregator.incrementBucketDocCount(i, i); + if(5 <= i && i < 15) { + sum += i; + } + } + + // Put the buckets in indices 5 ... 14 in bucket 5, and delete the rest of the buckets + mergeAggregator.mergeBuckets(10, bucket -> (5 <= bucket && bucket < 15) ? 5 : -1); + + assertEquals(mergeAggregator.getDocCounts().size(), 10); // Confirm that the 10 other buckets were deleted + for(int i=0; i<10; i++){ + assertEquals(mergeAggregator.getDocCounts().get(i), i == 5 ? sum : 0); + } + } +} From 3b32bafb76fccbfdb523ce61c2da085c95996e0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Thu, 16 Jul 2020 12:49:06 -0400 Subject: [PATCH 4/9] Add tests for MergingBucketsDeferringCollector::mergeBuckets --- .../MergingBucketsDeferringCollector.java | 15 + .../bucket/BucketsAggregatorTests.java | 8 +- ...MergingBucketsDeferringCollectorTests.java | 286 ++++++++++++++++++ 3 files changed, 303 insertions(+), 6 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java index d27f2b717998d..4b86ab9593ae6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -139,4 +139,19 @@ public void mergeBuckets(LongUnaryOperator mergeMap){ bucketsBuilder = newBuckets; } } + + /** + * Utility method for testing (see MergingBucketsDeferringCollectorTests) + * @return ordered list of bucket ordinals being stored + */ + List getBuckets(){ + List buckets = new ArrayList<>(); + for (Entry sourceEntry : entries) { + for (PackedLongValues.Iterator itr = sourceEntry.buckets.iterator(); itr.hasNext(); ) { + long bucket = itr.next(); + buckets.add(bucket); + } + } + return buckets; + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java index 2cc7da4d43a0d..06a842a69d6ab 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java @@ -48,17 +48,15 @@ public class BucketsAggregatorTests extends AggregatorTestCase{ public BucketsAggregator buildMergeAggregator() throws IOException{ - try(Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { - final Document document = new Document(); + Document document = new Document(); document.add(new SortedNumericDocValuesField("numeric", 0)); indexWriter.addDocument(document); - indexWriter.commit(); } try (IndexReader indexReader = DirectoryReader.open(directory)) { - final IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + IndexSearcher indexSearcher = new IndexSearcher(indexReader); SearchContext searchContext = createSearchContext( indexSearcher, @@ -68,8 +66,6 @@ public BucketsAggregator buildMergeAggregator() throws IOException{ new NumberFieldMapper.NumberFieldType("test", NumberFieldMapper.NumberType.INTEGER) ); - searchContext.bigArrays().breakerService(); - return new BucketsAggregator("test", AggregatorFactories.EMPTY, searchContext, null, null, null) { @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java new file mode 100644 index 0000000000000..aa255d3773251 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java @@ -0,0 +1,286 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket; + +import org.apache.lucene.search.Scorable; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BulkScorer; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.Weight; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomInt; +import static org.apache.lucene.util.LuceneTestCase.newDirectory; +import static org.apache.lucene.util.LuceneTestCase.random; +import static org.mockito.Mockito.when; + +public class MergingBucketsDeferringCollectorTests extends AggregatorTestCase { + + /** + * Usually all documents get collected into ordinal 0 unless they are part of a sub aggregation + * @return a query that collects the i'th document into bucket ordinal i + */ + private Query getQueryToCollectIntoDifferentOrdinals() { + return new Query() { + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) { + return new ConstantScoreWeight(this, boost) { + + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + return new ConstantScoreScorer(this, score(), scoreMode, DocIdSetIterator.all(context.reader().maxDoc())); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + + @Override + public BulkScorer bulkScorer(LeafReaderContext context) throws IOException { + if (scoreMode == ScoreMode.TOP_SCORES) { + return super.bulkScorer(context); + } + final float score = score(); + final int maxDoc = context.reader().maxDoc(); + return new BulkScorer() { + @Override + public int score(LeafCollector collector, Bits acceptDocs, int min, int max) throws IOException { + LeafBucketCollector leafBucketCollector = (LeafBucketCollector) collector; + max = Math.min(max, maxDoc); + for (int doc = min; doc < max; ++doc) { + if (acceptDocs == null || acceptDocs.get(doc)) { + leafBucketCollector.collect(doc, doc); + } + } + return max == maxDoc ? DocIdSetIterator.NO_MORE_DOCS : max; + } + + @Override + public long cost() { + return maxDoc; + } + }; + } + }; + } + + @Override + public String toString(String field) { + return "*:*"; + } + + @Override + public boolean equals(Object o) { + return sameClassAs(o); + } + + @Override + public int hashCode() { + return classHash(); + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + }; + } + + public void testBucketMergeNoDelete() throws Exception { + try (Directory directory = newDirectory()) { + int numDocs = 10; + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + for (int i = 0; i < 10; i++) { + Document document = new Document(); + document.add(new NumericDocValuesField("field", 3 * i)); + indexWriter.addDocument(document); + } + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + + Query query = getQueryToCollectIntoDifferentOrdinals(); + Query rewrittenQuery = indexSearcher.rewrite(query); + TopDocs topDocs = indexSearcher.search(query, numDocs); + + SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null); + when(searchContext.query()).thenReturn(rewrittenQuery); + MergingBucketsDeferringCollector deferringCollector = new MergingBucketsDeferringCollector(searchContext, false) { + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; + } + }; + + BucketCollector bc = new BucketCollector() { + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + if (doc == 9) { + deferringCollector.mergeBuckets(b -> 9 - b); + } + } + }; + } + + @Override + public void preCollection() throws IOException { + } + + @Override + public void postCollection() throws IOException { + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + }; + + deferringCollector.setDeferredCollector(Collections.singleton(bc)); + deferringCollector.preCollection(); + indexSearcher.search(query, deferringCollector); + deferringCollector.postCollection(); + deferringCollector.prepareSelectedBuckets(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + List ordinals = deferringCollector.getBuckets(); + assertEquals(ordinals.size(), 10); + for (int i = 0; i < ordinals.size(); i++) { + assertEquals(i, 9 - ordinals.get(i)); + } + } + } + } + + public void testBucketMergeAndDelete() throws Exception { + try (Directory directory = newDirectory()) { + int numDocs = 10; + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + for (int i = 0; i < 10; i++) { + Document document = new Document(); + document.add(new NumericDocValuesField("field", 3 * i)); + indexWriter.addDocument(document); + } + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + + Query query = getQueryToCollectIntoDifferentOrdinals(); + Query rewrittenQuery = indexSearcher.rewrite(query); + TopDocs topDocs = indexSearcher.search(query, numDocs); + + SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null); + when(searchContext.query()).thenReturn(rewrittenQuery); + MergingBucketsDeferringCollector deferringCollector = new MergingBucketsDeferringCollector(searchContext, false) { + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; + } + }; + + BucketCollector bc = new BucketCollector() { + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + if (doc == 9) { + // Apply two merge operations once we reach the last bucket + // In the end, only buckets where bucket % 4 = 0, will remain + deferringCollector.mergeBuckets(b -> b % 2 == 0 ? b : -1); + deferringCollector.mergeBuckets(b -> b % 4 == 0 ? b : -1); + } + } + }; + } + + @Override + public void preCollection() throws IOException { + } + + @Override + public void postCollection() throws IOException { + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + }; + + deferringCollector.setDeferredCollector(Collections.singleton(bc)); + deferringCollector.preCollection(); + indexSearcher.search(query, deferringCollector); + deferringCollector.postCollection(); + deferringCollector.prepareSelectedBuckets(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + List ordinals = deferringCollector.getBuckets(); + assertEquals(ordinals.size(), 3); + assertEquals(0L, (long)ordinals.get(0)); + assertEquals(4L, (long)ordinals.get(1)); + assertEquals(8L, (long)ordinals.get(2)); + } + } + } +} + From 4e8597f4b67fe0de7d701b220257633217917ff6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Thu, 16 Jul 2020 13:06:39 -0400 Subject: [PATCH 5/9] Fix formatting --- .../search/aggregations/bucket/BucketsAggregator.java | 4 ++-- .../aggregations/bucket/MergingBucketsDeferringCollector.java | 4 ++-- .../search/aggregations/bucket/BucketsAggregatorTests.java | 4 ++-- .../bucket/MergingBucketsDeferringCollectorTests.java | 4 +--- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 24cf76e7aea25..0187727a22df3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -117,8 +117,8 @@ public final void mergeBuckets(long[] mergeMap, long newNumBuckets) { /** * - * @param mergeMap a unary operatorwhich maps a bucket's ordinal to the ordinal it should be merged with. - * If the value is set to -1 then the bucket is removed entirely. + * @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with. + * If a bucket's ordinal is mapped to -1 then the bucket is removed entirely. * * This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(LongUnaryOperator)} to * merge the actual ordinals and doc ID deltas. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java index 4b86ab9593ae6..8b4997151c1ee 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -64,8 +64,8 @@ public void mergeBuckets(long[] mergeMap) { /** * Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap. * - * @param mergeMap a unary operatorwhich maps a bucket's ordinal to the ordinal it should be merged with. - * If the value is set to -1 then the bucket is removed entirely. + * @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with. + * If a bucket's ordinal is mapped to -1 then the bucket is removed entirely. * * This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should * not be called unless there are actually changes to be made, to avoid unnecessary work. diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java index 06a842a69d6ab..8a5ef7d61dd2a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java @@ -97,7 +97,7 @@ public void testBucketMergeNoDelete() throws IOException{ mergeAggregator.mergeBuckets(10, bucket -> bucket % 5); for(int i=0; i<5; i++) { - // The i'th bucket should now have all docs whose index % 5 == i + // The i'th bucket should now have all docs whose index % 5 = i // This is buckets i and i + 5 // i + (i+5) = 2*i + 5 assertEquals(mergeAggregator.getDocCounts().get(i), (2 * i) + 5); @@ -119,7 +119,7 @@ public void testBucketMergeAndDelete() throws IOException{ } } - // Put the buckets in indices 5 ... 14 in bucket 5, and delete the rest of the buckets + // Put the buckets in indices 5 ... 14 into bucket 5, and delete the rest of the buckets mergeAggregator.mergeBuckets(10, bucket -> (5 <= bucket && bucket < 15) ? 5 : -1); assertEquals(mergeAggregator.getDocCounts().size(), 10); // Confirm that the 10 other buckets were deleted diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java index aa255d3773251..413d2b34b2b1a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java @@ -156,7 +156,6 @@ public void testBucketMergeNoDelete() throws Exception { Query query = getQueryToCollectIntoDifferentOrdinals(); Query rewrittenQuery = indexSearcher.rewrite(query); - TopDocs topDocs = indexSearcher.search(query, numDocs); SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null); when(searchContext.query()).thenReturn(rewrittenQuery); @@ -226,7 +225,6 @@ public void testBucketMergeAndDelete() throws Exception { Query query = getQueryToCollectIntoDifferentOrdinals(); Query rewrittenQuery = indexSearcher.rewrite(query); - TopDocs topDocs = indexSearcher.search(query, numDocs); SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null); when(searchContext.query()).thenReturn(rewrittenQuery); @@ -246,7 +244,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOExce public void collect(int doc, long bucket) throws IOException { if (doc == 9) { // Apply two merge operations once we reach the last bucket - // In the end, only buckets where bucket % 4 = 0, will remain + // In the end, only the buckets where (bucket % 4 = 0) will remain deferringCollector.mergeBuckets(b -> b % 2 == 0 ? b : -1); deferringCollector.mergeBuckets(b -> b % 4 == 0 ? b : -1); } From 71b1797e2bd827bad5bf30c91c8d994a57a3c62b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Thu, 16 Jul 2020 15:17:37 -0400 Subject: [PATCH 6/9] Remove unused imports --- .../aggregations/bucket/BucketsAggregator.java | 1 - .../bucket/MergingBucketsDeferringCollector.java | 1 - .../VariableWidthHistogramAggregator.java | 1 - .../bucket/BucketsAggregatorTests.java | 7 ++++--- .../MergingBucketsDeferringCollectorTests.java | 15 --------------- 5 files changed, 4 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 0187727a22df3..cffcb61aa06af 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -47,7 +47,6 @@ import java.util.function.IntConsumer; import java.util.function.LongUnaryOperator; import java.util.function.ToLongFunction; -import java.util.function.UnaryOperator; public abstract class BucketsAggregator extends AggregatorBase { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java index 8b4997151c1ee..a36fb64c3f11d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.List; import java.util.function.LongUnaryOperator; -import java.util.function.UnaryOperator; /** * A specialization of {@link BestBucketsDeferringCollector} that collects all diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java index 4273c165e3a22..4bc350c0274df 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java @@ -52,7 +52,6 @@ import java.util.Map; import java.util.function.Function; import java.util.function.LongUnaryOperator; -import java.util.function.UnaryOperator; public class VariableWidthHistogramAggregator extends DeferableBucketAggregator { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java index 8a5ef7d61dd2a..824e1d96d7865 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java @@ -38,12 +38,10 @@ import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.internal.SearchContext; -import org.junit.Before; import java.io.IOException; import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS; -import static org.hamcrest.Matchers.equalTo; public class BucketsAggregatorTests extends AggregatorTestCase{ @@ -62,7 +60,10 @@ public BucketsAggregator buildMergeAggregator() throws IOException{ indexSearcher, createIndexSettings(), null, - new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), new NumberFieldMapper.NumberFieldType("test", NumberFieldMapper.NumberType.INTEGER) ); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java index 413d2b34b2b1a..0bd0dd0dc4f31 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java @@ -18,34 +18,24 @@ */ package org.elasticsearch.search.aggregations.bucket; -import org.apache.lucene.search.Scorable; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; -import org.apache.lucene.document.SortedNumericDocValuesField; -import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.RandomIndexWriter; -import org.apache.lucene.index.Term; import org.apache.lucene.search.BulkScorer; import org.apache.lucene.search.ConstantScoreScorer; import org.apache.lucene.search.ConstantScoreWeight; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.LeafCollector; -import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryVisitor; -import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; @@ -56,15 +46,10 @@ import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import static com.carrotsearch.randomizedtesting.RandomizedTest.randomInt; import static org.apache.lucene.util.LuceneTestCase.newDirectory; -import static org.apache.lucene.util.LuceneTestCase.random; import static org.mockito.Mockito.when; public class MergingBucketsDeferringCollectorTests extends AggregatorTestCase { From 8df97487b7b236914f3158c4279cc0d55282424d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Fri, 17 Jul 2020 14:38:38 -0400 Subject: [PATCH 7/9] Wrap the MergingBucketsDeferringCollector and remove the need to implement a custom query, in MergingBucketsDeferringCollectorTests --- ...MergingBucketsDeferringCollectorTests.java | 111 +++++------------- 1 file changed, 31 insertions(+), 80 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java index 0bd0dd0dc4f31..011531f0c9294 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java @@ -18,7 +18,9 @@ */ package org.elasticsearch.search.aggregations.bucket; -import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.util.packed.PackedInts; +import org.apache.lucene.util.packed.PackedLongValues; import org.apache.lucene.document.Document; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; @@ -26,19 +28,11 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.BulkScorer; -import org.apache.lucene.search.ConstantScoreScorer; -import org.apache.lucene.search.ConstantScoreWeight; -import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Query; -import org.apache.lucene.search.QueryVisitor; import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; -import org.apache.lucene.util.Bits; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollector; @@ -54,73 +48,39 @@ public class MergingBucketsDeferringCollectorTests extends AggregatorTestCase { - /** - * Usually all documents get collected into ordinal 0 unless they are part of a sub aggregation - * @return a query that collects the i'th document into bucket ordinal i - */ - private Query getQueryToCollectIntoDifferentOrdinals() { - return new Query() { - @Override - public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) { - return new ConstantScoreWeight(this, boost) { + public MergingBucketsDeferringCollector getMergingBucketsDeferringCollector(SearchContext searchContext){ - @Override - public Scorer scorer(LeafReaderContext context) throws IOException { - return new ConstantScoreScorer(this, score(), scoreMode, DocIdSetIterator.all(context.reader().maxDoc())); - } + return new MergingBucketsDeferringCollector(searchContext, false) { - @Override - public boolean isCacheable(LeafReaderContext ctx) { - return false; - } + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + super.getLeafCollector(ctx); + + return new LeafBucketCollector() { + int lastDoc = 0; @Override - public BulkScorer bulkScorer(LeafReaderContext context) throws IOException { - if (scoreMode == ScoreMode.TOP_SCORES) { - return super.bulkScorer(context); + public void collect(int doc, long bucket) throws IOException { + // Force each doc to be collected into a different ordinal so that there are buckets to merge + // Otherwise, they will all be collected into ordinal 0 by default + bucket = doc; + + if (context == null) { + context = ctx; + docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT); + bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT); } - final float score = score(); - final int maxDoc = context.reader().maxDoc(); - return new BulkScorer() { - @Override - public int score(LeafCollector collector, Bits acceptDocs, int min, int max) throws IOException { - LeafBucketCollector leafBucketCollector = (LeafBucketCollector) collector; - max = Math.min(max, maxDoc); - for (int doc = min; doc < max; ++doc) { - if (acceptDocs == null || acceptDocs.get(doc)) { - leafBucketCollector.collect(doc, doc); - } - } - return max == maxDoc ? DocIdSetIterator.NO_MORE_DOCS : max; - } - - @Override - public long cost() { - return maxDoc; - } - }; + docDeltasBuilder.add(doc - lastDoc); + bucketsBuilder.add(bucket); + lastDoc = doc; + maxBucket = Math.max(maxBucket, bucket); } }; } @Override - public String toString(String field) { - return "*:*"; - } - - @Override - public boolean equals(Object o) { - return sameClassAs(o); - } - - @Override - public int hashCode() { - return classHash(); - } - - @Override - public void visit(QueryVisitor visitor) { - visitor.visitLeaf(this); + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; } }; } @@ -139,17 +99,12 @@ public void testBucketMergeNoDelete() throws Exception { try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Query query = getQueryToCollectIntoDifferentOrdinals(); + Query query = new MatchAllDocsQuery(); Query rewrittenQuery = indexSearcher.rewrite(query); SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null); when(searchContext.query()).thenReturn(rewrittenQuery); - MergingBucketsDeferringCollector deferringCollector = new MergingBucketsDeferringCollector(searchContext, false) { - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE; - } - }; + MergingBucketsDeferringCollector deferringCollector = getMergingBucketsDeferringCollector(searchContext); BucketCollector bc = new BucketCollector() { @@ -208,17 +163,12 @@ public void testBucketMergeAndDelete() throws Exception { try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Query query = getQueryToCollectIntoDifferentOrdinals(); + Query query = new MatchAllDocsQuery(); Query rewrittenQuery = indexSearcher.rewrite(query); SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null); when(searchContext.query()).thenReturn(rewrittenQuery); - MergingBucketsDeferringCollector deferringCollector = new MergingBucketsDeferringCollector(searchContext, false) { - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE; - } - }; + MergingBucketsDeferringCollector deferringCollector = getMergingBucketsDeferringCollector(searchContext); BucketCollector bc = new BucketCollector() { @@ -227,6 +177,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOExce return new LeafBucketCollector() { @Override public void collect(int doc, long bucket) throws IOException { + bucket = doc; if (doc == 9) { // Apply two merge operations once we reach the last bucket // In the end, only the buckets where (bucket % 4 = 0) will remain From e286f0a83c83e6204ab5bb7f0bb3d4b9b8a4faa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Fri, 17 Jul 2020 16:15:20 -0400 Subject: [PATCH 8/9] Resolve merge conflict --- .../bucket/histogram/VariableWidthHistogramAggregator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java index f83022e0dc649..0779972ce4603 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java @@ -265,7 +265,7 @@ private void bucketBufferedDocs(final DoubleArray buffer, final int bufferSize, } } - mergeBuckets(mergeMap, shardSize); + mergeBuckets(mergeMap, bucketOrd + 1); if (deferringCollector != null) { deferringCollector.mergeBuckets(mergeMap); } From 822dfd624c1bf4105a731a846b4d1056fddffe89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CJames?= Date: Wed, 22 Jul 2020 13:14:29 -0400 Subject: [PATCH 9/9] Add patch from @nik9000 to make the MergingBucketsDeferringCollectorTests much cleaner --- .../MergingBucketsDeferringCollector.java | 15 -- ...MergingBucketsDeferringCollectorTests.java | 228 +++++++----------- 2 files changed, 90 insertions(+), 153 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java index a36fb64c3f11d..db2a4b4df16bf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -138,19 +138,4 @@ public void mergeBuckets(LongUnaryOperator mergeMap){ bucketsBuilder = newBuckets; } } - - /** - * Utility method for testing (see MergingBucketsDeferringCollectorTests) - * @return ordered list of bucket ordinals being stored - */ - List getBuckets(){ - List buckets = new ArrayList<>(); - for (Entry sourceEntry : entries) { - for (PackedLongValues.Iterator itr = sourceEntry.buckets.iterator(); itr.hasNext(); ) { - long bucket = itr.next(); - buckets.add(bucket); - } - } - return buckets; - } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java index 011531f0c9294..04a6c48e86009 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java @@ -18,81 +18,95 @@ */ package org.elasticsearch.search.aggregations.bucket; -import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.util.packed.PackedInts; -import org.apache.lucene.util.packed.PackedLongValues; import org.apache.lucene.document.Document; -import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.store.Directory; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; -import static org.apache.lucene.util.LuceneTestCase.newDirectory; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.when; public class MergingBucketsDeferringCollectorTests extends AggregatorTestCase { - - public MergingBucketsDeferringCollector getMergingBucketsDeferringCollector(SearchContext searchContext){ - - return new MergingBucketsDeferringCollector(searchContext, false) { - + public void testBucketMergeNoDelete() throws Exception { + testCase((deferringCollector, delegate) -> new LeafBucketCollector() { @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { - super.getLeafCollector(ctx); + public void collect(int doc, long owningBucketOrd) throws IOException { + assert owningBucketOrd == 0; // Only collects at top level + delegate.collect(doc, doc); + if (doc == 7) { + deferringCollector.mergeBuckets(oldBucket -> 0); + } + } + }, (deferringCollector, finalCollector) -> { + deferringCollector.prepareSelectedBuckets(0, 8, 9); - return new LeafBucketCollector() { - int lastDoc = 0; + equalTo(Map.of(0L, List.of(0, 1, 2, 3, 4, 5, 6, 7), 1L, List.of(8), 2L, List.of(9))); + }); + } - @Override - public void collect(int doc, long bucket) throws IOException { - // Force each doc to be collected into a different ordinal so that there are buckets to merge - // Otherwise, they will all be collected into ordinal 0 by default - bucket = doc; - - if (context == null) { - context = ctx; - docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT); - bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT); - } - docDeltasBuilder.add(doc - lastDoc); - bucketsBuilder.add(bucket); - lastDoc = doc; - maxBucket = Math.max(maxBucket, bucket); - } - }; + public void testBucketMergeAndDelete() throws Exception { + testCase((deferringCollector, delegate) -> new LeafBucketCollector() { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + assert owningBucketOrd == 0; // Only collects at top level + delegate.collect(doc, doc); + if (doc == 7) { + deferringCollector.mergeBuckets(oldBucket -> oldBucket > 3 ? 0 : -1); + } } + }, (deferringCollector, finalCollector) -> { + deferringCollector.prepareSelectedBuckets(0, 8, 9); + + assertThat(finalCollector.collection, equalTo(Map.of(0L, List.of(4, 5, 6, 7), 1L, List.of(8), 2L, List.of(9)))); + }); + } + @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/60021") + public void testBucketMergeAndDeleteLastEntry() throws Exception { + testCase((deferringCollector, delegate) -> new LeafBucketCollector() { @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE; + public void collect(int doc, long owningBucketOrd) throws IOException { + assert owningBucketOrd == 0; // Only collects at top level + delegate.collect(doc, doc); + if (doc == 7) { + deferringCollector.mergeBuckets(oldBucket -> oldBucket <= 3 ? 0 : -1); + } } - }; + }, (deferringCollector, finalCollector) -> { + deferringCollector.prepareSelectedBuckets(0, 8, 9); + + assertThat(finalCollector.collection, equalTo(Map.of(0L, List.of(0, 1, 2, 3), 1L, List.of(8), 2L, List.of(9)))); + }); } - public void testBucketMergeNoDelete() throws Exception { + private void testCase( + BiFunction leafCollector, + CheckedBiConsumer verify + ) throws IOException { try (Directory directory = newDirectory()) { - int numDocs = 10; try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { for (int i = 0; i < 10; i++) { - Document document = new Document(); - document.add(new NumericDocValuesField("field", 3 * i)); - indexWriter.addDocument(document); + indexWriter.addDocument(new Document()); } } @@ -100,121 +114,59 @@ public void testBucketMergeNoDelete() throws Exception { IndexSearcher indexSearcher = new IndexSearcher(indexReader); Query query = new MatchAllDocsQuery(); - Query rewrittenQuery = indexSearcher.rewrite(query); - - SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null); - when(searchContext.query()).thenReturn(rewrittenQuery); - MergingBucketsDeferringCollector deferringCollector = getMergingBucketsDeferringCollector(searchContext); - - BucketCollector bc = new BucketCollector() { + SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), query, null); + when(searchContext.query()).thenReturn(query); + MergingBucketsDeferringCollector deferringCollector = new MergingBucketsDeferringCollector(searchContext, false); + CollectingBucketCollector finalCollector = new CollectingBucketCollector(); + deferringCollector.setDeferredCollector(Collections.singleton(finalCollector)); + deferringCollector.preCollection(); + indexSearcher.search(query, new BucketCollector() { @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { - return new LeafBucketCollector() { - @Override - public void collect(int doc, long bucket) throws IOException { - if (doc == 9) { - deferringCollector.mergeBuckets(b -> 9 - b); - } - } - }; + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; } @Override - public void preCollection() throws IOException { - } + public void preCollection() throws IOException {} @Override - public void postCollection() throws IOException { - } + public void postCollection() throws IOException {} @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE_NO_SCORES; + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + LeafBucketCollector delegate = deferringCollector.getLeafCollector(ctx); + return leafCollector.apply(deferringCollector, delegate); } - }; - - deferringCollector.setDeferredCollector(Collections.singleton(bc)); - deferringCollector.preCollection(); - indexSearcher.search(query, deferringCollector); + }); deferringCollector.postCollection(); - deferringCollector.prepareSelectedBuckets(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - - List ordinals = deferringCollector.getBuckets(); - assertEquals(ordinals.size(), 10); - for (int i = 0; i < ordinals.size(); i++) { - assertEquals(i, 9 - ordinals.get(i)); - } + verify.accept(deferringCollector, finalCollector); } } } - public void testBucketMergeAndDelete() throws Exception { - try (Directory directory = newDirectory()) { - int numDocs = 10; - try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { - for (int i = 0; i < 10; i++) { - Document document = new Document(); - document.add(new NumericDocValuesField("field", 3 * i)); - indexWriter.addDocument(document); - } - } - - try (IndexReader indexReader = DirectoryReader.open(directory)) { - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - - Query query = new MatchAllDocsQuery(); - Query rewrittenQuery = indexSearcher.rewrite(query); - - SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null); - when(searchContext.query()).thenReturn(rewrittenQuery); - MergingBucketsDeferringCollector deferringCollector = getMergingBucketsDeferringCollector(searchContext); - - BucketCollector bc = new BucketCollector() { - - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { - return new LeafBucketCollector() { - @Override - public void collect(int doc, long bucket) throws IOException { - bucket = doc; - if (doc == 9) { - // Apply two merge operations once we reach the last bucket - // In the end, only the buckets where (bucket % 4 = 0) will remain - deferringCollector.mergeBuckets(b -> b % 2 == 0 ? b : -1); - deferringCollector.mergeBuckets(b -> b % 4 == 0 ? b : -1); - } - } - }; - } + private class CollectingBucketCollector extends BucketCollector { + final Map> collection = new HashMap<>(); - @Override - public void preCollection() throws IOException { - } - - @Override - public void postCollection() throws IOException { - } + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE_NO_SCORES; - } - }; + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + return new LeafBucketCollector() { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + collection.computeIfAbsent(owningBucketOrd, k -> new ArrayList<>()).add(doc); + } + }; + } - deferringCollector.setDeferredCollector(Collections.singleton(bc)); - deferringCollector.preCollection(); - indexSearcher.search(query, deferringCollector); - deferringCollector.postCollection(); - deferringCollector.prepareSelectedBuckets(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + @Override + public void preCollection() throws IOException {} - List ordinals = deferringCollector.getBuckets(); - assertEquals(ordinals.size(), 3); - assertEquals(0L, (long)ordinals.get(0)); - assertEquals(4L, (long)ordinals.get(1)); - assertEquals(8L, (long)ordinals.get(2)); - } - } + @Override + public void postCollection() throws IOException {} } } -