Skip to content

Commit 6a3d731

Browse files
authored
Only call reduce on a single InternalAggregation when needed (#62525) (#62594)
Adds a new abstract method in InternalAggregation that flags the framework if it needs to reduce on a single InternalAggregation.
1 parent 5b72461 commit 6a3d731

File tree

15 files changed

+80
-3
lines changed

15 files changed

+80
-3
lines changed

modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
254254
return new InternalMatrixStats(name, runningStats.docCount, runningStats, null, getMetadata());
255255
}
256256

257+
@Override
258+
protected boolean mustReduceOnSingleInternalAgg() {
259+
return true;
260+
}
261+
257262
@Override
258263
public int hashCode() {
259264
return Objects.hash(super.hashCode(), stats, results);

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,17 @@ public InternalAggregation reducePipelines(
255255
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
256256
* try reusing an existing instance (typically the first in the given list) to save on redundant object
257257
* construction.
258+
*
259+
* @see #mustReduceOnSingleInternalAgg()
258260
*/
259261
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
260262

263+
/**
264+
* Signal the framework if the {@linkplain InternalAggregation#reduce(List, ReduceContext)} phase needs to be called
265+
* when there is only one {@linkplain InternalAggregation}.
266+
*/
267+
protected abstract boolean mustReduceOnSingleInternalAgg();
268+
261269
/**
262270
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns
263271
* false, it should return itself if asked to lead a reduction

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,12 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
255255
// If all aggs are unmapped, the agg that leads the reduction will just return itself
256256
aggregations.sort(INTERNAL_AGG_COMPARATOR);
257257
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
258-
reducedAggregations.add(first.reduce(aggregations, context));
258+
if (first.mustReduceOnSingleInternalAgg() || aggregations.size() > 1) {
259+
reducedAggregations.add(first.reduce(aggregations, context));
260+
} else {
261+
// no need for reduce phase
262+
reducedAggregations.add(first);
263+
}
259264
}
260265

261266
return ctor.apply(reducedAggregations);

server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
177177
return modified ? create(newBuckets) : this;
178178
}
179179

180+
@Override
181+
protected boolean mustReduceOnSingleInternalAgg() {
182+
return true;
183+
}
184+
180185
@Override
181186
public void forEachBucket(Consumer<InternalAggregations> consumer) {
182187
for (B bucket : getBuckets()) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,11 @@ public final double sortValue(AggregationPath.PathElement head, Iterator<Aggrega
176176
return aggregations.sortValue(head, tail);
177177
}
178178

179+
@Override
180+
protected boolean mustReduceOnSingleInternalAgg() {
181+
return true;
182+
}
183+
179184
@Override
180185
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
181186
InternalAggregations rewritten = rewriter.apply(aggregations);

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
116116
return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude, getMetadata());
117117
}
118118

119+
@Override
120+
protected boolean mustReduceOnSingleInternalAgg() {
121+
return false;
122+
}
123+
119124
@Override
120125
public Object getProperty(List<String> path) {
121126
if (path.isEmpty()) {

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ public InternalGeoCentroid reduce(List<InternalAggregation> aggregations, Reduce
133133
return new InternalGeoCentroid(name, result, totalCount, getMetadata());
134134
}
135135

136+
@Override
137+
protected boolean mustReduceOnSingleInternalAgg() {
138+
return false;
139+
}
140+
136141
@Override
137142
public Object getProperty(List<String> path) {
138143
if (path.isEmpty()) {

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ public final double sortValue(AggregationPath.PathElement head, Iterator<Aggrega
128128
throw new IllegalArgumentException("Metrics aggregations cannot have sub-aggregations (at [>" + head + "]");
129129
}
130130

131+
@Override
132+
protected boolean mustReduceOnSingleInternalAgg() {
133+
return false;
134+
}
135+
131136
@Override
132137
public int hashCode() {
133138
return Objects.hash(super.hashCode(), format);

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
7474
/*
7575
* I *believe* that this situation can only happen in cross
7676
* cluster search right now. Thus the message. But computers
77-
* are hard.
77+
* are hard.
7878
*/
7979
throw new IllegalArgumentException("scripted_metric doesn't support cross cluster search until 7.8.0");
8080
}
@@ -134,6 +134,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
134134
return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, getMetadata());
135135
}
136136

137+
@Override
138+
protected boolean mustReduceOnSingleInternalAgg() {
139+
return true;
140+
}
141+
137142
@Override
138143
public Object getProperty(List<String> path) {
139144
if (path.isEmpty()) {

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
162162
new SearchHits(hits, reducedTopDocs.totalHits, maxScore), getMetadata());
163163
}
164164

165+
@Override
166+
protected boolean mustReduceOnSingleInternalAgg() {
167+
return true;
168+
}
169+
165170
@Override
166171
public Object getProperty(List<String> path) {
167172
if (path.isEmpty()) {

0 commit comments

Comments
 (0)