From 3073152fc8b5168672616c64c79bcf877f4336f9 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 15 Mar 2019 16:09:28 +0100 Subject: [PATCH] Skip sibling pipeline aggregators reduction during non-final reduce Today a coordinating node forces a final reduction of sibling pipeline aggregators whenever reducing aggs, unless it is reducing aggs incrementally. This works well for incremental reduction of aggs, but breaks CCS when minimizing roundtrips as each cluster ends up reducing its own pipeline aggregators locally while that should only be done by the CCS coordinating node later. This causes issues as after their reduction, pipeline aggs cannot be further reduced, which is what happens with CCS causing errors like "java.lang.UnsupportedOperationException: Not supported" being returned. Each coordinating node should rather honour the reduce context flag that indicates whether we are executing a final reduce or not. If not, it should leave the sibling pipeline aggregations alone. Note that his bug affects only pipeline aggs that don't have a parent in the aggs tree, while all the others work well. Relates to #40059 but does not fix it yet, as the CCS coordinating node also needs to be adapted to recreate sibling pipeline aggregators from the request. --- .../action/search/SearchPhaseController.java | 36 +++---------------- .../action/search/TransportSearchAction.java | 2 -- .../aggregations/AggregatorFactories.java | 3 +- .../aggregations/InternalAggregation.java | 2 +- .../aggregations/InternalAggregations.java | 27 +++++++++++--- 5 files changed, 28 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 084cdaca2cc59..f54f101041d1b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -42,7 +42,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -65,8 +64,6 @@ import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public final class SearchPhaseController { @@ -488,8 +485,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection aggregationsList) { - ReduceContext reduceContext = reduceContextFunction.apply(false); - return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList, - null, reduceContext); - } - - private static InternalAggregations reduceAggs(List aggregationsList, - List pipelineAggregators, ReduceContext reduceContext) { - InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); - if (pipelineAggregators != null) { - List newAggs = StreamSupport.stream(aggregations.spliterator(), false) - .map((p) -> (InternalAggregation) p) - .collect(Collectors.toList()); - for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) { - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext); - newAggs.add(newAgg); - } - return new InternalAggregations(newAggs); - } - return aggregations; - } - public static final class ReducedQueryPhase { // the sum of all hits across all reduces shards final TotalHits totalHits; @@ -644,7 +615,8 @@ public void consumeResult(SearchPhaseResult result) { private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (index == bufferSize) { if (hasAggs) { - InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer)); + ReduceContext reduceContext = controller.reduceContextFunction.apply(false); + InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext); Arrays.fill(aggsBuffer, null); aggsBuffer[0] = reducedAggs; } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index b1aeb945f5887..50f96a3703379 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -315,8 +315,6 @@ public void onFailure(Exception e) { if (localIndices != null) { ActionListener ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); - //here we provide the empty string a cluster alias, which means no prefix in index name, - //but the coord node will perform non final reduce as it's not null. SearchRequest ccsLocalSearchRequest = SearchRequest.crossClusterSearch(searchRequest, localIndices.indices(), RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index d6eb73514d9c3..9683651391cc2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -161,8 +161,7 @@ public AggParseContext(String name) { } } - public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], - new ArrayList()); + public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>()); private AggregatorFactory[] factories; private List pipelineAggregatorFactories; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index eafdbe109776b..b525fd32d918a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -61,7 +61,7 @@ public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsum /** * Returns true iff the current reduce phase is the final reduce phase. This indicates if operations like * pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account. - * Operations that are potentially loosing information can only be applied during the final reduce phase. + * Operations that are potentially losing information can only be applied during the final reduce phase. */ public boolean isFinalReduce() { return isFinalReduce; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 95140b50d2bdf..69adb79cb2b84 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import java.io.IOException; import java.util.ArrayList; @@ -52,19 +53,26 @@ private InternalAggregations() { } /** - * Constructs a new addAggregation. + * Constructs a new aggregation. */ public InternalAggregations(List aggregations) { super(aggregations); } /** - * Reduces the given lists of addAggregation. - * - * @param aggregationsList A list of aggregation to reduce - * @return The reduced addAggregation + * Reduces the given list of aggregations */ public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { + return reduce(aggregationsList, null, context); + } + + /** + * Reduces the given list of aggregations as well as the provided sibling pipeline aggregators. + * Note that sibling pipeline aggregators are ignored when non final reduction is performed. + */ + public static InternalAggregations reduce(List aggregationsList, + List siblingPipelineAggregators, + ReduceContext context) { if (aggregationsList.isEmpty()) { return null; } @@ -89,6 +97,15 @@ public static InternalAggregations reduce(List aggregation InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand reducedAggregations.add(first.reduce(aggregations, context)); } + + if (siblingPipelineAggregators != null) { + if (context.isFinalReduce()) { + for (SiblingPipelineAggregator pipelineAggregator : siblingPipelineAggregators) { + InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); + reducedAggregations.add(newAgg); + } + } + } return new InternalAggregations(reducedAggregations); }