Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
return new InternalMatrixStats(name, runningStats.docCount, runningStats, null, getMetadata());
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), stats, results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,17 @@ public InternalAggregation reducePipelines(
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
* try reusing an existing instance (typically the first in the given list) to save on redundant object
* construction.
*
* @see #mustReduceOnSingleInternalAgg()
*/
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);

/**
* Signal the framework if the {@linkplain InternalAggregation#reduce(List, ReduceContext)} phase needs to be called
* when there is only one {@linkplain InternalAggregation}.
*/
protected abstract boolean mustReduceOnSingleInternalAgg();

/**
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns
* false, it should return itself if asked to lead a reduction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,12 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
// If all aggs are unmapped, the agg that leads the reduction will just return itself
aggregations.sort(INTERNAL_AGG_COMPARATOR);
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(aggregations, context));
if (first.mustReduceOnSingleInternalAgg() || aggregations.size() > 1) {
reducedAggregations.add(first.reduce(aggregations, context));
} else {
// no need for reduce phase
reducedAggregations.add(first);
}
}

return from(reducedAggregations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
return modified ? create(newBuckets) : this;
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public void forEachBucket(Consumer<InternalAggregations> consumer) {
for (B bucket : getBuckets()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ public final double sortValue(AggregationPath.PathElement head, Iterator<Aggrega
return aggregations.sortValue(head, tail);
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
InternalAggregations rewritten = rewriter.apply(aggregations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude, getMetadata());
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return false;
}

@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public InternalGeoCentroid reduce(List<InternalAggregation> aggregations, Reduce
return new InternalGeoCentroid(name, result, totalCount, getMetadata());
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return false;
}

@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ public final double sortValue(AggregationPath.PathElement head, Iterator<Aggrega
throw new IllegalArgumentException("Metrics aggregations cannot have sub-aggregations (at [>" + head + "]");
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return false;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
/*
* I *believe* that this situation can only happen in cross
* cluster search right now. Thus the message. But computers
* are hard.
* are hard.
*/
throw new IllegalArgumentException("scripted_metric doesn't support cross cluster search until 7.8.0");
}
Expand Down Expand Up @@ -134,6 +134,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, getMetadata());
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
/**
* Results of the {@link TopHitsAggregator}.
*/
public class InternalTopHits extends InternalAggregation implements TopHits {
public class
InternalTopHits extends InternalAggregation implements TopHits {
private int from;
private int size;
private TopDocsAndMaxScore topDocs;
Expand Down Expand Up @@ -162,6 +163,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
new SearchHits(hits, reducedTopDocs.totalHits, maxScore), getMetadata());
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
return new InternalThrowing(name, false, metadata);
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public Object getProperty(List<String> path) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
return new InternalAggCardinality(name, cardinality, metadata);
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
return builder.array("cardinality", cardinality);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ public InternalStringStats reduce(List<InternalAggregation> aggregations, Reduce
showDistribution, format, getMetadata());
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return false;
}

@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Map;
import java.util.Objects;

public class InternalInferenceAggregation extends InternalAggregation {
public class InternalInferenceAggregation extends InternalAggregation {

private final InferenceResults inferenceResult;

Expand Down Expand Up @@ -47,6 +47,10 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
throw new UnsupportedOperationException("Reducing an inference aggregation is not supported");
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public Object getProperty(List<String> path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
throw new UnsupportedOperationException();
}

@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}

@Override
public Object getProperty(List<String> path) {
if (this.path.equals(path)) {
Expand Down