Skip to content

Commit 65ba372

Browse files
Address review feedback
1 parent 0382e6a commit 65ba372

File tree

5 files changed

+248
-87
lines changed

5 files changed

+248
-87
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketsort/BucketSortPipelineAggregationBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.elasticsearch.common.xcontent.ObjectParser;
2626
import org.elasticsearch.common.xcontent.XContentBuilder;
2727
import org.elasticsearch.common.xcontent.XContentParser;
28+
import org.elasticsearch.search.aggregations.AggregationBuilder;
29+
import org.elasticsearch.search.aggregations.AggregatorFactory;
30+
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
2831
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
2932
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
3033
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -34,6 +37,7 @@
3437

3538
import java.io.IOException;
3639
import java.util.ArrayList;
40+
import java.util.Arrays;
3741
import java.util.Collections;
3842
import java.util.List;
3943
import java.util.Locale;
@@ -140,6 +144,16 @@ protected PipelineAggregator createInternal(Map<String, Object> metaData) throws
140144
return new BucketSortPipelineAggregator(name, sorts, from, size, gapPolicy, metaData);
141145
}
142146

147+
@Override
148+
public void doValidate(AggregatorFactory<?> parent, List<AggregationBuilder> aggFactories,
149+
List<PipelineAggregationBuilder> pipelineAggregatoractories) {
150+
if (sorts.isEmpty() && size == null && from == 0) {
151+
throw new IllegalStateException("[" + name + "] is configured to perform nothing. Please set either of "
152+
+ Arrays.asList(SearchSourceBuilder.SORT_FIELD.getPreferredName(), SIZE.getPreferredName(), FROM.getPreferredName())
153+
+ " to use " + NAME);
154+
}
155+
}
156+
143157
@Override
144158
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
145159
builder.field(SearchSourceBuilder.SORT_FIELD.getPreferredName(), sorts);

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketsort/BucketSortPipelineAggregator.java

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.io.IOException;
3636
import java.util.ArrayList;
3737
import java.util.Collections;
38+
import java.util.HashMap;
3839
import java.util.LinkedList;
3940
import java.util.List;
4041
import java.util.Map;
@@ -85,97 +86,93 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
8586
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
8687
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = originalAgg.getBuckets();
8788
int bucketsCount = buckets.size();
88-
int offset = reduceContext.isFinalReduce() ? from : 0;
8989
int currentSize = size == null ? bucketsCount : size;
9090

91-
if (offset >= bucketsCount) {
91+
if (from >= bucketsCount) {
9292
return originalAgg.create(Collections.emptyList());
9393
}
9494

95-
int resultSize = Math.min(currentSize, bucketsCount - offset);
96-
9795
// If no sorting needs to take place, we just truncate and return
9896
if (sorts.size() == 0) {
99-
return originalAgg.create(truncate(buckets, offset, currentSize));
97+
return originalAgg.create(new ArrayList<>(buckets.subList(from, Math.min(from + currentSize, bucketsCount))));
10098
}
10199

102-
int queueSize = Math.min(offset + currentSize, bucketsCount);
100+
int queueSize = Math.min(from + currentSize, bucketsCount);
103101
PriorityQueue<ComparableBucket> ordered = new TopNPriorityQueue(queueSize);
104102
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
105-
ordered.insertWithOverflow(new ComparableBucket(originalAgg, bucket));
103+
ComparableBucket comparableBucket = new ComparableBucket(originalAgg, bucket);
104+
if (comparableBucket.skip() == false) {
105+
ordered.insertWithOverflow(new ComparableBucket(originalAgg, bucket));
106+
}
106107
}
107108

109+
int resultSize = Math.max(ordered.size() - from, 0);
110+
108111
// Popping from the priority queue returns the least element. The elements we want to skip due to offset would pop last.
109112
// Thus, we just have to pop as many elements as we expect in results and store them in reverse order.
110113
LinkedList<InternalMultiBucketAggregation.InternalBucket> newBuckets = new LinkedList<>();
111-
for (int i = resultSize - 1; i >= 0; --i) {
112-
ComparableBucket comparableBucket = ordered.pop();
113-
if (comparableBucket.skip == false) {
114-
newBuckets.addFirst(comparableBucket.getInternalBucket());
115-
}
114+
for (int i = 0; i < resultSize; ++i) {
115+
newBuckets.addFirst(ordered.pop().internalBucket);
116116
}
117117
return originalAgg.create(newBuckets);
118118
}
119119

120-
private static List<InternalMultiBucketAggregation.InternalBucket> truncate(
121-
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets, int offset, int size) {
122-
123-
List<InternalMultiBucketAggregation.InternalBucket> truncated = new ArrayList<>(size);
124-
for (int i = offset; i < offset + size; ++i) {
125-
truncated.add(buckets.get(i));
126-
}
127-
return truncated;
128-
}
129-
130120
private class ComparableBucket implements Comparable<ComparableBucket> {
131121

132122
private final MultiBucketsAggregation parentAgg;
133123
private final InternalMultiBucketAggregation.InternalBucket internalBucket;
134-
135-
/**
136-
* Whether the bucket should be skipped due to the gap policy
137-
*/
138-
private boolean skip = false;
124+
private final Map<FieldSortBuilder, Comparable<Object>> sortValues;
139125

140126
private ComparableBucket(MultiBucketsAggregation parentAgg, InternalMultiBucketAggregation.InternalBucket internalBucket) {
141127
this.parentAgg = parentAgg;
142128
this.internalBucket = internalBucket;
129+
this.sortValues = resolveAndCacheSortValues();
143130
}
144131

145-
private InternalMultiBucketAggregation.InternalBucket getInternalBucket() {
146-
return internalBucket;
147-
}
148-
149-
@Override
150-
public int compareTo(ComparableBucket that) {
132+
private final Map<FieldSortBuilder, Comparable<Object>> resolveAndCacheSortValues() {
133+
Map<FieldSortBuilder, Comparable<Object>> resolved = new HashMap<>();
151134
for (FieldSortBuilder sort : sorts) {
152135
String sortField = sort.getFieldName();
153-
int compareResult = "_key".equals(sortField) ? compareKeys(this, that) : comparePathValues(sortField, this, that);
154-
if (compareResult != 0) {
155-
return sort.order() == SortOrder.DESC ? compareResult : -compareResult;
136+
if ("_key".equals(sortField)) {
137+
resolved.put(sort, (Comparable<Object>) internalBucket.getKey());
138+
} else {
139+
Double bucketValue = BucketHelpers.resolveBucketValue(parentAgg, internalBucket, sortField, gapPolicy);
140+
if (GapPolicy.SKIP == gapPolicy && Double.isNaN(bucketValue)) {
141+
continue;
142+
}
143+
resolved.put(sort, (Comparable<Object>) (Object) bucketValue);
156144
}
157145
}
158-
return 0;
146+
return resolved;
159147
}
160148

161-
private int compareKeys(ComparableBucket b1, ComparableBucket b2) {
162-
Comparable<Object> b1Key = (Comparable<Object>) b1.internalBucket.getKey();
163-
Comparable<Object> b2Key = (Comparable<Object>) b2.internalBucket.getKey();
164-
return b1Key.compareTo(b2Key);
149+
/**
150+
* Whether the bucket should be skipped due to the gap policy
151+
*/
152+
private boolean skip() {
153+
return sortValues.isEmpty();
165154
}
166155

167-
private int comparePathValues(String sortField, ComparableBucket b1, ComparableBucket b2) {
168-
Double b1Value = BucketHelpers.resolveBucketValue(parentAgg, b1.internalBucket, sortField, gapPolicy);
169-
Double b2Value = BucketHelpers.resolveBucketValue(parentAgg, b2.internalBucket, sortField, gapPolicy);
170-
if (GapPolicy.SKIP == gapPolicy) {
171-
if (Double.isNaN(b1Value)) {
172-
b1.skip = true;
156+
@Override
157+
public int compareTo(ComparableBucket that) {
158+
int compareResult = 0;
159+
for (FieldSortBuilder sort : sorts) {
160+
Comparable<Object> thisValue = this.sortValues.get(sort);
161+
Comparable<Object> thatValue = that.sortValues.get(sort);
162+
if (thisValue == null && thatValue == null) {
163+
continue;
164+
} else if (thisValue == null) {
165+
return -1;
166+
} else if (thatValue == null) {
167+
return 1;
168+
} else {
169+
compareResult = sort.order() == SortOrder.DESC ? thisValue.compareTo(thatValue) : -thisValue.compareTo(thatValue);
173170
}
174-
if (Double.isNaN(b2Value)) {
175-
b2.skip = true;
171+
if (compareResult != 0) {
172+
break;
176173
}
177174
}
178-
return b1Value.compareTo(b2Value);
175+
return compareResult;
179176
}
180177
}
181178

0 commit comments

Comments
 (0)