Skip to content

Commit fec882a

Browse files
committed
Decouple pipeline reductions from final agg reduction (#45796)
Historically only two things happened in the final reduction: empty buckets were filled, and pipeline aggs were reduced (since it was the final reduction, this was safe). Usage of the final reduction is growing however. Auto-date-histo might need to perform many reductions on final-reduce to merge down buckets, CCS may need to side-step the final reduction if sending to a different cluster, etc Having pipelines generate their output in the final reduce was convenient, but is becoming increasingly difficult to manage as the rest of the agg framework advances. This commit decouples pipeline aggs from the final reduction by introducing a new "top level" reduce, which should be called at the beginning of the reduce cycle (e.g. from the SearchPhaseController). This will only reduce pipeline aggs on the final reduce after the non-pipeline agg tree has been fully reduced. By separating pipeline reduction into their own set of methods, aggregations are free to use the final reduction for whatever purpose without worrying about generating pipeline results which are non-reducible
1 parent 721a8b3 commit fec882a

File tree

72 files changed

+223
-122
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+223
-122
lines changed

modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public Object getProperty(List<String> path) {
233233
}
234234

235235
@Override
236-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
236+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
237237
// merge stats across all shards
238238
List<InternalAggregation> aggs = new ArrayList<>(aggregations);
239239
aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null);

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
487487
}
488488
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
489489
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
490-
InternalAggregations.reduce(aggregationsList, reduceContext);
490+
InternalAggregations.topLevelReduce(aggregationsList, reduceContext);
491491
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
492492
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
493493
reducedCompletionSuggestions);
@@ -625,7 +625,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
625625
if (index == bufferSize) {
626626
if (hasAggs) {
627627
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
628-
InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
628+
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
629629
Arrays.fill(aggsBuffer, null);
630630
aggsBuffer[0] = reducedAggs;
631631
}

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
196196
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats);
197197
setSuggestShardIndex(shards, groupedSuggestions);
198198
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
199-
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
199+
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContextFunction.apply(true));
200200
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
201201
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
202202
//make failures ordering consistent between ordinary search and CCS by looking at the shard they come from

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,23 +126,25 @@ public String getName() {
126126
return name;
127127
}
128128

129+
/**
130+
* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
131+
* be called after all aggregations have been fully reduced
132+
*/
133+
public InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
134+
assert reduceContext.isFinalReduce();
135+
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
136+
reducedAggs = pipelineAggregator.reduce(reducedAggs, reduceContext);
137+
}
138+
return reducedAggs;
139+
}
140+
129141
/**
130142
* Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be the all given
131143
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
132144
* try reusing an existing instance (typically the first in the given list) to save on redundant object
133145
* construction.
134146
*/
135-
public final InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
136-
InternalAggregation aggResult = doReduce(aggregations, reduceContext);
137-
if (reduceContext.isFinalReduce()) {
138-
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
139-
aggResult = pipelineAggregator.reduce(aggResult, reduceContext);
140-
}
141-
}
142-
return aggResult;
143-
}
144-
145-
public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
147+
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
146148

147149
/**
148150
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.List;
3535
import java.util.Map;
3636
import java.util.Objects;
37+
import java.util.stream.Collectors;
3738

3839
/**
3940
* An internal implementation of {@link Aggregations}.
@@ -98,10 +99,47 @@ public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
9899
return topLevelPipelineAggregators;
99100
}
100101

102+
@SuppressWarnings("unchecked")
103+
private List<InternalAggregation> getInternalAggregations() {
104+
return (List<InternalAggregation>) aggregations;
105+
}
106+
107+
/**
108+
* Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by
109+
* SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called
110+
* as an intermediate reduction step (e.g. in the middle of an aggregation tree).
111+
*
112+
* This method first reduces the aggregations, and if it is the final reduce, then reduce the pipeline
113+
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
114+
*/
115+
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
116+
InternalAggregations reduced = reduce(aggregationsList, context);
117+
if (reduced == null) {
118+
return null;
119+
}
120+
121+
if (context.isFinalReduce()) {
122+
List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations();
123+
reducedInternalAggs = reducedInternalAggs.stream()
124+
.map(agg -> agg.reducePipelines(agg, context))
125+
.collect(Collectors.toList());
126+
127+
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
128+
for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
129+
InternalAggregation newAgg
130+
= pipelineAggregator.doReduce(new InternalAggregations(reducedInternalAggs), context);
131+
reducedInternalAggs.add(newAgg);
132+
}
133+
return new InternalAggregations(reducedInternalAggs);
134+
}
135+
return reduced;
136+
}
137+
101138
/**
102139
* Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first
103140
* {@link InternalAggregations} object found in the list.
104-
* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
141+
* Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
142+
* separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}
105143
*/
106144
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
107145
if (aggregationsList.isEmpty()) {
@@ -130,13 +168,6 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
130168
reducedAggregations.add(first.reduce(aggregations, context));
131169
}
132170

133-
if (context.isFinalReduce()) {
134-
for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
135-
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
136-
reducedAggregations.add(newAgg);
137-
}
138-
return new InternalAggregations(reducedAggregations);
139-
}
140171
return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
141172
}
142173
}

server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2727

2828
import java.io.IOException;
29+
import java.util.ArrayList;
2930
import java.util.List;
3031
import java.util.Map;
3132

@@ -73,7 +74,7 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
7374
protected abstract B reduceBucket(List<B> buckets, ReduceContext context);
7475

7576
@Override
76-
public abstract List<? extends InternalBucket> getBuckets();
77+
public abstract List<B> getBuckets();
7778

7879
@Override
7980
public Object getProperty(List<String> path) {
@@ -141,6 +142,30 @@ public static int countInnerBucket(Aggregation agg) {
141142
return size;
142143
}
143144

145+
/**
146+
* Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a multi-bucket
147+
* agg needs to first reduce the buckets (and their parent pipelines) before allowing sibling pipelines
148+
* to materialize
149+
*/
150+
@Override
151+
public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
152+
assert reduceContext.isFinalReduce();
153+
List<B> materializedBuckets = reducePipelineBuckets(reduceContext);
154+
return super.reducePipelines(create(materializedBuckets), reduceContext);
155+
}
156+
157+
private List<B> reducePipelineBuckets(ReduceContext reduceContext) {
158+
List<B> reducedBuckets = new ArrayList<>();
159+
for (B bucket : getBuckets()) {
160+
List<InternalAggregation> aggs = new ArrayList<>();
161+
for (Aggregation agg : bucket.getAggregations()) {
162+
aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext));
163+
}
164+
reducedBuckets.add(createBucket(new InternalAggregations(aggs), bucket));
165+
}
166+
return reducedBuckets;
167+
}
168+
144169
public abstract static class InternalBucket implements Bucket, Writeable {
145170

146171
public Object getProperty(String containingAggName, List<String> path) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public InternalSingleBucketAggregation create(InternalAggregations subAggregatio
9797
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
9898

9999
@Override
100-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
100+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
101101
long docCount = 0L;
102102
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
103103
for (InternalAggregation aggregation : aggregations) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public InternalBucket getBucketByKey(String key) {
181181
}
182182

183183
@Override
184-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
184+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
185185
Map<String, List<InternalBucket>> bucketsMap = new HashMap<>();
186186
for (InternalAggregation aggregation : aggregations) {
187187
InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ int[] getReverseMuls() {
156156
}
157157

158158
@Override
159-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
159+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
160160
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
161161
for (InternalAggregation agg : aggregations) {
162162
InternalComposite sortedAgg = (InternalComposite) agg;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public InternalBucket getBucketByKey(String key) {
189189
}
190190

191191
@Override
192-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
192+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
193193
List<List<InternalBucket>> bucketsList = null;
194194
for (InternalAggregation aggregation : aggregations) {
195195
InternalFilters filters = (InternalFilters) aggregation;

0 commit comments

Comments
 (0)