Skip to content

Commit 9c34ff9

Browse files
authored
Decouple pipeline reductions from final agg reduction (#45796)
Historically only two things happened in the final reduction: empty buckets were filled, and pipeline aggs were reduced (since it was the final reduction, this was safe). Usage of the final reduction is growing, however. Auto-date-histo might need to perform many reductions on final-reduce to merge down buckets, CCS may need to side-step the final reduction if sending to a different cluster, etc. Having pipelines generate their output in the final reduce was convenient, but is becoming increasingly difficult to manage as the rest of the agg framework advances. This commit decouples pipeline aggs from the final reduction by introducing a new "top level" reduce, which should be called at the beginning of the reduce cycle (e.g. from the SearchPhaseController). This will only reduce pipeline aggs on the final reduce, after the non-pipeline agg tree has been fully reduced. By separating pipeline reduction into its own set of methods, aggregations are free to use the final reduction for whatever purpose without worrying about generating pipeline results which are non-reducible.
1 parent 1a349af commit 9c34ff9

File tree

72 files changed

+223
-122
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+223
-122
lines changed

modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public Object getProperty(List<String> path) {
233233
}
234234

235235
@Override
236-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
236+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
237237
// merge stats across all shards
238238
List<InternalAggregation> aggs = new ArrayList<>(aggregations);
239239
aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null);

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
487487
}
488488
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
489489
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
490-
InternalAggregations.reduce(aggregationsList, reduceContext);
490+
InternalAggregations.topLevelReduce(aggregationsList, reduceContext);
491491
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
492492
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
493493
reducedCompletionSuggestions);
@@ -625,7 +625,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
625625
if (index == bufferSize) {
626626
if (hasAggs) {
627627
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
628-
InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
628+
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
629629
Arrays.fill(aggsBuffer, null);
630630
aggsBuffer[0] = reducedAggs;
631631
}

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
196196
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats);
197197
setSuggestShardIndex(shards, groupedSuggestions);
198198
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
199-
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
199+
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContextFunction.apply(true));
200200
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
201201
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
202202
//make failures ordering consistent between ordinary search and CCS by looking at the shard they come from

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,23 +126,25 @@ public String getName() {
126126
return name;
127127
}
128128

129+
/**
130+
* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
131+
* be called after all aggregations have been fully reduced.
132+
*/
133+
public InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
134+
assert reduceContext.isFinalReduce();
135+
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
136+
reducedAggs = pipelineAggregator.reduce(reducedAggs, reduceContext);
137+
}
138+
return reducedAggs;
139+
}
140+
129141
/**
130142
* Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be that all given
131143
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
132144
* try reusing an existing instance (typically the first in the given list) to save on redundant object
133145
* construction.
134146
*/
135-
public final InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
136-
InternalAggregation aggResult = doReduce(aggregations, reduceContext);
137-
if (reduceContext.isFinalReduce()) {
138-
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
139-
aggResult = pipelineAggregator.reduce(aggResult, reduceContext);
140-
}
141-
}
142-
return aggResult;
143-
}
144-
145-
public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
147+
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
146148

147149
/**
148150
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Objects;
36+
import java.util.stream.Collectors;
3637

3738
/**
3839
* An internal implementation of {@link Aggregations}.
@@ -91,10 +92,47 @@ public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
9192
return topLevelPipelineAggregators;
9293
}
9394

95+
@SuppressWarnings("unchecked")
96+
private List<InternalAggregation> getInternalAggregations() {
97+
return (List<InternalAggregation>) aggregations;
98+
}
99+
100+
/**
101+
* Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by
102+
* SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called
103+
* as an intermediate reduction step (e.g. in the middle of an aggregation tree).
104+
*
105+
* This method first reduces the aggregations, and if it is the final reduce, then reduces the pipeline
106+
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
107+
*/
108+
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
109+
InternalAggregations reduced = reduce(aggregationsList, context);
110+
if (reduced == null) {
111+
return null;
112+
}
113+
114+
if (context.isFinalReduce()) {
115+
List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations();
116+
reducedInternalAggs = reducedInternalAggs.stream()
117+
.map(agg -> agg.reducePipelines(agg, context))
118+
.collect(Collectors.toList());
119+
120+
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
121+
for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
122+
InternalAggregation newAgg
123+
= pipelineAggregator.doReduce(new InternalAggregations(reducedInternalAggs), context);
124+
reducedInternalAggs.add(newAgg);
125+
}
126+
return new InternalAggregations(reducedInternalAggs);
127+
}
128+
return reduced;
129+
}
130+
94131
/**
95132
* Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first
96133
* {@link InternalAggregations} object found in the list.
97-
* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
134+
* Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
135+
* separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}
98136
*/
99137
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
100138
if (aggregationsList.isEmpty()) {
@@ -123,13 +161,6 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
123161
reducedAggregations.add(first.reduce(aggregations, context));
124162
}
125163

126-
if (context.isFinalReduce()) {
127-
for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
128-
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
129-
reducedAggregations.add(newAgg);
130-
}
131-
return new InternalAggregations(reducedAggregations);
132-
}
133164
return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
134165
}
135166
}

server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2727

2828
import java.io.IOException;
29+
import java.util.ArrayList;
2930
import java.util.List;
3031
import java.util.Map;
3132

@@ -73,7 +74,7 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
7374
protected abstract B reduceBucket(List<B> buckets, ReduceContext context);
7475

7576
@Override
76-
public abstract List<? extends InternalBucket> getBuckets();
77+
public abstract List<B> getBuckets();
7778

7879
@Override
7980
public Object getProperty(List<String> path) {
@@ -141,6 +142,30 @@ public static int countInnerBucket(Aggregation agg) {
141142
return size;
142143
}
143144

145+
/**
146+
* Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a multi-bucket
147+
* agg needs to first reduce the buckets (and their parent pipelines) before allowing sibling pipelines
148+
* to materialize
149+
*/
150+
@Override
151+
public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
152+
assert reduceContext.isFinalReduce();
153+
List<B> materializedBuckets = reducePipelineBuckets(reduceContext);
154+
return super.reducePipelines(create(materializedBuckets), reduceContext);
155+
}
156+
157+
private List<B> reducePipelineBuckets(ReduceContext reduceContext) {
158+
List<B> reducedBuckets = new ArrayList<>();
159+
for (B bucket : getBuckets()) {
160+
List<InternalAggregation> aggs = new ArrayList<>();
161+
for (Aggregation agg : bucket.getAggregations()) {
162+
aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext));
163+
}
164+
reducedBuckets.add(createBucket(new InternalAggregations(aggs), bucket));
165+
}
166+
return reducedBuckets;
167+
}
168+
144169
public abstract static class InternalBucket implements Bucket, Writeable {
145170

146171
public Object getProperty(String containingAggName, List<String> path) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public InternalSingleBucketAggregation create(InternalAggregations subAggregatio
9797
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
9898

9999
@Override
100-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
100+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
101101
long docCount = 0L;
102102
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
103103
for (InternalAggregation aggregation : aggregations) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public InternalBucket getBucketByKey(String key) {
181181
}
182182

183183
@Override
184-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
184+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
185185
Map<String, List<InternalBucket>> bucketsMap = new HashMap<>();
186186
for (InternalAggregation aggregation : aggregations) {
187187
InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ int[] getReverseMuls() {
143143
}
144144

145145
@Override
146-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
146+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
147147
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
148148
for (InternalAggregation agg : aggregations) {
149149
InternalComposite sortedAgg = (InternalComposite) agg;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public InternalBucket getBucketByKey(String key) {
189189
}
190190

191191
@Override
192-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
192+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
193193
List<List<InternalBucket>> bucketsList = null;
194194
for (InternalAggregation aggregation : aggregations) {
195195
InternalFilters filters = (InternalFilters) aggregation;

0 commit comments

Comments
 (0)