Skip to content

Commit b43f2ab

Browse files
authored
Use #updateTop to speed up InternalComposite#reduce (#71278)
Co-authored-by: guofeng.my <[email protected]>
1 parent 5bcd1f3 commit b43f2ab

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.search.aggregations.bucket.composite;
1010

1111
import org.apache.lucene.util.BytesRef;
12+
import org.apache.lucene.util.PriorityQueue;
1213
import org.elasticsearch.Version;
1314
import org.elasticsearch.common.io.stream.StreamInput;
1415
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -29,7 +30,6 @@
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Objects;
32-
import java.util.PriorityQueue;
3333
import java.util.Set;
3434

3535
public class InternalComposite
@@ -149,7 +149,12 @@ int[] getReverseMuls() {
149149

150150
@Override
151151
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
152-
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
152+
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size()) {
153+
@Override
154+
protected boolean lessThan(BucketIterator a, BucketIterator b) {
155+
return a.compareTo(b) < 0;
156+
}
157+
};
153158
boolean earlyTerminated = false;
154159
for (InternalAggregation agg : aggregations) {
155160
InternalComposite sortedAgg = (InternalComposite) agg;
@@ -163,7 +168,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
163168
List<InternalBucket> buckets = new ArrayList<>();
164169
List<InternalBucket> result = new ArrayList<>();
165170
while (pq.size() > 0) {
166-
BucketIterator bucketIt = pq.poll();
171+
BucketIterator bucketIt = pq.top();
167172
if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
168173
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
169174
buckets.clear();
@@ -175,7 +180,9 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
175180
lastBucket = bucketIt.current;
176181
buckets.add(bucketIt.current);
177182
if (bucketIt.next() != null) {
178-
pq.add(bucketIt);
183+
pq.updateTop();
184+
} else {
185+
pq.pop();
179186
}
180187
}
181188
if (buckets.size() > 0) {

0 commit comments

Comments
 (0)