Skip to content

Commit 859e27c

Browse files
Aggregations: bucket_sort pipeline aggregation (#27152)
This commit adds a parent pipeline aggregation that allows sorting the buckets of a parent multi-bucket aggregation. The aggregation also offers [from] and [size] parameters in order to truncate the result as desired. Closes #14928
1 parent 0293c40 commit 859e27c

File tree

9 files changed

+1112
-1
lines changed

9 files changed

+1112
-1
lines changed

core/src/main/java/org/elasticsearch/search/SearchModule.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@
202202
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
203203
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
204204
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator;
205+
import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipelineAggregationBuilder;
206+
import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipelineAggregator;
205207
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
206208
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator;
207209
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
@@ -496,6 +498,11 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
496498
BucketSelectorPipelineAggregationBuilder::new,
497499
BucketSelectorPipelineAggregator::new,
498500
BucketSelectorPipelineAggregationBuilder::parse));
501+
registerPipelineAggregation(new PipelineAggregationSpec(
502+
BucketSortPipelineAggregationBuilder.NAME,
503+
BucketSortPipelineAggregationBuilder::new,
504+
BucketSortPipelineAggregator::new,
505+
BucketSortPipelineAggregationBuilder::parse));
499506
registerPipelineAggregation(new PipelineAggregationSpec(
500507
SerialDiffPipelineAggregationBuilder.NAME,
501508
SerialDiffPipelineAggregationBuilder::new,

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class BucketHelpers {
4949
* function.
5050
*
5151
* "insert_zeros": empty buckets will be filled with zeros for all metrics
52-
* "ignore": empty buckets will simply be ignored
52+
* "skip": empty buckets will simply be ignored
5353
*/
5454
public enum GapPolicy {
5555
INSERT_ZEROS((byte) 0, "insert_zeros"), SKIP((byte) 1, "skip");

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@
2929
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregationBuilder;
3030
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
3131
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
32+
import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipelineAggregationBuilder;
3233
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
3334
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
3435
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
3536
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
37+
import org.elasticsearch.search.sort.FieldSortBuilder;
3638

39+
import java.util.List;
3740
import java.util.Map;
3841

3942
public final class PipelineAggregatorBuilders {
@@ -99,6 +102,10 @@ public static BucketSelectorPipelineAggregationBuilder bucketSelector(String nam
99102
return new BucketSelectorPipelineAggregationBuilder(name, script, bucketsPaths);
100103
}
101104

105+
public static BucketSortPipelineAggregationBuilder bucketSort(String name, List<FieldSortBuilder> sorts) {
106+
return new BucketSortPipelineAggregationBuilder(name, sorts);
107+
}
108+
102109
public static CumulativeSumPipelineAggregationBuilder cumulativeSum(String name,
103110
String bucketsPath) {
104111
return new CumulativeSumPipelineAggregationBuilder(name, bucketsPath);
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.search.aggregations.pipeline.bucketsort;
20+
21+
import org.elasticsearch.common.ParseField;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
25+
import org.elasticsearch.common.xcontent.ObjectParser;
26+
import org.elasticsearch.common.xcontent.XContentBuilder;
27+
import org.elasticsearch.common.xcontent.XContentParser;
28+
import org.elasticsearch.search.aggregations.AggregationBuilder;
29+
import org.elasticsearch.search.aggregations.AggregatorFactory;
30+
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
31+
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
32+
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
33+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
34+
import org.elasticsearch.search.builder.SearchSourceBuilder;
35+
import org.elasticsearch.search.sort.FieldSortBuilder;
36+
import org.elasticsearch.search.sort.SortBuilder;
37+
38+
import java.io.IOException;
39+
import java.util.ArrayList;
40+
import java.util.Arrays;
41+
import java.util.Collections;
42+
import java.util.List;
43+
import java.util.Locale;
44+
import java.util.Map;
45+
import java.util.Objects;
46+
47+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
48+
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
49+
50+
/**
51+
* Builds a pipeline aggregation that allows sorting the buckets of its parent
52+
* aggregation. The bucket {@code _key}, {@code _count} or sub-aggregations may be used as sort
53+
* keys. Parameters {@code from} and {@code size} may also be set in order to truncate the
54+
* result bucket list.
55+
*/
56+
public class BucketSortPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<BucketSortPipelineAggregationBuilder> {
57+
public static final String NAME = "bucket_sort";
58+
59+
private static final ParseField FROM = new ParseField("from");
60+
private static final ParseField SIZE = new ParseField("size");
61+
62+
public static final ConstructingObjectParser<BucketSortPipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(NAME,
63+
false, (a, context) -> new BucketSortPipelineAggregationBuilder(context, (List<FieldSortBuilder>) a[0]));
64+
65+
static {
66+
PARSER.declareField(optionalConstructorArg(), (p, c) -> {
67+
List<SortBuilder<?>> sorts = SortBuilder.fromXContent(p);
68+
List<FieldSortBuilder> fieldSorts = new ArrayList<>(sorts.size());
69+
for (SortBuilder<?> sort : sorts) {
70+
if (sort instanceof FieldSortBuilder == false) {
71+
throw new IllegalArgumentException("[" + NAME + "] only supports field based sorting; incompatible sort: ["
72+
+ sort + "]");
73+
}
74+
fieldSorts.add((FieldSortBuilder) sort);
75+
}
76+
return fieldSorts;
77+
}, SearchSourceBuilder.SORT_FIELD,
78+
ObjectParser.ValueType.OBJECT_ARRAY);
79+
PARSER.declareInt(BucketSortPipelineAggregationBuilder::from, FROM);
80+
PARSER.declareInt(BucketSortPipelineAggregationBuilder::size, SIZE);
81+
PARSER.declareField(BucketSortPipelineAggregationBuilder::gapPolicy, p -> {
82+
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
83+
return GapPolicy.parse(p.text().toLowerCase(Locale.ROOT), p.getTokenLocation());
84+
}
85+
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
86+
}, GAP_POLICY, ObjectParser.ValueType.STRING);
87+
}
88+
89+
private List<FieldSortBuilder> sorts = Collections.emptyList();
90+
private int from = 0;
91+
private Integer size;
92+
private GapPolicy gapPolicy = GapPolicy.SKIP;
93+
94+
public BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder> sorts) {
95+
super(name, NAME, sorts == null ? new String[0] : sorts.stream().map(s -> s.getFieldName()).toArray(String[]::new));
96+
this.sorts = sorts == null ? Collections.emptyList() : sorts;
97+
}
98+
99+
/**
100+
* Read from a stream.
101+
*/
102+
public BucketSortPipelineAggregationBuilder(StreamInput in) throws IOException {
103+
super(in, NAME);
104+
sorts = in.readList(FieldSortBuilder::new);
105+
from = in.readVInt();
106+
size = in.readOptionalVInt();
107+
gapPolicy = GapPolicy.readFrom(in);
108+
}
109+
110+
@Override
111+
protected void doWriteTo(StreamOutput out) throws IOException {
112+
out.writeList(sorts);
113+
out.writeVInt(from);
114+
out.writeOptionalVInt(size);
115+
gapPolicy.writeTo(out);
116+
}
117+
118+
public BucketSortPipelineAggregationBuilder from(int from) {
119+
if (from < 0) {
120+
throw new IllegalArgumentException("[" + FROM.getPreferredName() + "] must be a non-negative integer: [" + from + "]");
121+
}
122+
this.from = from;
123+
return this;
124+
}
125+
126+
public BucketSortPipelineAggregationBuilder size(Integer size) {
127+
if (size != null && size <= 0) {
128+
throw new IllegalArgumentException("[" + SIZE.getPreferredName() + "] must be a positive integer: [" + size + "]");
129+
}
130+
this.size = size;
131+
return this;
132+
}
133+
134+
public BucketSortPipelineAggregationBuilder gapPolicy(GapPolicy gapPolicy) {
135+
if (gapPolicy == null) {
136+
throw new IllegalArgumentException("[" + GAP_POLICY.getPreferredName() + "] must not be null: [" + name + "]");
137+
}
138+
this.gapPolicy = gapPolicy;
139+
return this;
140+
}
141+
142+
@Override
143+
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
144+
return new BucketSortPipelineAggregator(name, sorts, from, size, gapPolicy, metaData);
145+
}
146+
147+
@Override
148+
public void doValidate(AggregatorFactory<?> parent, List<AggregationBuilder> aggFactories,
149+
List<PipelineAggregationBuilder> pipelineAggregatoractories) {
150+
if (sorts.isEmpty() && size == null && from == 0) {
151+
throw new IllegalStateException("[" + name + "] is configured to perform nothing. Please set either of "
152+
+ Arrays.asList(SearchSourceBuilder.SORT_FIELD.getPreferredName(), SIZE.getPreferredName(), FROM.getPreferredName())
153+
+ " to use " + NAME);
154+
}
155+
}
156+
157+
@Override
158+
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
159+
builder.field(SearchSourceBuilder.SORT_FIELD.getPreferredName(), sorts);
160+
builder.field(FROM.getPreferredName(), from);
161+
if (size != null) {
162+
builder.field(SIZE.getPreferredName(), size);
163+
}
164+
builder.field(GAP_POLICY.getPreferredName(), gapPolicy);
165+
return builder;
166+
}
167+
168+
public static BucketSortPipelineAggregationBuilder parse(String reducerName, XContentParser parser) throws IOException {
169+
return PARSER.parse(parser, reducerName);
170+
}
171+
172+
@Override
173+
protected boolean overrideBucketsPath() {
174+
return true;
175+
}
176+
177+
@Override
178+
protected int doHashCode() {
179+
return Objects.hash(sorts, from, size, gapPolicy);
180+
}
181+
182+
@Override
183+
protected boolean doEquals(Object obj) {
184+
BucketSortPipelineAggregationBuilder other = (BucketSortPipelineAggregationBuilder) obj;
185+
return Objects.equals(sorts, other.sorts)
186+
&& Objects.equals(from, other.from)
187+
&& Objects.equals(size, other.size)
188+
&& Objects.equals(gapPolicy, other.gapPolicy);
189+
}
190+
191+
@Override
192+
public String getWriteableName() {
193+
return NAME;
194+
}
195+
}

0 commit comments

Comments
 (0)