From 5f9b40d6fbdbe458c5366a1d9a457090db332bba Mon Sep 17 00:00:00 2001 From: javanna Date: Mon, 26 Nov 2018 22:32:50 +0100 Subject: [PATCH 1/2] Add empty buckets only in the final reduce step Empty buckets don't need to be added when performing an incremental reduction step; they can be added later in the final reduction step. This should allow us to later remove the max buckets limit when performing non-final reduction. --- .../histogram/InternalDateHistogram.java | 34 ++++++++----------- .../bucket/histogram/InternalHistogram.java | 34 ++++++++----------- 2 files changed, 30 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 5d6ec6f93b732..496f8efc60ccf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -444,26 +444,22 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); - - // adding empty buckets if needed - if (minDocCount == 0) { - addEmptyBuckets(reducedBuckets, reduceContext); - } - - if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) { - // nothing to do, data are already sorted since shards return - // sorted buckets and the merge-sort performed by reduceBuckets - // maintains order - } else if (InternalOrder.isKeyDesc(order)) { - // we just need to reverse here... 
- List reverse = new ArrayList<>(reducedBuckets); - Collections.reverse(reverse); - reducedBuckets = reverse; - } else { - // sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort - CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + if (reduceContext.isFinalReduce()) { + if (minDocCount == 0) { + addEmptyBuckets(reducedBuckets, reduceContext); + } + if (InternalOrder.isKeyDesc(order)) { + // we just need to reverse here... + List reverse = new ArrayList<>(reducedBuckets); + Collections.reverse(reverse); + reducedBuckets = reverse; + } else if (InternalOrder.isKeyAsc(order) == false){ + // nothing to do when sorting by key ascending, as data is already sorted since shards return + // sorted buckets and the merge-sort performed by reduceBuckets maintains order. + // otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort + CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + } } - return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo, format, keyed, pipelineAggregators(), getMetaData()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index d26ac47c9ea25..9f93929c0a186 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -421,26 +421,22 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); - - // adding empty buckets if needed - if (minDocCount == 0) { - addEmptyBuckets(reducedBuckets, 
reduceContext); - } - - if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) { - // nothing to do, data are already sorted since shards return - // sorted buckets and the merge-sort performed by reduceBuckets - // maintains order - } else if (InternalOrder.isKeyDesc(order)) { - // we just need to reverse here... - List reverse = new ArrayList<>(reducedBuckets); - Collections.reverse(reverse); - reducedBuckets = reverse; - } else { - // sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort - CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + if (reduceContext.isFinalReduce()) { + if (minDocCount == 0) { + addEmptyBuckets(reducedBuckets, reduceContext); + } + if (InternalOrder.isKeyDesc(order)) { + // we just need to reverse here... + List reverse = new ArrayList<>(reducedBuckets); + Collections.reverse(reverse); + reducedBuckets = reverse; + } else if (InternalOrder.isKeyAsc(order) == false){ + // nothing to do when sorting by key ascending, as data is already sorted since shards return + // sorted buckets and the merge-sort performed by reduceBuckets maintains order. 
+ // otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort + CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + } } - return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, pipelineAggregators(), getMetaData()); } From 9f55e45944403b8ee2d3f76ed2431b5b552febeb Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 30 Nov 2018 17:21:36 +0100 Subject: [PATCH 2/2] add check that no aggs add empty buckets in their non final reduce phase --- .../test/InternalAggregationTestCase.java | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 6e4dfc8fe254c..59327121c9038 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -119,19 +119,19 @@ import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import 
org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue; +import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; +import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.ParsedPercentilesBucket; -import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket; +import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; -import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; import java.io.IOException; import java.util.ArrayList; @@ -151,6 +151,7 @@ import static org.elasticsearch.test.XContentTestUtils.insertRandomFields; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public abstract class InternalAggregationTestCase extends AbstractWireSerializingTestCase { public static final int DEFAULT_MAX_BUCKETS = 100000; @@ -267,7 +268,14 @@ public void testReduceRandom() { new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false); @SuppressWarnings("unchecked") T reduced = (T) inputs.get(0).reduce(internalAggregations, context); - assertMultiBucketConsumer(reduced, bucketConsumer); + int initialBucketCount = 0; + 
for (InternalAggregation internalAggregation : internalAggregations) { + initialBucketCount += countInnerBucket(internalAggregation); + } + int reducedBucketCount = countInnerBucket(reduced); + //check that non final reduction never adds buckets + assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount)); + assertMultiBucketConsumer(reducedBucketCount, bucketConsumer); toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); toReduce.add(reduced); } @@ -332,14 +340,14 @@ protected NamedXContentRegistry xContentRegistry() { public final void testFromXContent() throws IOException { final T aggregation = createTestInstance(); - final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false); - assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation); + final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false); + assertFromXContent(aggregation, parsedAggregation); } public final void testFromXContentWithRandomFields() throws IOException { final T aggregation = createTestInstance(); - final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true); - assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation); + final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true); + assertFromXContent(aggregation, parsedAggregation); } protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException; @@ -423,6 +431,10 @@ protected static DocValueFormat randomNumericDocValueFormat() { } public static void assertMultiBucketConsumer(Aggregation agg, MultiBucketConsumer bucketConsumer) { - assertThat(bucketConsumer.getCount(), equalTo(countInnerBucket(agg))); + assertMultiBucketConsumer(countInnerBucket(agg), bucketConsumer); + } + + private static void assertMultiBucketConsumer(int innerBucketCount, MultiBucketConsumer bucketConsumer) { + 
assertThat(bucketConsumer.getCount(), equalTo(innerBucketCount)); } }