Skip to content

Commit 713c07e

Browse files
authored
Add early termination support to BucketCollector (#33279)
This commit adds the support to early terminate the collection of a leaf in the aggregation framework. This change introduces a MultiBucketCollector which handles CollectionTerminatedException exactly like the Lucene MultiCollector. Any aggregator can now throw a CollectionTerminatedException without stopping the collection of a sibling aggregator. This is useful for aggregators that can infer their result without visiting all documents (e.g.: a min/max aggregation on a match_all query).
1 parent 3c367a2 commit 713c07e

File tree

11 files changed

+483
-68
lines changed

11 files changed

+483
-68
lines changed

docs/reference/search/profile.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ And the response:
596596
]
597597
},
598598
{
599-
"name": "BucketCollector: [[my_scoped_agg, my_global_agg]]",
599+
"name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
600600
"reason": "aggregation",
601601
"time_in_nanos": 8273
602602
}

server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void preProcess(SearchContext context) {
6060
}
6161
context.aggregations().aggregators(aggregators);
6262
if (!collectors.isEmpty()) {
63-
Collector collector = BucketCollector.wrap(collectors);
63+
Collector collector = MultiBucketCollector.wrap(collectors);
6464
((BucketCollector)collector).preCollection();
6565
if (context.getProfilers() != null) {
6666
collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
@@ -97,7 +97,7 @@ public void execute(SearchContext context) {
9797

9898
// optimize the global collector based execution
9999
if (!globals.isEmpty()) {
100-
BucketCollector globalsCollector = BucketCollector.wrap(globals);
100+
BucketCollector globalsCollector = MultiBucketCollector.wrap(globals);
101101
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
102102

103103
try {

server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ protected void doPreCollection() throws IOException {
183183
@Override
184184
public final void preCollection() throws IOException {
185185
List<BucketCollector> collectors = Arrays.asList(subAggregators);
186-
collectableSubAggregators = BucketCollector.wrap(collectors);
186+
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
187187
doPreCollection();
188188
collectableSubAggregators.preCollection();
189189
}

server/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@
2424
import org.apache.lucene.search.Collector;
2525

2626
import java.io.IOException;
27-
import java.util.ArrayList;
28-
import java.util.Arrays;
29-
import java.util.List;
30-
import java.util.stream.StreamSupport;
3127

3228
/**
3329
* A Collector that can collect data in separate buckets.
@@ -54,61 +50,6 @@ public boolean needsScores() {
5450
}
5551
};
5652

57-
/**
58-
* Wrap the given collectors into a single instance.
59-
*/
60-
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectorList) {
61-
final BucketCollector[] collectors =
62-
StreamSupport.stream(collectorList.spliterator(), false).toArray(size -> new BucketCollector[size]);
63-
switch (collectors.length) {
64-
case 0:
65-
return NO_OP_COLLECTOR;
66-
case 1:
67-
return collectors[0];
68-
default:
69-
return new BucketCollector() {
70-
71-
@Override
72-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
73-
List<LeafBucketCollector> leafCollectors = new ArrayList<>(collectors.length);
74-
for (BucketCollector c : collectors) {
75-
leafCollectors.add(c.getLeafCollector(ctx));
76-
}
77-
return LeafBucketCollector.wrap(leafCollectors);
78-
}
79-
80-
@Override
81-
public void preCollection() throws IOException {
82-
for (BucketCollector collector : collectors) {
83-
collector.preCollection();
84-
}
85-
}
86-
87-
@Override
88-
public void postCollection() throws IOException {
89-
for (BucketCollector collector : collectors) {
90-
collector.postCollection();
91-
}
92-
}
93-
94-
@Override
95-
public boolean needsScores() {
96-
for (BucketCollector collector : collectors) {
97-
if (collector.needsScores()) {
98-
return true;
99-
}
100-
}
101-
return false;
102-
}
103-
104-
@Override
105-
public String toString() {
106-
return Arrays.toString(collectors);
107-
}
108-
};
109-
}
110-
}
111-
11253
@Override
11354
public abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException;
11455

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations;
21+
22+
import org.apache.lucene.index.LeafReaderContext;
23+
import org.apache.lucene.search.CollectionTerminatedException;
24+
import org.apache.lucene.search.LeafCollector;
25+
import org.apache.lucene.search.MultiCollector;
26+
import org.apache.lucene.search.ScoreCachingWrappingScorer;
27+
import org.apache.lucene.search.Scorer;
28+
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.List;
33+
34+
/**
35+
* A {@link BucketCollector} which allows running a bucket collection with several
36+
* {@link BucketCollector}s. It is similar to the {@link MultiCollector} except that the
37+
* {@link #wrap} method filters out the {@link BucketCollector#NO_OP_COLLECTOR}s and not
38+
* the null ones.
39+
*/
40+
public class MultiBucketCollector extends BucketCollector {
41+
42+
/** See {@link #wrap(Iterable)}. */
43+
public static BucketCollector wrap(BucketCollector... collectors) {
44+
return wrap(Arrays.asList(collectors));
45+
}
46+
47+
/**
48+
* Wraps a list of {@link BucketCollector}s with a {@link MultiBucketCollector}. This
49+
* method works as follows:
50+
* <ul>
51+
* <li>Filters out the {@link BucketCollector#NO_OP_COLLECTOR}s collectors, so they are not used
52+
* during search time.
53+
* <li>If the input contains 1 real collector, it is returned.
54+
* <li>Otherwise the method returns a {@link MultiBucketCollector} which wraps the
55+
* non-{@link BucketCollector#NO_OP_COLLECTOR} collectors.
56+
* </ul>
57+
*/
58+
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectors) {
59+
// For the user's convenience, we allow NO_OP collectors to be passed.
60+
// However, to improve performance, these null collectors are found
61+
// and dropped from the array we save for actual collection time.
62+
int n = 0;
63+
for (BucketCollector c : collectors) {
64+
if (c != NO_OP_COLLECTOR) {
65+
n++;
66+
}
67+
}
68+
69+
if (n == 0) {
70+
return NO_OP_COLLECTOR;
71+
} else if (n == 1) {
72+
// only 1 Collector - return it.
73+
BucketCollector col = null;
74+
for (BucketCollector c : collectors) {
75+
if (c != null) {
76+
col = c;
77+
break;
78+
}
79+
}
80+
return col;
81+
} else {
82+
BucketCollector[] colls = new BucketCollector[n];
83+
n = 0;
84+
for (BucketCollector c : collectors) {
85+
if (c != null) {
86+
colls[n++] = c;
87+
}
88+
}
89+
return new MultiBucketCollector(colls);
90+
}
91+
}
92+
93+
private final boolean cacheScores;
94+
private final BucketCollector[] collectors;
95+
96+
private MultiBucketCollector(BucketCollector... collectors) {
97+
this.collectors = collectors;
98+
int numNeedsScores = 0;
99+
for (BucketCollector collector : collectors) {
100+
if (collector.needsScores()) {
101+
numNeedsScores += 1;
102+
}
103+
}
104+
this.cacheScores = numNeedsScores >= 2;
105+
}
106+
107+
@Override
108+
public void preCollection() throws IOException {
109+
for (BucketCollector collector : collectors) {
110+
collector.preCollection();
111+
}
112+
}
113+
114+
@Override
115+
public void postCollection() throws IOException {
116+
for (BucketCollector collector : collectors) {
117+
collector.postCollection();
118+
}
119+
}
120+
121+
@Override
122+
public boolean needsScores() {
123+
for (BucketCollector collector : collectors) {
124+
if (collector.needsScores()) {
125+
return true;
126+
}
127+
}
128+
return false;
129+
}
130+
131+
@Override
132+
public String toString() {
133+
return Arrays.toString(collectors);
134+
}
135+
136+
@Override
137+
public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
138+
final List<LeafBucketCollector> leafCollectors = new ArrayList<>();
139+
for (BucketCollector collector : collectors) {
140+
final LeafBucketCollector leafCollector;
141+
try {
142+
leafCollector = collector.getLeafCollector(context);
143+
} catch (CollectionTerminatedException e) {
144+
// this leaf collector does not need this segment
145+
continue;
146+
}
147+
leafCollectors.add(leafCollector);
148+
}
149+
switch (leafCollectors.size()) {
150+
case 0:
151+
throw new CollectionTerminatedException();
152+
case 1:
153+
return leafCollectors.get(0);
154+
default:
155+
return new MultiLeafBucketCollector(leafCollectors, cacheScores);
156+
}
157+
}
158+
159+
private static class MultiLeafBucketCollector extends LeafBucketCollector {
160+
161+
private final boolean cacheScores;
162+
private final LeafBucketCollector[] collectors;
163+
private int numCollectors;
164+
165+
private MultiLeafBucketCollector(List<LeafBucketCollector> collectors, boolean cacheScores) {
166+
this.collectors = collectors.toArray(new LeafBucketCollector[collectors.size()]);
167+
this.cacheScores = cacheScores;
168+
this.numCollectors = this.collectors.length;
169+
}
170+
171+
@Override
172+
public void setScorer(Scorer scorer) throws IOException {
173+
if (cacheScores) {
174+
scorer = new ScoreCachingWrappingScorer(scorer);
175+
}
176+
for (int i = 0; i < numCollectors; ++i) {
177+
final LeafCollector c = collectors[i];
178+
c.setScorer(scorer);
179+
}
180+
}
181+
182+
private void removeCollector(int i) {
183+
System.arraycopy(collectors, i + 1, collectors, i, numCollectors - i - 1);
184+
--numCollectors;
185+
collectors[numCollectors] = null;
186+
}
187+
188+
@Override
189+
public void collect(int doc, long bucket) throws IOException {
190+
final LeafBucketCollector[] collectors = this.collectors;
191+
int numCollectors = this.numCollectors;
192+
for (int i = 0; i < numCollectors; ) {
193+
final LeafBucketCollector collector = collectors[i];
194+
try {
195+
collector.collect(doc, bucket);
196+
++i;
197+
} catch (CollectionTerminatedException e) {
198+
removeCollector(i);
199+
numCollectors = this.numCollectors;
200+
if (numCollectors == 0) {
201+
throw new CollectionTerminatedException();
202+
}
203+
}
204+
}
205+
}
206+
}
207+
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.search.aggregations.BucketCollector;
3434
import org.elasticsearch.search.aggregations.InternalAggregation;
3535
import org.elasticsearch.search.aggregations.LeafBucketCollector;
36+
import org.elasticsearch.search.aggregations.MultiBucketCollector;
3637
import org.elasticsearch.search.internal.SearchContext;
3738

3839
import java.io.IOException;
@@ -90,7 +91,7 @@ public boolean needsScores() {
9091
/** Set the deferred collectors. */
9192
@Override
9293
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
93-
this.collector = BucketCollector.wrap(deferredCollectors);
94+
this.collector = MultiBucketCollector.wrap(deferredCollectors);
9495
}
9596

9697
private void finishLeaf() {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.search.aggregations.Aggregator;
2323
import org.elasticsearch.search.aggregations.AggregatorFactories;
2424
import org.elasticsearch.search.aggregations.BucketCollector;
25+
import org.elasticsearch.search.aggregations.MultiBucketCollector;
2526
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
2627
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2728
import org.elasticsearch.search.internal.SearchContext;
@@ -59,7 +60,7 @@ protected void doPreCollection() throws IOException {
5960
recordingWrapper.setDeferredCollector(deferredCollectors);
6061
collectors.add(recordingWrapper);
6162
}
62-
collectableSubAggregators = BucketCollector.wrap(collectors);
63+
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
6364
}
6465

6566
public static boolean descendsFromGlobalAggregator(Aggregator parent) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.search.aggregations.BucketCollector;
3232
import org.elasticsearch.search.aggregations.InternalAggregation;
3333
import org.elasticsearch.search.aggregations.LeafBucketCollector;
34+
import org.elasticsearch.search.aggregations.MultiBucketCollector;
3435
import org.elasticsearch.search.internal.SearchContext;
3536

3637
import java.io.IOException;
@@ -61,7 +62,7 @@ public MergingBucketsDeferringCollector(SearchContext context) {
6162

6263
@Override
6364
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
64-
this.collector = BucketCollector.wrap(deferredCollectors);
65+
this.collector = MultiBucketCollector.wrap(deferredCollectors);
6566
}
6667

6768
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.search.aggregations.InternalAggregation;
3939
import org.elasticsearch.search.aggregations.InternalAggregations;
4040
import org.elasticsearch.search.aggregations.LeafBucketCollector;
41+
import org.elasticsearch.search.aggregations.MultiBucketCollector;
4142
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
4243
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
4344
import org.elasticsearch.search.aggregations.support.ValuesSource;
@@ -93,7 +94,7 @@ protected void doClose() {
9394
@Override
9495
protected void doPreCollection() throws IOException {
9596
List<BucketCollector> collectors = Arrays.asList(subAggregators);
96-
deferredCollectors = BucketCollector.wrap(collectors);
97+
deferredCollectors = MultiBucketCollector.wrap(collectors);
9798
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
9899
}
99100

0 commit comments

Comments
 (0)