Skip to content

Commit 6344a97

Browse files
authored
Only call reduce on a single InternalAggregation when needed (#62525)
Adds a new abstract method in InternalAggregation that flags the framework if it needs to reduce on a single InternalAggregation.
1 parent 235521c commit 6344a97

File tree

15 files changed

+82
-4
lines changed

15 files changed

+82
-4
lines changed

modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
254254
return new InternalMatrixStats(name, runningStats.docCount, runningStats, null, getMetadata());
255255
}
256256

257+
@Override
258+
protected boolean mustReduceOnSingleInternalAgg() {
259+
return true;
260+
}
261+
257262
@Override
258263
public int hashCode() {
259264
return Objects.hash(super.hashCode(), stats, results);

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,17 @@ public InternalAggregation reducePipelines(
223223
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
224224
* try reusing an existing instance (typically the first in the given list) to save on redundant object
225225
* construction.
226+
*
227+
* @see #mustReduceOnSingleInternalAgg()
226228
*/
227229
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
228230

231+
/**
232+
* Signal the framework if the {@linkplain InternalAggregation#reduce(List, ReduceContext)} phase needs to be called
233+
* when there is only one {@linkplain InternalAggregation}.
234+
*/
235+
protected abstract boolean mustReduceOnSingleInternalAgg();
236+
229237
/**
230238
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns
231239
* false, it should return itself if asked to lead a reduction

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,12 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
161161
// If all aggs are unmapped, the agg that leads the reduction will just return itself
162162
aggregations.sort(INTERNAL_AGG_COMPARATOR);
163163
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
164-
reducedAggregations.add(first.reduce(aggregations, context));
164+
if (first.mustReduceOnSingleInternalAgg() || aggregations.size() > 1) {
165+
reducedAggregations.add(first.reduce(aggregations, context));
166+
} else {
167+
// no need for reduce phase
168+
reducedAggregations.add(first);
169+
}
165170
}
166171

167172
return from(reducedAggregations);

server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
177177
return modified ? create(newBuckets) : this;
178178
}
179179

180+
@Override
181+
protected boolean mustReduceOnSingleInternalAgg() {
182+
return true;
183+
}
184+
180185
@Override
181186
public void forEachBucket(Consumer<InternalAggregations> consumer) {
182187
for (B bucket : getBuckets()) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,11 @@ public final double sortValue(AggregationPath.PathElement head, Iterator<Aggrega
176176
return aggregations.sortValue(head, tail);
177177
}
178178

179+
@Override
180+
protected boolean mustReduceOnSingleInternalAgg() {
181+
return true;
182+
}
183+
179184
@Override
180185
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
181186
InternalAggregations rewritten = rewriter.apply(aggregations);

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
116116
return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude, getMetadata());
117117
}
118118

119+
@Override
120+
protected boolean mustReduceOnSingleInternalAgg() {
121+
return false;
122+
}
123+
119124
@Override
120125
public Object getProperty(List<String> path) {
121126
if (path.isEmpty()) {

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ public InternalGeoCentroid reduce(List<InternalAggregation> aggregations, Reduce
133133
return new InternalGeoCentroid(name, result, totalCount, getMetadata());
134134
}
135135

136+
@Override
137+
protected boolean mustReduceOnSingleInternalAgg() {
138+
return false;
139+
}
140+
136141
@Override
137142
public Object getProperty(List<String> path) {
138143
if (path.isEmpty()) {

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ public final double sortValue(AggregationPath.PathElement head, Iterator<Aggrega
128128
throw new IllegalArgumentException("Metrics aggregations cannot have sub-aggregations (at [>" + head + "]");
129129
}
130130

131+
@Override
132+
protected boolean mustReduceOnSingleInternalAgg() {
133+
return false;
134+
}
135+
131136
@Override
132137
public int hashCode() {
133138
return Objects.hash(super.hashCode(), format);

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
7474
/*
7575
* I *believe* that this situation can only happen in cross
7676
* cluster search right now. Thus the message. But computers
77-
* are hard.
77+
* are hard.
7878
*/
7979
throw new IllegalArgumentException("scripted_metric doesn't support cross cluster search until 7.8.0");
8080
}
@@ -134,6 +134,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
134134
return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, getMetadata());
135135
}
136136

137+
@Override
138+
protected boolean mustReduceOnSingleInternalAgg() {
139+
return true;
140+
}
141+
137142
@Override
138143
public Object getProperty(List<String> path) {
139144
if (path.isEmpty()) {

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
/**
4242
* Results of the {@link TopHitsAggregator}.
4343
*/
44-
public class InternalTopHits extends InternalAggregation implements TopHits {
44+
public class
45+
InternalTopHits extends InternalAggregation implements TopHits {
4546
private int from;
4647
private int size;
4748
private TopDocsAndMaxScore topDocs;
@@ -162,6 +163,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
162163
new SearchHits(hits, reducedTopDocs.totalHits, maxScore), getMetadata());
163164
}
164165

166+
@Override
167+
protected boolean mustReduceOnSingleInternalAgg() {
168+
return true;
169+
}
170+
165171
@Override
166172
public Object getProperty(List<String> path) {
167173
if (path.isEmpty()) {

0 commit comments

Comments
 (0)