InternalDateHistogram.java
@@ -444,26 +444,22 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);

// adding empty buckets if needed
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
}

if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) {
// nothing to do, data are already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets
// maintains order
} else if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
Collections.reverse(reverse);
reducedBuckets = reverse;
} else {
// sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
}
if (InternalOrder.isKeyDesc(order)) {
Contributor commented:

It took me a few minutes to convince myself that this works. :)

On first glance the behavior changes because sorting is only done on the final reduction now (instead of every reduction), which I thought might break reduceBuckets() as that relies on consistent ordering. But histos/date_histos sort their shard results by key:asc, so this isn't actually a problem.

Could we add a test to DateHistogramAggregatorTests/HistogramAggregatorTests that randomly chooses a sort order + min_doc_count: 0 to ensure this internal contract never changes in the future? It looks like none of those tests set an order so we might not notice otherwise.

@javanna (Member, Author) commented on Nov 29, 2018:

I don't think I changed anything around sorting. That if was hard to read, so I rewrote it, but the only actual change is that empty buckets are now added only in the final reduction phase. I am working on tests and will add what you suggest; I had other additions in mind as well, but I hit some roadblocks (see #36004). Basically, empty buckets were not tested in our unit tests.

Contributor commented:

We chatted about this in Slack, and I'm just bad at boolean logic. All good here :)

@javanna (Member, Author) commented:

I added a check to the base class which verifies that no buckets are added in non-final reduce phases. Now that we test adding empty buckets for the histogram and date histogram aggs, I think this check makes sense. These are the only aggs that add buckets as part of reduce, right? I wonder whether there is some other missing test coverage somewhere.

Also, I worked a bit on increasing test coverage in DateHistogramAggregatorTests like you suggested, and I think I prefer doing that in a follow-up if it is still necessary.

// we just need to reverse here...
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
Collections.reverse(reverse);
reducedBuckets = reverse;
} else if (InternalOrder.isKeyAsc(order) == false){
// nothing to do when sorting by key ascending, as data is already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets maintains order.
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
}
}

return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
format, keyed, pipelineAggregators(), getMetaData());
}
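To make the test suggested in the review thread above concrete, a randomized case along the following lines could be added to DateHistogramAggregatorTests (and mirrored in HistogramAggregatorTests). This is only a sketch: the method name is made up, it assumes the searchAndReduce/newSearcher helpers and the DateFieldMapper field-type wiring that the existing cases in that class use, and the exact builder methods may differ between branches.

    public void testRandomOrderWithMinDocCountZero() throws IOException {
        // randomly combine a sort order with min_doc_count: 0, as suggested in the review above
        BucketOrder order = randomBoolean() ? BucketOrder.key(randomBoolean()) : BucketOrder.count(randomBoolean());
        DateHistogramAggregationBuilder aggregation = new DateHistogramAggregationBuilder("histo")
            .field("date")
            .dateHistogramInterval(DateHistogramInterval.DAY)
            .minDocCount(0) // forces empty buckets to be created during the final reduce
            .order(order);

        try (Directory directory = newDirectory()) {
            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
                // leave gaps between the indexed days so that min_doc_count: 0 has empty buckets to fill in
                for (long day : new long[] {1, 2, 5, 9}) {
                    Document document = new Document();
                    document.add(new SortedNumericDocValuesField("date", day * 86_400_000L));
                    indexWriter.addDocument(document);
                }
            }
            try (IndexReader reader = DirectoryReader.open(directory)) {
                // same field type wiring as the other cases in this test class (assumed)
                DateFieldMapper.Builder fieldBuilder = new DateFieldMapper.Builder("date");
                DateFieldMapper.DateFieldType fieldType = fieldBuilder.fieldType();
                fieldType.setHasDocValues(true);
                fieldType.setName("date");

                InternalDateHistogram histogram = searchAndReduce(newSearcher(reader, true, true),
                    new MatchAllDocsQuery(), aggregation, fieldType);
                // days 2..10 of January 1970: four non-empty buckets plus five empty buckets added on reduce
                assertEquals(9, histogram.getBuckets().size());
                // a full test would also assert that the bucket keys respect the randomly chosen order
            }
        }
    }
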
InternalHistogram.java
@@ -421,26 +421,22 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);

// adding empty buckets if needed
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
}

if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) {
// nothing to do, data are already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets
// maintains order
} else if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
Collections.reverse(reverse);
reducedBuckets = reverse;
} else {
// sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
}
if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
Collections.reverse(reverse);
reducedBuckets = reverse;
} else if (InternalOrder.isKeyAsc(order) == false){
// nothing to do when sorting by key ascending, as data is already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets maintains order.
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
}
}

return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, pipelineAggregators(),
getMetaData());
}
InternalAggregationTestCase.java
@@ -119,19 +119,19 @@
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative;
import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.ParsedPercentilesBucket;
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative;

import java.io.IOException;
import java.util.ArrayList;
@@ -151,6 +151,7 @@
import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public abstract class InternalAggregationTestCase<T extends InternalAggregation> extends AbstractWireSerializingTestCase<T> {
public static final int DEFAULT_MAX_BUCKETS = 100000;
@@ -267,7 +268,14 @@ public void testReduceRandom() {
new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false);
@SuppressWarnings("unchecked")
T reduced = (T) inputs.get(0).reduce(internalAggregations, context);
assertMultiBucketConsumer(reduced, bucketConsumer);
int initialBucketCount = 0;
for (InternalAggregation internalAggregation : internalAggregations) {
initialBucketCount += countInnerBucket(internalAggregation);
}
int reducedBucketCount = countInnerBucket(reduced);
//check that non final reduction never adds buckets
Contributor commented:

++

assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount));
assertMultiBucketConsumer(reducedBucketCount, bucketConsumer);
toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize));
toReduce.add(reduced);
}
@@ -332,14 +340,14 @@ protected NamedXContentRegistry xContentRegistry() {

public final void testFromXContent() throws IOException {
final T aggregation = createTestInstance();
final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation);
final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
assertFromXContent(aggregation, parsedAggregation);
}

public final void testFromXContentWithRandomFields() throws IOException {
final T aggregation = createTestInstance();
final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation);
final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
assertFromXContent(aggregation, parsedAggregation);
}

protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException;
@@ -423,6 +431,10 @@ protected static DocValueFormat randomNumericDocValueFormat() {
}

public static void assertMultiBucketConsumer(Aggregation agg, MultiBucketConsumer bucketConsumer) {
assertThat(bucketConsumer.getCount(), equalTo(countInnerBucket(agg)));
assertMultiBucketConsumer(countInnerBucket(agg), bucketConsumer);
}

private static void assertMultiBucketConsumer(int innerBucketCount, MultiBucketConsumer bucketConsumer) {
assertThat(bucketConsumer.getCount(), equalTo(innerBucketCount));
}
}