diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java index c70ddea7e3b22..5fda5af7418a3 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java @@ -233,7 +233,7 @@ public Object getProperty(List path) { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { // merge stats across all shards List aggs = new ArrayList<>(aggregations); aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 0bd7afa7f61c2..27b5c9cf3b2a8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -487,7 +487,7 @@ private ReducedQueryPhase reducedQueryPhase(Collectionmost cases, the assumption will be the all given * aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing, * try reusing an existing instance (typically the first in the given list) to save on redundant object * construction. */ - public final InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - InternalAggregation aggResult = doReduce(aggregations, reduceContext); - if (reduceContext.isFinalReduce()) { - for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - aggResult = pipelineAggregator.reduce(aggResult, reduceContext); - } - } - return aggResult; - } - - public abstract InternalAggregation doReduce(List aggregations, ReduceContext reduceContext); + public abstract InternalAggregation reduce(List aggregations, ReduceContext reduceContext); /** * Return true if this aggregation is mapped, and can lead a reduction. If this agg returns diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index e1597c5c8c063..b4fdd74a10b85 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * An internal implementation of {@link Aggregations}. @@ -91,10 +92,47 @@ public List getTopLevelPipelineAggregators() { return topLevelPipelineAggregators; } + @SuppressWarnings("unchecked") + private List getInternalAggregations() { + return (List) aggregations; + } + + /** + * Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by + * SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called + * as an intermediate reduction step (e.g. in the middle of an aggregation tree). + * + * This method first reduces the aggregations, and if it is the final reduce, then reduce the pipeline + * aggregations (both embedded parent/sibling as well as top-level sibling pipelines) + */ + public static InternalAggregations topLevelReduce(List aggregationsList, ReduceContext context) { + InternalAggregations reduced = reduce(aggregationsList, context); + if (reduced == null) { + return null; + } + + if (context.isFinalReduce()) { + List reducedInternalAggs = reduced.getInternalAggregations(); + reducedInternalAggs = reducedInternalAggs.stream() + .map(agg -> agg.reducePipelines(agg, context)) + .collect(Collectors.toList()); + + List topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators(); + for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) { + InternalAggregation newAgg + = pipelineAggregator.doReduce(new InternalAggregations(reducedInternalAggs), context); + reducedInternalAggs.add(newAgg); + } + return new InternalAggregations(reducedInternalAggs); + } + return reduced; + } + /** * Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first * {@link InternalAggregations} object found in the list. - * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. + * Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled + * separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)} */ public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { if (aggregationsList.isEmpty()) { @@ -123,13 +161,6 @@ public static InternalAggregations reduce(List aggregation reducedAggregations.add(first.reduce(aggregations, context)); } - if (context.isFinalReduce()) { - for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) { - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); - reducedAggregations.add(newAgg); - } - return new InternalAggregations(reducedAggregations); - } return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java index 41b1a9aef6230..cfad39166e70d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -73,7 +74,7 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException { protected abstract B reduceBucket(List buckets, ReduceContext context); @Override - public abstract List getBuckets(); + public abstract List getBuckets(); @Override public Object getProperty(List path) { @@ -141,6 +142,30 @@ public static int countInnerBucket(Aggregation agg) { return size; } + /** + * Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a multi-bucket + * agg needs to first reduce the buckets (and their parent pipelines) before allowing sibling pipelines + * to materialize + */ + @Override + public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) { + assert reduceContext.isFinalReduce(); + List materializedBuckets = reducePipelineBuckets(reduceContext); + return super.reducePipelines(create(materializedBuckets), reduceContext); + } + + private List reducePipelineBuckets(ReduceContext reduceContext) { + List reducedBuckets = new ArrayList<>(); + for (B bucket : getBuckets()) { + List aggs = new ArrayList<>(); + for (Aggregation agg : bucket.getAggregations()) { + aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext)); + } + reducedBuckets.add(createBucket(new InternalAggregations(aggs), bucket)); + } + return reducedBuckets; + } + public abstract static class InternalBucket implements Bucket, Writeable { public Object getProperty(String containingAggName, List path) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index cf5ddc54884d0..0a34e7a92b8f1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -97,7 +97,7 @@ public InternalSingleBucketAggregation create(InternalAggregations subAggregatio protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations); @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { long docCount = 0L; List subAggregationsList = new ArrayList<>(aggregations.size()); for (InternalAggregation aggregation : aggregations) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index 57c7d703cbdf6..78181e3a3366a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -181,7 +181,7 @@ public InternalBucket getBucketByKey(String key) { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { Map> bucketsMap = new HashMap<>(); for (InternalAggregation aggregation : aggregations) { InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index e11db15acecad..243ae557bfa2c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -143,7 +143,7 @@ int[] getReverseMuls() { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { PriorityQueue pq = new PriorityQueue<>(aggregations.size()); for (InternalAggregation agg : aggregations) { InternalComposite sortedAgg = (InternalComposite) agg; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java index 271d1c54d5898..d99da0187f74b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java @@ -189,7 +189,7 @@ public InternalBucket getBucketByKey(String key) { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List> bucketsList = null; for (InternalAggregation aggregation : aggregations) { InternalFilters filters = (InternalFilters) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java index 61c06a062cc05..1760fc19728c4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java @@ -81,7 +81,7 @@ public List getBuckets() { } @Override - public InternalGeoGrid doReduce(List aggregations, ReduceContext reduceContext) { + public InternalGeoGrid reduce(List aggregations, ReduceContext reduceContext) { LongObjectPagedHashMap> buckets = null; for (InternalAggregation aggregation : aggregations) { InternalGeoGrid grid = (InternalGeoGrid) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index c776d764637dd..0d81188c8ca0e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -498,7 +498,7 @@ static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 1e79b60ca7976..1f1775ede7535 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -448,7 +448,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { if (minDocCount == 0) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index f4f7db5cd64a2..bc20534c5251b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -421,7 +421,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { if (minDocCount == 0) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java index 04252c0a25a50..21d0f2fdab5ca 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java @@ -230,7 +230,7 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { reduceContext.consumeBucketsAndMaybeBreak(buckets.size()); long[] docCounts = new long[buckets.size()]; InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][]; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index 399dc04e44f5f..a77b70a480850 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -288,7 +288,7 @@ public B createBucket(InternalAggregations aggregations, B prototype) { @SuppressWarnings("unchecked") @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { reduceContext.consumeBucketsAndMaybeBreak(ranges.size()); List[] rangeList = new List[ranges.size()]; for (int i = 0; i < rangeList.length; ++i) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java index 5f5f557ffd561..6f1a4cc6a4aad 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java @@ -49,7 +49,7 @@ public String getWriteableName() { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { return new UnmappedSampler(name, pipelineAggregators(), metaData); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java index d641a2773e615..47926760d3ba9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java @@ -126,7 +126,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws } if (spare == null) { - spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format); + spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0); } spare.bucketOrd = bucketOrd; copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java index 49c2718baaf26..789ced9cd035e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java @@ -105,6 +105,9 @@ public long getSubsetSize() { return subsetSize; } + // TODO we should refactor to remove this, since buckets should be immutable after they are generated. + // This can lead to confusing bugs if the bucket is re-created (via createBucket() or similar) without + // the score void updateScore(SignificanceHeuristic significanceHeuristic) { score = significanceHeuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize); } @@ -191,7 +194,7 @@ protected final void doWriteTo(StreamOutput out) throws IOException { public abstract List getBuckets(); @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { long globalSubsetSize = 0; long globalSupersetSize = 0; // Compute the overall result set size and the corpus size using the diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java index 582346f529a8a..d9f4ac7e1f10f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java @@ -42,14 +42,9 @@ static class Bucket extends InternalSignificantTerms.Bucket { long term; Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations, - DocValueFormat format) { + DocValueFormat format, double score) { super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); this.term = term; - } - - Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations, - double score) { - this(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, null); this.score = score; } @@ -134,7 +129,7 @@ public SignificantLongTerms create(List buckets) { @Override public Bucket createBucket(InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) { return new Bucket(prototype.subsetDf, prototype.subsetSize, prototype.supersetDf, prototype.supersetSize, prototype.term, - aggregations, prototype.format); + aggregations, prototype.format, prototype.score); } @Override @@ -151,6 +146,6 @@ protected Bucket[] createBucketsArray(int size) { @Override Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) { - return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format); + return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format, prototype.score); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java index 2fcba9f09bf7e..8684acb600a2f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java @@ -88,7 +88,7 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) throws IO continue; } if (spare == null) { - spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format); + spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0); } spare.term = bucketOrds.get(i); spare.subsetDf = docCount; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java index 6c7da48a56092..cb79e52238399 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java @@ -43,9 +43,10 @@ public static class Bucket extends InternalSignificantTerms.Bucket { BytesRef termBytes; public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations, - DocValueFormat format) { + DocValueFormat format, double score) { super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); this.termBytes = term; + this.score = score; } /** @@ -69,12 +70,6 @@ public void writeTo(StreamOutput out) throws IOException { aggregations.writeTo(out); } - public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, - InternalAggregations aggregations, double score, DocValueFormat format) { - this(term, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); - this.score = score; - } - @Override public Number getKeyAsNumber() { // this method is needed for scripted numeric aggregations @@ -139,7 +134,7 @@ public SignificantStringTerms create(List buckets @Override public Bucket createBucket(InternalAggregations aggregations, SignificantStringTerms.Bucket prototype) { return new Bucket(prototype.termBytes, prototype.subsetDf, prototype.subsetSize, prototype.supersetDf, prototype.supersetSize, - aggregations, prototype.format); + aggregations, prototype.format, prototype.score); } @Override @@ -156,6 +151,6 @@ protected Bucket[] createBucketsArray(int size) { @Override Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations, SignificantStringTerms.Bucket prototype) { - return new Bucket(prototype.termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); + return new Bucket(prototype.termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format, prototype.score); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java index 91ade2e42f740..ed4c96f19add7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java @@ -91,7 +91,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws } if (spare == null) { - spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format); + spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0); } bucketOrds.get(i, spare.termBytes); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java index 7f62813278b64..170b0e1fd81c5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java @@ -201,7 +201,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws } if (spare == null) { - spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format); + spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0); } bucketOrds.get(i, spare.termBytes); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index d9e3c85de3a60..3b88fb0d3e460 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -105,7 +105,7 @@ Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supers } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index 8bc0e83c8d6a6..9672136cf5bea 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -144,7 +144,7 @@ protected Bucket[] createBucketsArray(int size) { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { boolean promoteToDouble = false; for (InternalAggregation agg : aggregations) { if (agg instanceof LongTerms && ((LongTerms) agg).format == DocValueFormat.RAW) { @@ -157,7 +157,7 @@ public InternalAggregation doReduce(List aggregations, Redu } } if (promoteToDouble == false) { - return super.doReduce(aggregations, reduceContext); + return super.reduce(aggregations, reduceContext); } List newAggs = new ArrayList<>(aggregations.size()); for (InternalAggregation agg : aggregations) { @@ -168,7 +168,7 @@ public InternalAggregation doReduce(List aggregations, Redu newAggs.add(agg); } } - return newAggs.get(0).doReduce(newAggs, reduceContext); + return newAggs.get(0).reduce(newAggs, reduceContext); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java index 7711946226470..bc8e8198984ef 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java @@ -87,7 +87,7 @@ protected void writeTermTypeInfoTo(StreamOutput out) throws IOException { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { Map> buckets = new HashMap<>(); InternalRareTerms referenceTerms = null; SetBackedScalingCuckooFilter filter = null; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java index ae9f8e27ec6ae..dee3424621cd6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java @@ -154,7 +154,7 @@ protected final void doWriteTo(StreamOutput out) throws IOException { public abstract B getBucketByKey(String term); @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 3eefc9bee0100..8f45749a36304 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -192,7 +192,7 @@ protected final void doWriteTo(StreamOutput out) throws IOException { public abstract B getBucketByKey(String term); @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { Map> buckets = new HashMap<>(); long sumDocCountError = 0; long otherDocCount = 0; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index 6a0fcde1fa053..90ebe7af36abe 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -144,13 +144,13 @@ protected Bucket[] createBucketsArray(int size) { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { for (InternalAggregation agg : aggregations) { if (agg instanceof DoubleTerms) { - return agg.doReduce(aggregations, reduceContext); + return agg.reduce(aggregations, reduceContext); } } - return super.doReduce(aggregations, reduceContext); + return super.reduce(aggregations, reduceContext); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java index c4a019e6fe9b2..58dac3ece2b24 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java @@ -93,7 +93,7 @@ protected UnmappedRareTerms createWithFilter(String name, List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { return new UnmappedRareTerms(name, pipelineAggregators(), metaData); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 8096366f6d655..2a1baff9ded4d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -99,7 +99,7 @@ protected UnmappedTerms create(String name, List buckets, long docCountE } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalHDRPercentiles.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalHDRPercentiles.java index d8e043ee9b5e9..692ea1761fe3c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalHDRPercentiles.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalHDRPercentiles.java @@ -103,7 +103,7 @@ DoubleHistogram getState() { } @Override - public AbstractInternalHDRPercentiles doReduce(List aggregations, ReduceContext reduceContext) { + public AbstractInternalHDRPercentiles reduce(List aggregations, ReduceContext reduceContext) { DoubleHistogram merged = null; for (InternalAggregation aggregation : aggregations) { final AbstractInternalHDRPercentiles percentiles = (AbstractInternalHDRPercentiles) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalTDigestPercentiles.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalTDigestPercentiles.java index ca03e2aa2b1c9..f691a438d0d07 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalTDigestPercentiles.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalTDigestPercentiles.java @@ -87,7 +87,7 @@ TDigestState getState() { } @Override - public AbstractInternalTDigestPercentiles doReduce(List aggregations, ReduceContext reduceContext) { + public AbstractInternalTDigestPercentiles reduce(List aggregations, ReduceContext reduceContext) { TDigestState merged = null; for (InternalAggregation aggregation : aggregations) { final AbstractInternalTDigestPercentiles percentiles = (AbstractInternalTDigestPercentiles) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java index 3e3b2ae03ea0d..199f3a7ccc872 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java @@ -87,7 +87,7 @@ public String getWriteableName() { } @Override - public InternalAvg doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAvg reduce(List aggregations, ReduceContext reduceContext) { CompensatedSum kahanSummation = new CompensatedSum(0, 0); long count = 0; // Compute the sum of double values with Kahan summation algorithm which is more diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java index c3132a299042e..bc2c4d88c4679 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java @@ -85,7 +85,7 @@ public HyperLogLogPlusPlus getCounts() { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { InternalCardinality reduced = null; for (InternalAggregation aggregation : aggregations) { final InternalCardinality cardinality = (InternalCardinality) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java index 3fe2e75576aa4..5385960579451 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java @@ -140,7 +140,7 @@ public String getStdDeviationBoundAsString(Bounds bound) { } @Override - public InternalExtendedStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalExtendedStats reduce(List aggregations, ReduceContext reduceContext) { double sumOfSqrs = 0; double compensationOfSqrs = 0; for (InternalAggregation aggregation : aggregations) { @@ -158,7 +158,7 @@ public InternalExtendedStats doReduce(List aggregations, Re sumOfSqrs = newSumOfSqrs; } } - final InternalStats stats = super.doReduce(aggregations, reduceContext); + final InternalStats stats = super.reduce(aggregations, reduceContext); return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs, sigma, format, pipelineAggregators(), getMetaData()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java index 4d48e4ab8966b..91007ab2f8fff 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java @@ -93,7 +93,7 @@ public String getWriteableName() { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { double top = Double.NEGATIVE_INFINITY; double bottom = Double.POSITIVE_INFINITY; double posLeft = Double.POSITIVE_INFINITY; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java index 2172d15259b85..24493273aa534 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java @@ -114,7 +114,7 @@ public long count() { } @Override - public InternalGeoCentroid doReduce(List aggregations, ReduceContext reduceContext) { + public InternalGeoCentroid reduce(List aggregations, ReduceContext reduceContext) { double lonSum = Double.NaN; double latSum = Double.NaN; int totalCount = 0; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMax.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMax.java index 9a8458c85a690..6abb0d3a51dd8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMax.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMax.java @@ -71,7 +71,7 @@ public double getValue() { } @Override - public InternalMax doReduce(List aggregations, ReduceContext reduceContext) { + public InternalMax reduce(List aggregations, ReduceContext reduceContext) { double max = Double.NEGATIVE_INFINITY; for (InternalAggregation aggregation : aggregations) { max = Math.max(max, ((InternalMax) aggregation).max); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java index 871f387638dc3..b228c95c0dc3b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java @@ -80,7 +80,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { final TDigestState valueMerged = new TDigestState(valuesSketch.compression()); for (InternalAggregation aggregation : aggregations) { final InternalMedianAbsoluteDeviation madAggregation = (InternalMedianAbsoluteDeviation) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMin.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMin.java index f68d5a46860bc..6912f55ecafdc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMin.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMin.java @@ -71,7 +71,7 @@ public double getValue() { } @Override - public InternalMin doReduce(List aggregations, ReduceContext reduceContext) { + public InternalMin reduce(List aggregations, ReduceContext reduceContext) { double min = Double.POSITIVE_INFINITY; for (InternalAggregation aggregation : aggregations) { min = Math.min(min, ((InternalMin) aggregation).min); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java index ce051a1691b55..e6113f3763cdf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java @@ -85,7 +85,7 @@ List getAggregation() { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List aggregationObjects = new ArrayList<>(); for (InternalAggregation aggregation : aggregations) { InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java index adb879999a410..661457e1c1f3d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java @@ -145,7 +145,7 @@ public double value(String name) { } @Override - public InternalStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalStats reduce(List aggregations, ReduceContext reduceContext) { long count = 0; double min = Double.POSITIVE_INFINITY; double max = Double.NEGATIVE_INFINITY; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java index 6e6315eded101..5778edb4da19a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java @@ -71,7 +71,7 @@ public double getValue() { } @Override - public InternalSum doReduce(List aggregations, ReduceContext reduceContext) { + public InternalSum reduce(List aggregations, ReduceContext reduceContext) { // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. CompensatedSum kahanSummation = new CompensatedSum(0, 0); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java index 348e98302d2bc..4f922bd39d947 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java @@ -99,7 +99,7 @@ int getSize() { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { final SearchHits[] shardHits = new SearchHits[aggregations.size()]; final int from; final int size; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalValueCount.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalValueCount.java index 32ee8bd36d120..0c942e1afbcaf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalValueCount.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalValueCount.java @@ -70,7 +70,7 @@ public double value() { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { long valueCount = 0; for (InternalAggregation aggregation : aggregations) { valueCount += ((InternalValueCount) aggregation).value; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java index 4b3523b03ac3d..e4c79f7f8996f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java @@ -87,7 +87,7 @@ public String getWriteableName() { } @Override - public InternalWeightedAvg doReduce(List aggregations, ReduceContext reduceContext) { + public InternalWeightedAvg reduce(List aggregations, ReduceContext reduceContext) { CompensatedSum sumCompensation = new CompensatedSum(0, 0); CompensatedSum weightCompensation = new CompensatedSum(0, 0); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalBucketMetricValue.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalBucketMetricValue.java index 1acdb54080693..84b4c3b305a41 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalBucketMetricValue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalBucketMetricValue.java @@ -85,7 +85,7 @@ DocValueFormat formatter() { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalExtendedStatsBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalExtendedStatsBucket.java index b0b78eb012042..d1f60fe30e511 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalExtendedStatsBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalExtendedStatsBucket.java @@ -48,7 +48,7 @@ public String getWriteableName() { } @Override - public InternalExtendedStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalExtendedStats reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalPercentilesBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalPercentilesBucket.java index 77493f66d643e..38eaae87c012f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalPercentilesBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalPercentilesBucket.java @@ -119,7 +119,7 @@ public double value(String name) { } @Override - public InternalMax doReduce(List aggregations, ReduceContext reduceContext) { + public InternalMax reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java index 4f7b51b6e3b38..29c25b117467f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java @@ -76,7 +76,7 @@ DocValueFormat formatter() { } @Override - public InternalSimpleValue doReduce(List aggregations, ReduceContext reduceContext) { + public InternalSimpleValue reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalStatsBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalStatsBucket.java index 51d3cfc060f73..c8beef459b8c8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalStatsBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalStatsBucket.java @@ -47,7 +47,7 @@ public String getWriteableName() { } @Override - public InternalStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalStats reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index cf6a9f6d838ee..7236b354ef92b 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -62,7 +62,7 @@ public void testNonFinalReduceTopLevelPipelineAggs() { List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms), topLevelPipelineAggs)); InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false); - InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContext); assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(1, reducedAggs.aggregations.size()); } @@ -78,11 +78,11 @@ public void testFinalReduceTopLevelPipelineAggs() { if (randomBoolean()) { InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), Collections.singletonList(siblingPipelineAggregator)); - reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); + reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext); } else { InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), Collections.singletonList(siblingPipelineAggregator)); - reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); + reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext); } assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(2, reducedAggs.aggregations.size()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index 9293b33e22f43..c6bb10fa6c947 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -36,10 +36,15 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.metrics.InternalMax; import org.elasticsearch.search.aggregations.metrics.InternalStats; +import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import org.hamcrest.Matchers; import org.junit.Assert; @@ -58,9 +63,12 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.equalTo; + public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { private static final String DATE_FIELD = "date"; private static final String INSTANT_FIELD = "instant"; + private static final String NUMERIC_FIELD = "numeric"; private static final List DATES_WITH_TIME = Arrays.asList( ZonedDateTime.of(2010, 3, 12, 1, 7, 45, 0, ZoneOffset.UTC), @@ -718,6 +726,35 @@ public void testIntervalSecond() throws IOException { ); } + public void testWithPipelineReductions() throws IOException { + testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME, + aggregation -> aggregation.setNumBuckets(1).field(DATE_FIELD) + .subAggregation(AggregationBuilders.histogram("histo").field(NUMERIC_FIELD).interval(1) + .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD)) + .subAggregation(new DerivativePipelineAggregationBuilder("deriv", "max"))), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + final List buckets = histogram.getBuckets(); + assertEquals(1, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(10, bucket.getDocCount()); + assertThat(bucket.getAggregations().asList().size(), equalTo(1)); + InternalHistogram histo = (InternalHistogram) bucket.getAggregations().asList().get(0); + assertThat(histo.getBuckets().size(), equalTo(10)); + for (int i = 0; i < 10; i++) { + assertThat(histo.getBuckets().get(i).key, equalTo((double)i)); + assertThat(((InternalMax)histo.getBuckets().get(i).aggregations.get("max")).getValue(), equalTo((double)i)); + if (i > 0) { + assertThat(((InternalSimpleValue)histo.getBuckets().get(i).aggregations.get("deriv")).getValue(), equalTo(1.0)); + } + } + + + }); + } + private void testSearchCase(final Query query, final List dataset, final Consumer configure, final Consumer verify) throws IOException { @@ -757,6 +794,7 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { final Document document = new Document(); + int i = 0; for (final ZonedDateTime date : dataset) { if (frequently()) { indexWriter.commit(); @@ -765,8 +803,10 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis final long instant = date.toInstant().toEpochMilli(); document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); document.add(new LongPoint(INSTANT_FIELD, instant)); + document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i)); indexWriter.addDocument(document); document.clear(); + i += 1; } } @@ -783,11 +823,19 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis fieldType.setHasDocValues(true); fieldType.setName(aggregationBuilder.field()); + MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + instantFieldType.setName(INSTANT_FIELD); + instantFieldType.setHasDocValues(true); + + MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + numericFieldType.setName(NUMERIC_FIELD); + numericFieldType.setHasDocValues(true); + final InternalAutoDateHistogram histogram; if (reduced) { - histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType); + histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType); } else { - histogram = search(indexSearcher, query, aggregationBuilder, fieldType); + histogram = search(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType); } verify.accept(histogram); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java index fa4bce9a4e959..4ada5e349de1f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java @@ -108,7 +108,6 @@ public void testGetAppropriateRoundingUsesCorrectIntervals() { assertThat(result, equalTo(2)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/39497") public void testReduceRandom() { super.testReduceRandom(); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java index fb9f6dd29c73f..5b8264b9e71ea 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java @@ -109,7 +109,7 @@ public void testHandlesNaN() { newBuckets.add(new InternalHistogram.Bucket(Double.NaN, b.docCount, keyed, b.format, b.aggregations)); InternalHistogram newHistogram = histogram.create(newBuckets); - newHistogram.doReduce(Arrays.asList(newHistogram, histogram2), new InternalAggregation.ReduceContext(null, null, false)); + newHistogram.reduce(Arrays.asList(newHistogram, histogram2), new InternalAggregation.ReduceContext(null, null, false)); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java index 49de4eb821115..6afc5e94e3029 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java @@ -126,12 +126,12 @@ public void testStreamResponse() throws Exception { InternalMappedSignificantTerms getRandomSignificantTerms(SignificanceHeuristic heuristic) { if (randomBoolean()) { SignificantLongTerms.Bucket bucket = new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY, - DocValueFormat.RAW); + DocValueFormat.RAW, randomDoubleBetween(0, 100, true)); return new SignificantLongTerms("some_name", 1, 1, emptyList(), null, DocValueFormat.RAW, 10, 20, heuristic, singletonList(bucket)); } else { SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket(new BytesRef("someterm"), 1, 2, 3, 4, - InternalAggregations.EMPTY, DocValueFormat.RAW); + InternalAggregations.EMPTY, DocValueFormat.RAW, randomDoubleBetween(0, 100, true)); return new SignificantStringTerms("some_name", 1, 1, emptyList(), null, DocValueFormat.RAW, 10, 20, heuristic, singletonList(bucket)); } @@ -149,7 +149,7 @@ public static SignificanceHeuristic getRandomSignificanceheuristic() { public void testReduce() { List aggs = createInternalAggregations(); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(null, null, true); - SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).doReduce(aggs, context); + SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).reduce(aggs, context); assertThat(reducedAgg.getBuckets().size(), equalTo(2)); assertThat(reducedAgg.getBuckets().get(0).getSubsetDf(), equalTo(8L)); assertThat(reducedAgg.getBuckets().get(0).getSubsetSize(), equalTo(16L)); @@ -196,7 +196,7 @@ SignificantStringTerms createAggregation(SignificanceHeuristic significanceHeuri @Override SignificantStringTerms.Bucket createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) { return new SignificantStringTerms.Bucket(new BytesRef(Long.toString(label).getBytes(StandardCharsets.UTF_8)), subsetDF, - subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY, DocValueFormat.RAW); + subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY, DocValueFormat.RAW, 0); } } private class LongTestAggFactory extends TestAggFactory { @@ -210,7 +210,7 @@ SignificantLongTerms createAggregation(SignificanceHeuristic significanceHeurist @Override SignificantLongTerms.Bucket createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) { return new SignificantLongTerms.Bucket(subsetDF, subsetSize, supersetDF, supersetSize, label, InternalAggregations.EMPTY, - DocValueFormat.RAW); + DocValueFormat.RAW, 0); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsTests.java index 755cb6e85292d..3a9684d305114 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsTests.java @@ -58,7 +58,7 @@ protected InternalSignificantTerms createTestInstance(String name, for (int i = 0; i < numBuckets; ++i) { long term = randomValueOtherThanMany(l -> terms.add(l) == false, random()::nextLong); SignificantLongTerms.Bucket bucket = new SignificantLongTerms.Bucket(subsetDfs[i], subsetSize, - supersetDfs[i], supersetSize, term, aggs, format); + supersetDfs[i], supersetSize, term, aggs, format, 0); bucket.updateScore(significanceHeuristic); buckets.add(bucket); } @@ -109,7 +109,7 @@ protected Class implementationClass() { case 5: buckets = new ArrayList<>(buckets); buckets.add(new SignificantLongTerms.Bucket(randomLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong(), InternalAggregations.EMPTY, format)); + randomNonNegativeLong(), randomNonNegativeLong(), InternalAggregations.EMPTY, format, 0)); break; case 8: if (metaData == null) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsTests.java index 2255373fd346d..d230b681cbe68 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsTests.java @@ -51,7 +51,7 @@ protected InternalSignificantTerms createTestInstance(String name, for (int i = 0; i < numBuckets; ++i) { BytesRef term = randomValueOtherThanMany(b -> terms.add(b) == false, () -> new BytesRef(randomAlphaOfLength(10))); SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket(term, subsetDfs[i], subsetSize, - supersetDfs[i], supersetSize, aggs, format); + supersetDfs[i], supersetSize, aggs, format, 0); bucket.updateScore(significanceHeuristic); buckets.add(bucket); } @@ -103,7 +103,7 @@ protected Class implementationClass() { buckets = new ArrayList<>(buckets); buckets.add(new SignificantStringTerms.Bucket(new BytesRef(randomAlphaOfLengthBetween(1, 10)), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - InternalAggregations.EMPTY, format)); + InternalAggregations.EMPTY, format, 0)); break; case 8: if (metaData == null) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index 727c3ea3a87ae..611e7d916c9c9 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -1073,7 +1073,7 @@ public void testMixLongAndDouble() throws Exception { new InternalAggregation.ReduceContext(new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), null, true); for (InternalAggregation internalAgg : aggs) { - InternalAggregation mergedAggs = internalAgg.doReduce(aggs, ctx); + InternalAggregation mergedAggs = internalAgg.reduce(aggs, ctx); assertTrue(mergedAggs instanceof DoubleTerms); long expected = numLongs + numDoubles; List buckets = ((DoubleTerms) mergedAggs).getBuckets(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalAvgTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalAvgTests.java index 10ae10a9af1c0..5582af4ced64f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalAvgTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalAvgTests.java @@ -23,8 +23,6 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.ParsedAggregation; -import org.elasticsearch.search.aggregations.metrics.InternalAvg; -import org.elasticsearch.search.aggregations.metrics.ParsedAvg; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.test.InternalAggregationTestCase; @@ -95,7 +93,7 @@ private void verifyAvgOfDoubles(double[] values, double expected, double delta) aggregations.add(new InternalAvg("dummy1", value, 1, null, null, null)); } InternalAvg internalAvg = new InternalAvg("dummy2", 0, 0, null, null, null); - InternalAvg reduced = internalAvg.doReduce(aggregations, null); + InternalAvg reduced = internalAvg.reduce(aggregations, null); assertEquals(expected, reduced.getValue(), delta); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java index 3c5201bfa8aa9..2f53902bb537a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java @@ -225,7 +225,7 @@ private void verifySumOfSqrsOfDoubles(double[] values, double expectedSumOfSqrs, aggregations.add(new InternalExtendedStats("dummy1", 1, 0.0, 0.0, 0.0, sumOfSqrs, sigma, null, null, null)); } InternalExtendedStats stats = new InternalExtendedStats("dummy", 1, 0.0, 0.0, 0.0, 0.0, sigma, null, null, null); - InternalExtendedStats reduced = stats.doReduce(aggregations, null); + InternalExtendedStats reduced = stats.reduce(aggregations, null); assertEquals(expectedSumOfSqrs, reduced.getSumOfSquares(), delta); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalStatsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalStatsTests.java index 8198d6c2e81a3..e4a56c0c5b84a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalStatsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalStatsTests.java @@ -114,7 +114,7 @@ private void verifyStatsOfDoubles(double[] values, double expectedSum, double ex aggregations.add(new InternalStats("dummy1", 1, value, value, value, null, null, null)); } InternalStats internalStats = new InternalStats("dummy2", 0, 0.0, 2.0, 0.0, null, null, null); - InternalStats reduced = internalStats.doReduce(aggregations, null); + InternalStats reduced = internalStats.reduce(aggregations, null); assertEquals("dummy2", reduced.getName()); assertEquals(values.length, reduced.getCount()); assertEquals(expectedSum, reduced.getSum(), delta); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalSumTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalSumTests.java index 4f44be7d50833..0fca0d43bd6f1 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalSumTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalSumTests.java @@ -87,7 +87,7 @@ private void verifySummationOfDoubles(double[] values, double expected, double d aggregations.add(new InternalSum("dummy1", value, null, null, null)); } InternalSum internalSum = new InternalSum("dummy", 0, null, null, null); - InternalSum reduced = internalSum.doReduce(aggregations, null); + InternalSum reduced = internalSum.reduce(aggregations, null); assertEquals(expected, reduced.value(), delta); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java index afea0f13bd7db..b8e9f1444dd6a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java @@ -77,7 +77,7 @@ public class AvgBucketAggregatorTests extends AggregatorTestCase { * it is fixed. * * Note: we have this test inside of the `avg_bucket` package so that we can get access to the package-private - * `doReduce()` needed for testing this + * `reduce()` needed for testing this */ public void testSameAggNames() throws IOException { Query query = new MatchAllDocsQuery(); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 199f9b055393c..221030cd8d157 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -387,7 +387,7 @@ protected A searchAndReduc InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, false); - A reduced = (A) aggs.get(0).doReduce(toReduce, context); + A reduced = (A) aggs.get(0).reduce(toReduce, context); doAssertReducedMultiBucketConsumer(reduced, reduceBucketConsumer); aggs = new ArrayList<>(aggs.subList(r, toReduceSize)); aggs.add(reduced); @@ -398,7 +398,12 @@ protected A searchAndReduc new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, true); @SuppressWarnings("unchecked") - A internalAgg = (A) aggs.get(0).doReduce(aggs, context); + A internalAgg = (A) aggs.get(0).reduce(aggs, context); + + // materialize any parent pipelines + internalAgg = (A) internalAgg.reducePipelines(internalAgg, context); + + // materialize any sibling pipelines at top level if (internalAgg.pipelineAggregators().size() > 0) { for (PipelineAggregator pipelineAggregator : internalAgg.pipelineAggregators()) { internalAgg = (A) pipelineAggregator.reduce(internalAgg, context); diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/InternalSimpleLongValue.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/InternalSimpleLongValue.java index e8db75edad5d9..a6b8aad699824 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/InternalSimpleLongValue.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/InternalSimpleLongValue.java @@ -64,7 +64,7 @@ DocValueFormat formatter() { } @Override - public InternalSimpleLongValue doReduce(List aggregations, ReduceContext reduceContext) { + public InternalSimpleLongValue reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java index 88cb505615cf7..007fbcea9024a 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java @@ -182,7 +182,7 @@ public Object value(String name) { } @Override - public InternalStringStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalStringStats reduce(List aggregations, ReduceContext reduceContext) { long count = 0; long totalLength = 0; int minLength = Integer.MAX_VALUE; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java index 25fe2f51b2f22..91953295bd523 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java @@ -610,7 +610,7 @@ public void testDateHistoWithGap() throws IOException { ScriptService scriptService = mock(ScriptService.class); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, scriptService, true); - InternalAggregation reduced = ((InternalDateHistogram)unrolled).doReduce(Collections.singletonList(unrolled), context); + InternalAggregation reduced = ((InternalDateHistogram)unrolled).reduce(Collections.singletonList(unrolled), context); assertThat(reduced.toString(), equalTo("{\"histo\":{\"buckets\":[{\"key_as_string\":\"1970-01-01T00:00:00.100Z\",\"key\":100," + "\"doc_count\":1},{\"key_as_string\":\"1970-01-01T00:00:00.200Z\",\"key\":200,\"doc_count\":1}," + "{\"key_as_string\":\"1970-01-01T00:00:00.300Z\",\"key\":300,\"doc_count\":0,\"histo._count\":{\"value\":0.0}}," + diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestMultiValueAggregation.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestMultiValueAggregation.java index 7a24a5515b527..8061ce31ef832 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestMultiValueAggregation.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestMultiValueAggregation.java @@ -42,7 +42,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException(); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestSingleValueAggregation.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestSingleValueAggregation.java index 08f4edda3cbf6..56183414aa0ac 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestSingleValueAggregation.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestSingleValueAggregation.java @@ -37,7 +37,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException(); }