diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 084cdaca2cc59..f54f101041d1b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -42,7 +42,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -65,8 +64,6 @@ import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public final class SearchPhaseController { @@ -488,8 +485,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection aggregationsList) { - ReduceContext reduceContext = reduceContextFunction.apply(false); - return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList, - null, reduceContext); - } - - private static InternalAggregations reduceAggs(List aggregationsList, - List pipelineAggregators, ReduceContext reduceContext) { - InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); - if (pipelineAggregators != null) { - List newAggs = StreamSupport.stream(aggregations.spliterator(), false) - .map((p) -> (InternalAggregation) p) - .collect(Collectors.toList()); - for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) { - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext); - newAggs.add(newAgg); - } - return new InternalAggregations(newAggs); - } - return aggregations; - } - public static final class ReducedQueryPhase { // the sum of all hits across all reduces shards final TotalHits totalHits; @@ -644,7 +615,8 @@ public void consumeResult(SearchPhaseResult result) { private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (index == bufferSize) { if (hasAggs) { - InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer)); + ReduceContext reduceContext = controller.reduceContextFunction.apply(false); + InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext); Arrays.fill(aggsBuffer, null); aggsBuffer[0] = reducedAggs; } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index b1aeb945f5887..50f96a3703379 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -315,8 +315,6 @@ public void onFailure(Exception e) { if (localIndices != null) { ActionListener ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); - //here we provide the empty string a cluster alias, which means no prefix in index name, - //but the coord node will perform non final reduce as it's not null. SearchRequest ccsLocalSearchRequest = SearchRequest.crossClusterSearch(searchRequest, localIndices.indices(), RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index d6eb73514d9c3..9683651391cc2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -161,8 +161,7 @@ public AggParseContext(String name) { } } - public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], - new ArrayList()); + public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>()); private AggregatorFactory[] factories; private List pipelineAggregatorFactories; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index eafdbe109776b..b525fd32d918a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -61,7 +61,7 @@ public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsum /** * Returns true iff the current reduce phase is the final reduce phase. This indicates if operations like * pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account. - * Operations that are potentially loosing information can only be applied during the final reduce phase. + * Operations that are potentially losing information can only be applied during the final reduce phase. */ public boolean isFinalReduce() { return isFinalReduce; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 95140b50d2bdf..69adb79cb2b84 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import java.io.IOException; import java.util.ArrayList; @@ -52,19 +53,26 @@ private InternalAggregations() { } /** - * Constructs a new addAggregation. + * Constructs a new aggregation. */ public InternalAggregations(List aggregations) { super(aggregations); } /** - * Reduces the given lists of addAggregation. - * - * @param aggregationsList A list of aggregation to reduce - * @return The reduced addAggregation + * Reduces the given list of aggregations */ public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { + return reduce(aggregationsList, null, context); + } + + /** + * Reduces the given list of aggregations as well as the provided sibling pipeline aggregators. + * Note that sibling pipeline aggregators are ignored when non final reduction is performed. + */ + public static InternalAggregations reduce(List aggregationsList, + List siblingPipelineAggregators, + ReduceContext context) { if (aggregationsList.isEmpty()) { return null; } @@ -89,6 +97,15 @@ public static InternalAggregations reduce(List aggregation InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand reducedAggregations.add(first.reduce(aggregations, context)); } + + if (siblingPipelineAggregators != null) { + if (context.isFinalReduce()) { + for (SiblingPipelineAggregator pipelineAggregator : siblingPipelineAggregators) { + InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); + reducedAggregations.add(newAgg); + } + } + } return new InternalAggregations(reducedAggregations); }