Skip to content

Commit d46ea3c

Browse files
authored
Fix profiled global agg (backport of #71575) (#71634)
This fixes the `global` aggregator when `profile` is enabled. It does so by removing all of the special case handling for `global` aggs in `AggregationPhase` and having the global aggregator itself perform the scoped collection using the same trick that we use in filter-by-filter mode of the `filters` aggregation. Closes #71098
1 parent 463f853 commit d46ea3c

File tree

10 files changed

+166
-107
lines changed

10 files changed

+166
-107
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,11 @@ public Query buildQuery(QueryBuilder builder) throws IOException {
261261
throw new UnsupportedOperationException();
262262
}
263263

264+
@Override
265+
public Query filterQuery(Query query) {
266+
throw new UnsupportedOperationException();
267+
}
268+
264269
@Override
265270
public IndexSettings getIndexSettings() {
266271
throw new UnsupportedOperationException();
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
setup:
2+
- do:
3+
bulk:
4+
refresh: true
5+
index: test
6+
body:
7+
- '{"index": {}}'
8+
- '{"name": "one"}'
9+
- '{"index": {}}'
10+
- '{"name": "two"}'
11+
- '{"index": {}}'
12+
- '{"name": "two"}'
13+
14+
---
15+
simple:
16+
- do:
17+
search:
18+
index: test
19+
body:
20+
size: 0
21+
query:
22+
match:
23+
name: two
24+
aggs:
25+
g:
26+
global: {}
27+
aggs:
28+
t:
29+
terms:
30+
field: name.keyword
31+
32+
- match: { aggregations.g.doc_count: 3 }
33+
- length: { aggregations.g.t.buckets: 2 }
34+
- match: { aggregations.g.t.buckets.0.key: two }
35+
- match: { aggregations.g.t.buckets.1.key: one }
36+
37+
---
38+
profile:
39+
- skip:
40+
version: " - 7.99.99"
41+
reason: fixed in 8.0.0 (to be backported to 7.13.0)
42+
43+
- do:
44+
search:
45+
index: test
46+
body:
47+
profile: true
48+
size: 0
49+
query:
50+
match:
51+
name: two
52+
aggs:
53+
g:
54+
global: {}
55+
aggs:
56+
t:
57+
terms:
58+
field: name.keyword
59+
60+
- match: { aggregations.g.doc_count: 3 }
61+
- length: { aggregations.g.t.buckets: 2 }
62+
- match: { aggregations.g.t.buckets.0.key: two }
63+
- match: { aggregations.g.t.buckets.1.key: one }
64+
- match: { profile.shards.0.aggregations.0.description: g }

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
983983
context.bitsetFilterCache(),
984984
context.indexShard().shardId().hashCode(),
985985
context::getRelativeTimeInMillis,
986-
context::isCancelled
986+
context::isCancelled,
987+
context::buildFilteredQuery
987988
);
988989
context.addReleasable(aggContext);
989990
try {

server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

Lines changed: 17 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,13 @@
88
package org.elasticsearch.search.aggregations;
99

1010
import org.apache.lucene.search.Collector;
11-
import org.apache.lucene.search.Query;
1211
import org.elasticsearch.common.inject.Inject;
13-
import org.elasticsearch.common.lucene.search.Queries;
14-
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
1512
import org.elasticsearch.search.internal.SearchContext;
1613
import org.elasticsearch.search.profile.query.CollectorResult;
1714
import org.elasticsearch.search.profile.query.InternalProfileCollector;
18-
import org.elasticsearch.search.query.QueryPhaseExecutionException;
1915

2016
import java.io.IOException;
2117
import java.util.ArrayList;
22-
import java.util.Collections;
2318
import java.util.List;
2419

2520
/**
@@ -32,31 +27,24 @@ public AggregationPhase() {
3227
}
3328

3429
public void preProcess(SearchContext context) {
35-
if (context.aggregations() != null) {
36-
List<Aggregator> collectors = new ArrayList<>();
37-
Aggregator[] aggregators;
38-
try {
39-
aggregators = context.aggregations().factories().createTopLevelAggregators();
40-
for (int i = 0; i < aggregators.length; i++) {
41-
if (aggregators[i] instanceof GlobalAggregator == false) {
42-
collectors.add(aggregators[i]);
43-
}
44-
}
45-
context.aggregations().aggregators(aggregators);
46-
if (collectors.isEmpty() == false) {
47-
Collector collector = MultiBucketCollector.wrap(true, collectors);
48-
((BucketCollector)collector).preCollection();
49-
if (context.getProfilers() != null) {
50-
collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
51-
// TODO: report on child aggs as well
52-
Collections.emptyList());
53-
}
54-
context.queryCollectors().put(AggregationPhase.class, collector);
55-
}
56-
} catch (IOException e) {
57-
throw new AggregationInitializationException("Could not initialize aggregators", e);
58-
}
30+
if (context.aggregations() == null) {
31+
return;
32+
}
33+
BucketCollector bucketCollector;
34+
try {
35+
context.aggregations().aggregators(context.aggregations().factories().createTopLevelAggregators());
36+
bucketCollector = MultiBucketCollector.wrap(
37+
true,
38+
org.elasticsearch.common.collect.List.of(context.aggregations().aggregators())
39+
);
40+
bucketCollector.preCollection();
41+
} catch (IOException e) {
42+
throw new AggregationInitializationException("Could not initialize aggregators", e);
5943
}
44+
Collector collector = context.getProfilers() == null
45+
? bucketCollector
46+
: new InternalProfileCollector(bucketCollector, CollectorResult.REASON_AGGREGATION, org.elasticsearch.common.collect.List.of());
47+
context.queryCollectors().put(AggregationPhase.class, collector);
6048
}
6149

6250
public void execute(SearchContext context) {
@@ -71,37 +59,6 @@ public void execute(SearchContext context) {
7159
}
7260

7361
Aggregator[] aggregators = context.aggregations().aggregators();
74-
List<Aggregator> globals = new ArrayList<>();
75-
for (int i = 0; i < aggregators.length; i++) {
76-
if (aggregators[i] instanceof GlobalAggregator) {
77-
globals.add(aggregators[i]);
78-
}
79-
}
80-
81-
// optimize the global collector based execution
82-
if (globals.isEmpty() == false) {
83-
BucketCollector globalsCollector = MultiBucketCollector.wrap(false, globals);
84-
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
85-
86-
try {
87-
final Collector collector;
88-
if (context.getProfilers() == null) {
89-
collector = globalsCollector;
90-
} else {
91-
InternalProfileCollector profileCollector = new InternalProfileCollector(
92-
globalsCollector, CollectorResult.REASON_AGGREGATION_GLOBAL,
93-
// TODO: report on sub collectors
94-
Collections.emptyList());
95-
collector = profileCollector;
96-
// start a new profile with this collector
97-
context.getProfilers().addQueryProfiler().setCollector(profileCollector);
98-
}
99-
globalsCollector.preCollection();
100-
context.searcher().search(query, collector);
101-
} catch (Exception e) {
102-
throw new QueryPhaseExecutionException(context.shardTarget(), "Failed to execute global aggregators", e);
103-
}
104-
}
10562

10663
List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
10764
if (context.aggregations().factories().context() != null) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@
88
package org.elasticsearch.search.aggregations.bucket.global;
99

1010
import org.apache.lucene.index.LeafReaderContext;
11+
import org.apache.lucene.search.BulkScorer;
12+
import org.apache.lucene.search.LeafCollector;
13+
import org.apache.lucene.search.MatchAllDocsQuery;
14+
import org.apache.lucene.search.Scorable;
15+
import org.apache.lucene.search.Weight;
1116
import org.elasticsearch.search.aggregations.AggregatorFactories;
1217
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
1318
import org.elasticsearch.search.aggregations.InternalAggregation;
1419
import org.elasticsearch.search.aggregations.LeafBucketCollector;
15-
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
1620
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
1721
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
1822
import org.elasticsearch.search.aggregations.support.AggregationContext;
@@ -21,22 +25,34 @@
2125
import java.util.Map;
2226

2327
public class GlobalAggregator extends BucketsAggregator implements SingleBucketAggregator {
28+
private final Weight weight;
2429

2530
public GlobalAggregator(String name, AggregatorFactories subFactories, AggregationContext context, Map<String, Object> metadata)
2631
throws IOException {
32+
2733
super(name, subFactories, context, null, CardinalityUpperBound.ONE, metadata);
34+
weight = context.filterQuery(new MatchAllDocsQuery()).createWeight(context.searcher(), scoreMode(), 1.0f);
2835
}
2936

3037
@Override
31-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
32-
final LeafBucketCollector sub) throws IOException {
33-
return new LeafBucketCollectorBase(sub, null) {
38+
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
39+
// Run sub-aggregations on child documents
40+
BulkScorer scorer = weight.bulkScorer(ctx);
41+
if (scorer == null) {
42+
return LeafBucketCollector.NO_OP_COLLECTOR;
43+
}
44+
scorer.score(new LeafCollector() {
45+
@Override
46+
public void collect(int doc) throws IOException {
47+
collectBucket(sub, doc, 0);
48+
}
49+
3450
@Override
35-
public void collect(int doc, long bucket) throws IOException {
36-
assert bucket == 0 : "global aggregator can only be a top level aggregator";
37-
collectBucket(sub, doc, bucket);
51+
public void setScorer(Scorable scorer) throws IOException {
52+
sub.setScorer(scorer);
3853
}
39-
};
54+
}, ctx.reader().getLiveDocs());
55+
return LeafBucketCollector.NO_OP_COLLECTOR;
4056
}
4157

4258
@Override

server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,13 @@ public final AggregationUsageService getUsageService() {
160160
*/
161161
public abstract Query buildQuery(QueryBuilder builder) throws IOException;
162162

163+
/**
164+
* Add filters from slice or filtered aliases. If you make a new query
165+
* and don't combine it with the {@link #query() top level query} then
166+
* you must provide it to this method.
167+
*/
168+
public abstract Query filterQuery(Query query);
169+
163170
/**
164171
* The settings for the index against which this search is running.
165172
*/
@@ -259,6 +266,7 @@ public static class ProductionAggregationContext extends AggregationContext {
259266
private final int randomSeed;
260267
private final LongSupplier relativeTimeInMillis;
261268
private final Supplier<Boolean> isCancelled;
269+
private final Function<Query, Query> filterQuery;
262270

263271
private final List<Aggregator> releaseMe = new ArrayList<>();
264272

@@ -273,7 +281,8 @@ public ProductionAggregationContext(
273281
BitsetFilterCache bitsetFilterCache,
274282
int randomSeed,
275283
LongSupplier relativeTimeInMillis,
276-
Supplier<Boolean> isCancelled
284+
Supplier<Boolean> isCancelled,
285+
Function<Query, Query> filterQuery
277286
) {
278287
this.context = context;
279288
if (bytesToPreallocate == 0) {
@@ -303,6 +312,7 @@ public ProductionAggregationContext(
303312
this.randomSeed = randomSeed;
304313
this.relativeTimeInMillis = relativeTimeInMillis;
305314
this.isCancelled = isCancelled;
315+
this.filterQuery = filterQuery;
306316
}
307317

308318
@Override
@@ -378,6 +388,11 @@ public Query buildQuery(QueryBuilder builder) throws IOException {
378388
return Rewriteable.rewrite(builder, context, true).toQuery(context);
379389
}
380390

391+
@Override
392+
public Query filterQuery(Query query) {
393+
return filterQuery.apply(query);
394+
}
395+
381396
@Override
382397
public IndexSettings getIndexSettings() {
383398
return context.getIndexSettings();

server/src/test/java/org/elasticsearch/search/aggregations/bucket/DocCountProviderTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import org.elasticsearch.index.mapper.DocCountFieldMapper;
1919
import org.elasticsearch.index.mapper.MappedFieldType;
2020
import org.elasticsearch.index.mapper.NumberFieldMapper;
21+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
22+
import org.elasticsearch.search.aggregations.AggregationBuilder;
2123
import org.elasticsearch.search.aggregations.AggregatorTestCase;
22-
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
23-
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
24+
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
25+
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
2426

2527
import java.io.IOException;
2628
import org.elasticsearch.common.collect.List;
@@ -88,10 +90,10 @@ public void testQueryFiltering() throws IOException {
8890

8991
private void testAggregation(Query query,
9092
CheckedConsumer<RandomIndexWriter, IOException> indexer,
91-
Consumer<InternalGlobal> verify) throws IOException {
92-
GlobalAggregationBuilder aggregationBuilder = new GlobalAggregationBuilder("_name");
93+
Consumer<InternalFilter> verify) throws IOException {
94+
AggregationBuilder builder = new FilterAggregationBuilder("f", new MatchAllQueryBuilder());
9395
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NUMBER_FIELD, NumberFieldMapper.NumberType.LONG);
9496
MappedFieldType docCountFieldType = new DocCountFieldMapper.DocCountFieldType();
95-
testCase(aggregationBuilder, query, indexer, verify, fieldType, docCountFieldType);
97+
testCase(builder, query, indexer, verify, fieldType, docCountFieldType);
9698
}
9799
}

0 commit comments

Comments
 (0)